Skip to content

go 工作池 连接池

Published: at 10:38 AM | 3 min read

工作池

如同线程池:不断地往池子里丢任务,池子开启多个线程不断地处理任务。Go 这里其实只是对 `go`、`chan`、`defer` 的简单封装。

work/main.go

package work

import "sync"

// Pool is a fixed-size worker pool: a group of goroutines that execute
// functions received from a shared channel.
type Pool struct {
	// task carries submitted functions to the worker goroutines.
	task chan func()
	// wg counts the worker goroutines so that Shutdown can wait for them.
	wg sync.WaitGroup
}

// New creates a Pool backed by size worker goroutines and starts them
// immediately. Each worker keeps running submitted functions until the task
// channel is closed.
//
// A size below 1 is treated as 1: with no workers every send on the
// unbuffered task channel would block forever, and a negative value would
// make sync.WaitGroup.Add panic.
func New(size int) *Pool {
	if size < 1 {
		size = 1
	}

	p := &Pool{
		task: make(chan func()),
	}
	// Register all workers up front so a later Wait cannot miss any of them.
	p.wg.Add(size)

	for i := 0; i < size; i++ {
		go func() {
			defer p.wg.Done()
			// The loop ends once the channel is closed and drained.
			for task := range p.task {
				task()
			}
		}()
	}

	return p
}

// Run hands f to the pool so that one of the workers executes it.
// The call blocks until the send on the task channel completes.
func (p *Pool) Run(f func()) {
	var queue chan<- func() = p.task
	queue <- f
}

// Shutdown closes the task channel, which lets the workers finish the
// remaining tasks and exit, and then blocks until all workers have returned.
func (p *Pool) Shutdown() {
	// Deferred so that the wait happens after the channel is closed below.
	defer p.wg.Wait()
	close(p.task)
}

使用:

package main

import (
	"fmt"
	"math/rand"
	"time"

	"./work"
)

// init seeds the global math/rand generator from the current time so that
// the random values drawn later differ from run to run.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}

// main pushes 100 randomly-sleeping tasks through a pool of 20 workers and
// prints how many milliseconds the whole batch took.
func main() {
	// time.Now records a monotonic clock reading and time.Since uses it, so
	// the measurement is not skewed if the wall clock is adjusted mid-run
	// (which subtracting two UnixNano values would be).
	start := time.Now()
	pool := work.New(20)

	for i := 0; i < 100; i++ {
		pool.Run(func() {
			// Simulated work: sleep for a random duration below one second.
			time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
		})
	}
	pool.Shutdown()

	// Printed as a plain integer number of milliseconds.
	fmt.Println("spent", int64(time.Since(start)/time.Millisecond))
}

例子2:

package main

import (
	"fmt"
	"sync"
	"time"
)

// WorkPool runs integer jobs on a fixed number of worker goroutines and
// collects one integer result per job.
type WorkPool struct {
	// wg tracks the running workers so that the results channel can be
	// closed once all of them have exited.
	wg sync.WaitGroup
	// workSize is the number of worker goroutines launched by Start.
	workSize int
	// jobs feeds queued job values to the workers.
	jobs chan int
	// results receives the value a worker produces for each job.
	results chan int
}

// NewWorkPool builds a WorkPool configured for workSize workers. Both the
// jobs channel and the results channel are buffered with capacity poolSize.
// No goroutine is started here.
func NewWorkPool(poolSize int, workSize int) *WorkPool {
	p := new(WorkPool)
	p.workSize = workSize
	p.jobs = make(chan int, poolSize)
	p.results = make(chan int, poolSize)
	return p
}

// Start launches workSize worker goroutines plus one goroutine that closes
// the results channel after every worker has returned.
//
// Each worker drains the jobs channel until it is closed, sleeps one second
// per job to simulate work, and sends job*2 to the results channel.
func (p *WorkPool) Start() {
	// Register the workers before anything can call Wait. If the waiter
	// goroutine ran first it would see a zero counter, return at once and
	// close results while workers still send on it (a panic). Racing Add
	// against Wait is also a documented misuse of sync.WaitGroup.
	p.wg.Add(p.workSize)

	for i := 0; i < p.workSize; i++ {
		go func(workerID int) {
			defer p.wg.Done()
			for j := range p.jobs {
				fmt.Println("worker", workerID, "start", j)
				time.Sleep(time.Second)
				fmt.Println("worker", workerID, "end", j)
				p.results <- j * 2
			}
		}(i)
	}

	// Closing results is what lets a range over it terminate.
	go func() {
		p.wg.Wait()
		close(p.results)
	}()
}

