工作池
如线程池,不断的往池子里丢任务,池子开启多个线程不断的处理任务。go这里其实只是对go chan defer的简单封装。
work/main.go
package work
import "sync"
type Pool struct {
wg sync.WaitGroup
task chan func()
}
func New(size int) *Pool {
p := &Pool{
task: make(chan func()),
}
p.wg.Add(size)
for i := 0; i < size; i++ {
go func() {
defer p.wg.Done()
for task := range p.task {
task()
}
}()
}
return p
}
func (p *Pool) Run(f func()) {
p.task <- f
}
func (p *Pool) Shutdown() {
close(p.task)
p.wg.Wait()
}
使用:
package main
import (
"fmt"
"math/rand"
"time"
"./work"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
func main() {
start := time.Now().UnixNano()
pool := work.New(20)
for i := 0; i < 100; i++ {
pool.Run(func() {
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
})
}
pool.Shutdown()
end := time.Now().UnixNano()
fmt.Println("spent", (end-start)/1e6)
}
例子2:
package main
import (
"fmt"
"sync"
"time"
)
type WorkPool struct {
jobs chan int
results chan int
workSize int
wg sync.WaitGroup
}
func NewWorkPool(poolSize int, workSize int) *WorkPool {
return &WorkPool{
jobs: make(chan int, poolSize),
results: make(chan int, poolSize),
workSize: workSize,
}
}
func (p *WorkPool) Start() {
go func() {
p.wg.Wait()
close(p.results)
}()
p.wg.Add(p.workSize)
for i := 0; i < p.workSize; i++ {
go func(workId int) {
defer p.wg.Done()
for j := range p.jobs {
fmt.Println("worker", workId, "start", j)
time.Sleep(time.Second)
fmt.Println("worker", workId, "end", j)
p.results <- j * 2
}
}(i)
}
}
func (p *WorkPool) Stop() {
close(p.jobs)
}
func (p *WorkPool) Add(i int) {
p.jobs <- i
}
func (p *WorkPool) Result() []int {
r := []int{}
for a := range p.results {
r = append(r, a)
}
return r
}
func main() {
w := NewWorkPool(100, 8)
w.Start()
for i := 0; i < 3; i++ {
w.Add(i)
}
w.Stop()
fmt.Println(w.Result())
}
连接池
如数据库连接池,因为创建销毁数据库连接的代价比较大所以使用连接池来管理连接。
package pool
import (
"errors"
"fmt"
"io"
"sync"
)
type Pool struct {
m sync.Mutex
res chan io.Closer
factory func()(io.Closer, error)
closed bool
}
func New(fn func()(io.Closer, error), size uint)(*Pool, error) {
if size <= 0 {
panic(errors.New("pool size error"))
}
return &Pool {
res: make(chan io.Closer, size),
factory: fn,
}, nil
}
func (p *Pool)Get()(io.Closer, error) {
select {
case r, ok := <- p.res: {
fmt.Println("get resource")
if !ok {
return nil, errors.New("pool closed")
}
return r, nil
}
default:
fmt.Println("create resource")
return p.factory()
}
}
func (p *Pool)Close() {
p.m.Lock()
defer p.m.Unlock()
if p.closed {
return
}
p.closed = true
close(p.res)
for r := range p.res {
r.Close()
}
}
func (p *Pool)Release(r io.Closer) {
p.m.Lock()
defer p.m.Unlock()
if p.closed {
r.Close()
return
}
select {
case p.res <- r:
fmt.Println("resouce release")
default:
fmt.Println("pool full, resource close")
r.Close()
}
}
使用:
package main
import (
"fmt"
"io"
"os"
"test/pool"
)
func main() {
p, err := pool.New(func() (io.Closer, error) {
return os.Create("ok.txt")
}, 5)
if err != nil {
panic(err)
}
r, err := p.Get()
if err != nil {
fmt.Println(err)
return
}
switch f := r.(type) {
case *os.File:
f.WriteString("hello")
default:
fmt.Println("type error")
}
p.Release(r)
p.Close()
}