文章标题 原创 翻译 转载 文章内容 连接池使用场景如数据库连接,通常为了平衡性能和资源会建立多个数据库连接,将他们放到一个池子中,需要的时候从池子里面取,用完了再归还给池子,如果池子里的资源不够会创建新资源,尽量要避免新建资源的速度大于池子的大小,池子的大小要在获取资源和归还资源之间做到平衡最好, 这样才能避免了频繁的建立和销毁资源,让池子里的资源能得到最大限度的利用。 pool.go ``` package pool import ( "errors" "log" "io" "sync" ) type Pool struct { mutex sync.Mutex // 互斥量锁住同步修改的值 resources chan io.Closer // 资源放在通道里 factory func() (io.Closer, error) // 创建资源(如:连接) closed bool // 池子是否关闭 } var ErrPoolClosed = errors.New("Pool has been closed.") // 创建一个池子 // 参数1:创建一个资源的函数 // 参数2:池子大小 func New(fn func()(io.Closer, error), size uint)(*Pool, error) { if size <= 0 { return nil, errors.New("size value too small.") } return &Pool{ factory: fn, resources: make(chan io.Closer, size), }, nil } // 获取资源,从通道里获取一个资源,如果没有就新建一个 func (p *Pool) Acquire() (io.Closer, error) { select { case r, ok := <-p.resources: log.Println("Acquire shared resource") if ok { return r, nil } return nil, ErrPoolClosed default: log.Println("Acquire new resource") return p.factory() } } // 归还资源 // 如果池子已关闭就关闭当前资源 // 将资源放进通道 // 如果通道已满就直接关闭资源,尽量要少发生这种情况 func (p *Pool) Release(r io.Closer) { p.mutex.Lock() defer p.mutex.Unlock() if p.closed { r.Close() return } select { case p.resources <-r: log.Println("Release: In Queue") default: log.Println("Relase: Closing") r.Close() } } // 关闭池子 func (p *Pool) Close() { p.mutex.Lock() defer p.mutex.Unlock() if p.closed { return } p.closed = true close(p.resources) for r := range p.resources { r.Close() } } ``` main.go ``` package main import ( "log" "fmt" "io" "sync" "sync/atomic" "time" "myproject/pool" "math/rand" ) const ( maxGoroutines = 25 // 任务数 poolSize = 3 // 池子大小 ) // 假设这是个数据库连接 type dbConnection struct { ID int32 } // 数据库连接要实现关闭接口 func (conn *dbConnection) Close() error { log.Println("Close: Connection id", conn.ID) return nil } // 递增新ID func getIdImpl() func() int32 { var id int32 = 0 return func() int32 { atomic.AddInt32(&id, 1) return id } } var getId = getIdImpl() // 创建连接的方法 func createConnection() (io.Closer, error) { id := getId() log.Println("Create: New Connection", id) return &dbConnection{id}, nil } // 从连接池里取出一个连接做查询操作 func performQueries(query int, p *pool.Pool) { conn, err := p.Acquire() if err != nil { log.Println(err) return } defer p.Release(conn) // 模拟查询操作,需要消耗一定的时间 time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond); log.Printf("handle conn id:%d, query id:%d\n", conn.(*dbConnection).ID, query) } // 初始化一个随机数种子,让每次程序启动生成的随机数都不一样 func init() { rand.Seed(time.Now().UnixNano()) } func main() { wg := sync.WaitGroup{} wg.Add(maxGoroutines) pool, err := pool.New(createConnection, poolSize) if err != nil { log.Println(err) } for i := 0; i < maxGoroutines; i++ { // 模拟任务调用,同一时间不应该调用那么多任务,这里间隔一会 time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond); // 启动查询任务,使用闭包函数,参数要从外面传进来,这样每次传进来的值才能不一样 go func(i int) { defer wg.Done() performQueries(i, pool) }(i) } // 所有任务完成关闭池子 wg.Wait() pool.Close() fmt.Println("bye bye...") } ``` 文章类别 Python Mobile Android Java Shell Life Database Bug Windows IOS Tools Boost Node.js Mac Product Tips C/C++ Golang Javascript React Qt MQ MongoDB Design Web Linux LLM ChatGPT RAG AI 提交