连接池使用场景如数据库连接,通常为了平衡性能和资源会建立多个数据库连接,将他们放到一个池子中,需要的时候从池子里面取,用完了再归还给池子,如果池子里的资源不够会创建新资源,尽量要避免新建资源的速度大于池子的大小,池子的大小要在获取资源和归还资源之间做到平衡最好, 这样才能避免了频繁的建立和销毁资源,让池子里的资源能得到最大限度的利用。
pool.go
package pool
import (
"errors"
"log"
"io"
"sync"
)
type Pool struct {
mutex sync.Mutex // 互斥量锁住同步修改的值
resources chan io.Closer // 资源放在通道里
factory func() (io.Closer, error) // 创建资源(如:连接)
closed bool // 池子是否关闭
}
var ErrPoolClosed = errors.New("Pool has been closed.")
// 创建一个池子
// 参数1:创建一个资源的函数
// 参数2:池子大小
func New(fn func()(io.Closer, error), size uint)(*Pool, error) {
if size <= 0 {
return nil, errors.New("size value too small.")
}
return &Pool{
factory: fn,
resources: make(chan io.Closer, size),
}, nil
}
// 获取资源,从通道里获取一个资源,如果没有就新建一个
func (p *Pool) Acquire() (io.Closer, error) {
select {
case r, ok := <-p.resources:
log.Println("Acquire shared resource")
if ok {
return r, nil
}
return nil, ErrPoolClosed
default:
log.Println("Acquire new resource")
return p.factory()
}
}
// 归还资源
// 如果池子已关闭就关闭当前资源
// 将资源放进通道
// 如果通道已满就直接关闭资源,尽量要少发生这种情况
func (p *Pool) Release(r io.Closer) {
p.mutex.Lock()
defer p.mutex.Unlock()
if p.closed {
r.Close()
return
}
select {
case p.resources <-r:
log.Println("Release: In Queue")
default:
log.Println("Relase: Closing")
r.Close()
}
}
// 关闭池子
func (p *Pool) Close() {
p.mutex.Lock()
defer p.mutex.Unlock()
if p.closed {
return
}
p.closed = true
close(p.resources)
for r := range p.resources {
r.Close()
}
}
main.go
package main
import (
"log"
"fmt"
"io"
"sync"
"sync/atomic"
"time"
"myproject/pool"
"math/rand"
)
const (
maxGoroutines = 25 // 任务数
poolSize = 3 // 池子大小
)
// 假设这是个数据库连接
type dbConnection struct {
ID int32
}
// 数据库连接要实现关闭接口
func (conn *dbConnection) Close() error {
log.Println("Close: Connection id", conn.ID)
return nil
}
// 递增新ID
func getIdImpl() func() int32 {
var id int32 = 0
return func() int32 {
atomic.AddInt32(&id, 1)
return id
}
}
var getId = getIdImpl()
// 创建连接的方法
func createConnection() (io.Closer, error) {
id := getId()
log.Println("Create: New Connection", id)
return &dbConnection{id}, nil
}
// 从连接池里取出一个连接做查询操作
func performQueries(query int, p *pool.Pool) {
conn, err := p.Acquire()
if err != nil {
log.Println(err)
return
}
defer p.Release(conn)
// 模拟查询操作,需要消耗一定的时间
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond);
log.Printf("handle conn id:%d, query id:%d\n", conn.(*dbConnection).ID, query)
}
// 初始化一个随机数种子,让每次程序启动生成的随机数都不一样
func init() {
rand.Seed(time.Now().UnixNano())
}
func main() {
wg := sync.WaitGroup{}
wg.Add(maxGoroutines)
pool, err := pool.New(createConnection, poolSize)
if err != nil {
log.Println(err)
}
for i := 0; i < maxGoroutines; i++ {
// 模拟任务调用,同一时间不应该调用那么多任务,这里间隔一会
time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond);
// 启动查询任务,使用闭包函数,参数要从外面传进来,这样每次传进来的值才能不一样
go func(i int) {
defer wg.Done()
performQueries(i, pool)
}(i)
}
// 所有任务完成关闭池子
wg.Wait()
pool.Close()
fmt.Println("bye bye...")
}