grpool
Projects · golang/go Wiki 에 소개된 간단한 goroutine pool 을 한 번 봐보자.
Installation
c:\> go get github.com/ivpusic/grpoolSource flow
/** */ // source from : https://github.com/ivpusic/grpool func main() { // number of workers, and size of job queue pool := grpool.NewPool(100, 50) defer pool.Release() for i := 0; i < 10; i++ { count := i // enqueue pool.JobQueue <- func() { fmt.Printf("I am worker! Number %d\n", count) } } } // Will release resources used by pool func (p *Pool) Release() { p.dispatcher.stop <- true <-p.dispatcher.stop } func NewPool(numWorkers int, jobQueueLen int) *Pool { // create 'queue' and 'worker pool' jobQueue := make(chan Job, jobQueueLen) // channel with buffer workerPool := make(chan *worker, numWorkers) // create a 'Pool' with new dispatcher pool := &Pool{ JobQueue: jobQueue, dispatcher: newDispatcher(workerPool, jobQueue), } return pool } // Accepts jobs from clients, and waits for first free worker to deliver job type dispatcher struct { workerPool chan *worker jobQueue chan Job // channel with Job objects stop chan bool // channel with bool value } func newDispatcher(workerPool chan *worker, jobQueue chan Job) *dispatcher { d := &dispatcher{ workerPool: workerPool, jobQueue: jobQueue, stop: make(chan bool), } // create workers for i := 0; i < cap(d.workerPool); i++ { worker := newWorker(d.workerPool) worker.start() // at this point, worker is inserted into the workerPool } go d.dispatch() return d } func (d *dispatcher) dispatch() { for { select { case job := <-d.jobQueue: worker := <-d.workerPool worker.jobChannel <- job case stop := <-d.stop: if stop { for i := 0; i < cap(d.workerPool); i++ { worker := <-d.workerPool worker.stop <- true <-worker.stop } d.stop <- true return } } } } // Gorouting instance which can accept client jobs type worker struct { workerPool chan *worker jobChannel chan Job stop chan bool } func newWorker(pool chan *worker) *worker { // create a Job channel and stop channel return &worker{ workerPool: pool, jobChannel: make(chan Job), stop: make(chan bool), } } func (w *worker) start() { // new thread start go func() { var job Job for { // worker free, add it to pool w.workerPool <- w select { case job = <-w.jobChannel: job() case stop := <-w.stop: if stop { w.stop <- true return } } } }() }
댓글 없음:
댓글 쓰기