Skip to content

go 连接池

Published: at 02:19 AM | 4 min read

连接池使用场景如数据库连接,通常为了平衡性能和资源会建立多个数据库连接,将他们放到一个池子中,需要的时候从池子里面取,用完了再归还给池子,如果池子里的资源不够会创建新资源,尽量要避免新建资源的速度大于池子的大小,池子的大小要在获取资源和归还资源之间做到平衡最好, 这样才能避免了频繁的建立和销毁资源,让池子里的资源能得到最大限度的利用。

pool.go

package pool

import (
	"errors"
	"log"
	"io"
	"sync"
)

type Pool struct {
	mutex sync.Mutex 						// 互斥量锁住同步修改的值
	resources chan io.Closer				// 资源放在通道里
	factory func() (io.Closer, error)		// 创建资源(如:连接)
	closed bool								// 池子是否关闭
}

var ErrPoolClosed = errors.New("Pool has been closed.")

// 创建一个池子
// 参数1:创建一个资源的函数
// 参数2:池子大小
func New(fn func()(io.Closer, error), size uint)(*Pool, error) {
	if size <= 0 {
		return nil, errors.New("size value too small.")
	}
	return &Pool{
		factory: fn,
		resources: make(chan io.Closer, size),
	}, nil
}

// 获取资源,从通道里获取一个资源,如果没有就新建一个
func (p *Pool) Acquire() (io.Closer, error) {
	select {
	case r, ok := <-p.resources:
		log.Println("Acquire shared resource")
		if ok {
			return r, nil
		}
		return nil, ErrPoolClosed
	default:
		log.Println("Acquire new resource")
		return p.factory()
	}
}

// 归还资源
// 如果池子已关闭就关闭当前资源
// 将资源放进通道
// 如果通道已满就直接关闭资源,尽量要少发生这种情况
func (p *Pool) Release(r io.Closer) {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	if p.closed {
		r.Close()
		return
	}
	select {
	case p.resources <-r:
		log.Println("Release: In Queue")
	default:
		log.Println("Relase: Closing")
		r.Close()
	}
}

// 关闭池子
func (p *Pool) Close() {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	if p.closed {
		return
	}
	p.closed = true
	close(p.resources)
	for r := range p.resources {
		r.Close()
	}
}

main.go

package main

import (
	"log"
	"fmt"
	"io"
	"sync"
	"sync/atomic"
	"time"
	"myproject/pool"
	"math/rand"
)

const (
	maxGoroutines = 25 	// 任务数
	poolSize = 3		// 池子大小
)

// 假设这是个数据库连接
type dbConnection struct {
	ID int32
}

// 数据库连接要实现关闭接口
func (conn *dbConnection) Close() error {
	log.Println("Close: Connection id", conn.ID)
	return nil
}

// 递增新ID
func getIdImpl() func() int32 {
	var id int32 = 0
	return func() int32 {
		atomic.AddInt32(&id, 1)
		return id
	}
}
var getId = getIdImpl()

// 创建连接的方法
func createConnection() (io.Closer, error) {
	id := getId()
	log.Println("Create: New Connection", id)
	return &dbConnection{id}, nil
}

// 从连接池里取出一个连接做查询操作
func performQueries(query int, p *pool.Pool) {
	conn, err := p.Acquire()
	if err != nil {
		log.Println(err)
		return
	}
	defer p.Release(conn)

	// 模拟查询操作,需要消耗一定的时间
	time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond);
	log.Printf("handle conn id:%d, query id:%d\n", conn.(*dbConnection).ID, query)
}

// 初始化一个随机数种子,让每次程序启动生成的随机数都不一样
func init() {
	rand.Seed(time.Now().UnixNano())
}

func main() {
	wg := sync.WaitGroup{}
	wg.Add(maxGoroutines)

	pool, err := pool.New(createConnection, poolSize)
	if err != nil {
		log.Println(err)
	}
	for i := 0; i < maxGoroutines; i++ {
		// 模拟任务调用,同一时间不应该调用那么多任务,这里间隔一会
		time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond);

		// 启动查询任务,使用闭包函数,参数要从外面传进来,这样每次传进来的值才能不一样
		go func(i int) {
			defer wg.Done()
			performQueries(i, pool)
		}(i)
	}

	// 所有任务完成关闭池子
	wg.Wait()
	pool.Close()

	fmt.Println("bye bye...")
}