go 连接池

Table of Contents

    连接池使用场景如数据库连接,通常为了平衡性能和资源会建立多个数据库连接,将他们放到一个池子中,需要的时候从池子里面取,用完了再归还给池子,如果池子里的资源不够会创建新资源,尽量要避免新建资源的速度大于池子的大小,池子的大小要在获取资源和归还资源之间做到平衡最好, 这样才能避免了频繁的建立和销毁资源,让池子里的资源能得到最大限度的利用。

    pool.go

    package pool
    
    import (
    	"errors"
    	"log"
    	"io"
    	"sync"
    )
    
    type Pool struct {
    	mutex sync.Mutex 						// 互斥量锁住同步修改的值
    	resources chan io.Closer				// 资源放在通道里
    	factory func() (io.Closer, error)		// 创建资源(如:连接)
    	closed bool								// 池子是否关闭
    }
    
    var ErrPoolClosed = errors.New("Pool has been closed.")
    
    // 创建一个池子
    // 参数1:创建一个资源的函数
    // 参数2:池子大小
    func New(fn func()(io.Closer, error), size uint)(*Pool, error) {
    	if size <= 0 {
    		return nil, errors.New("size value too small.")
    	}
    	return &Pool{
    		factory: fn,
    		resources: make(chan io.Closer, size),
    	}, nil
    }
    
    // 获取资源,从通道里获取一个资源,如果没有就新建一个
    func (p *Pool) Acquire() (io.Closer, error) {
    	select {
    	case r, ok := <-p.resources:
    		log.Println("Acquire shared resource")
    		if ok {
    			return r, nil
    		}
    		return nil, ErrPoolClosed
    	default:
    		log.Println("Acquire new resource")
    		return p.factory()
    	}
    }
    
    // 归还资源
    // 如果池子已关闭就关闭当前资源
    // 将资源放进通道
    // 如果通道已满就直接关闭资源,尽量要少发生这种情况
    func (p *Pool) Release(r io.Closer) {
    	p.mutex.Lock()
    	defer p.mutex.Unlock()
    	if p.closed {
    		r.Close()
    		return
    	}
    	select {
    	case p.resources <-r:
    		log.Println("Release: In Queue")
    	default:
    		log.Println("Relase: Closing")
    		r.Close()
    	}
    }
    
    // 关闭池子
    func (p *Pool) Close() {
    	p.mutex.Lock()
    	defer p.mutex.Unlock()
    	if p.closed {
    		return
    	}
    	p.closed = true
    	close(p.resources)
    	for r := range p.resources {
    		r.Close()
    	}
    }
    
    

    main.go

    package main
    
    import (
    	"log"
    	"fmt"
    	"io"
    	"sync"
    	"sync/atomic"
    	"time"
    	"myproject/pool"
    	"math/rand"
    )
    
    const (
    	maxGoroutines = 25 	// 任务数
    	poolSize = 3		// 池子大小
    )
    
    // 假设这是个数据库连接
    type dbConnection struct {
    	ID int32
    }
    
    // 数据库连接要实现关闭接口
    func (conn *dbConnection) Close() error {
    	log.Println("Close: Connection id", conn.ID)
    	return nil
    }
    
    // 递增新ID
    func getIdImpl() func() int32 {
    	var id int32 = 0
    	return func() int32 {
    		atomic.AddInt32(&id, 1)
    		return id
    	}
    }
    var getId = getIdImpl()
    
    // 创建连接的方法
    func createConnection() (io.Closer, error) {
    	id := getId()
    	log.Println("Create: New Connection", id)
    	return &dbConnection{id}, nil
    }
    
    // 从连接池里取出一个连接做查询操作
    func performQueries(query int, p *pool.Pool) {
    	conn, err := p.Acquire()
    	if err != nil {
    		log.Println(err)
    		return
    	}
    	defer p.Release(conn)
    
    	// 模拟查询操作,需要消耗一定的时间
    	time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond);
    	log.Printf("handle conn id:%d, query id:%d\n", conn.(*dbConnection).ID, query)
    }
    
    // 初始化一个随机数种子,让每次程序启动生成的随机数都不一样
    func init() {
    	rand.Seed(time.Now().UnixNano())
    }
    
    func main() {
    	wg := sync.WaitGroup{}
    	wg.Add(maxGoroutines)
    
    	pool, err := pool.New(createConnection, poolSize)
    	if err != nil {
    		log.Println(err)
    	}
    	for i := 0; i < maxGoroutines; i++ {
    		// 模拟任务调用,同一时间不应该调用那么多任务,这里间隔一会
    		time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond);
    
    		// 启动查询任务,使用闭包函数,参数要从外面传进来,这样每次传进来的值才能不一样
    		go func(i int) {
    			defer wg.Done()
    			performQueries(i, pool)
    		}(i)
    	}
    
    	// 所有任务完成关闭池子
    	wg.Wait()
    	pool.Close()
    
    	fmt.Println("bye bye...")
    }
    

    上一篇文章

    下一篇文章