// Stop closes the jobs channel, signalling that no more jobs will be added.
// Workers exit after they have processed the jobs still in the channel.
func (p *WorkPool) Stop() {
	jobs := p.jobs
	close(jobs)
}

// Add queues job value i on the jobs channel. The send blocks while the
// channel's buffer is full.
func (p *WorkPool) Add(i int) {
	var queue chan<- int = p.jobs
	queue <- i
}

// Result receives from the results channel until it is closed and returns
// everything received, in arrival order. The returned slice is never nil.
func (p *WorkPool) Result() []int {
	collected := make([]int, 0)
	for {
		v, open := <-p.results
		if !open {
			return collected
		}
		collected = append(collected, v)
	}
}

// main runs three jobs (0, 1, 2) through a WorkPool with eight workers and
// channel capacity 100, then prints the collected results.
func main() {
	const jobCount = 3

	pool := NewWorkPool(100, 8)
	pool.Start()
	for job := 0; job < jobCount; job++ {
		pool.Add(job)
	}
	// No more jobs: closing the queue lets the workers finish and exit.
	pool.Stop()

	results := pool.Result()
	fmt.Println(results)
}

连接池

如数据库连接池:因为创建和销毁数据库连接的代价比较大,所以使用连接池来管理和复用连接。

package pool

import (
	"errors"
	"fmt"
	"io"
	"sync"
)

// Pool keeps a bounded set of idle resources (anything implementing
// io.Closer) so that they can be handed out again instead of recreated.
type Pool struct {
	// factory creates a new resource when no idle one is available.
	factory func() (io.Closer, error)

	// m serializes Close and Release.
	m sync.Mutex
	// res buffers the idle resources.
	res chan io.Closer
	// closed is set once Close has run.
	closed bool
}

// New creates a Pool that can hold up to size idle resources and uses fn to
// create a resource whenever none is idle.
//
// It returns an error, rather than panicking, when size is zero or fn is nil.
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
	// size is unsigned, so zero is the only invalid value.
	if size == 0 {
		return nil, errors.New("pool size error")
	}
	// A nil factory would otherwise surface later as a nil-function call.
	if fn == nil {
		return nil, errors.New("pool factory is nil")
	}

	return &Pool{
		res:     make(chan io.Closer, size),
		factory: fn,
	}, nil
}

// Get returns an idle resource when one is queued; otherwise it calls the
// factory to create a new one. It never blocks waiting for a resource.
// When the resource channel has been closed and emptied, Get returns a
// "pool closed" error.
func (p *Pool) Get() (io.Closer, error) {
	select {
	case idle, open := <-p.res:
		fmt.Println("get resource")
		if open {
			return idle, nil
		}
		return nil, errors.New("pool closed")
	default:
		// Nothing idle right now; fall through and create a resource.
	}

	fmt.Println("create resource")
	return p.factory()
}

// Close marks the pool as closed, closes the resource channel and closes
// every resource still idle in it. Calling Close again is a no-op.
// Errors returned by the resources' Close methods are discarded.
func (p *Pool) Close() {
	p.m.Lock()
	defer p.m.Unlock()

	if p.closed {
		return
	}
	p.closed = true

	idle := p.res
	close(idle)

	// Drain the buffered resources; the receive reports !open once the
	// closed channel is empty.
	for {
		res, open := <-idle
		if !open {
			return
		}
		res.Close()
	}
}

// Release gives r back to the pool. The resource is queued for reuse when
// the pool is open and has room; it is closed instead when the pool is
// already closed or full.
func (p *Pool) Release(r io.Closer) {
	p.m.Lock()
	defer p.m.Unlock()

	if !p.closed {
		select {
		case p.res <- r:
			fmt.Println("resouce release")
			return
		default:
			// Buffer is full; fall through and close the resource.
		}
		fmt.Println("pool full, resource close")
	}

	r.Close()
}

使用:

package main

import (
	"fmt"
	"io"
	"os"
	"test/pool"
)

// main demonstrates the pool: it creates a pool of up to five file
// resources, obtains one, writes to it, returns it and closes the pool.
func main() {
	p, err := pool.New(func() (io.Closer, error) {
		return os.Create("ok.txt")
	}, 5)
	if err != nil {
		panic(err)
	}

	r, err := p.Get()
	if err != nil {
		fmt.Println(err)
		return
	}

	// The pool only knows io.Closer, so recover the concrete type to write.
	switch f := r.(type) {
	case *os.File:
		// Report a failed write instead of dropping the error; the
		// resource is still handed back below either way.
		if _, err := f.WriteString("hello"); err != nil {
			fmt.Println("write ok.txt:", err)
		}
	default:
		fmt.Println("type error")
	}

	p.Release(r)
	p.Close()
}