Tunny goroutine pool
Channels of Channels
- Effective Go > Channels
- Effective Go > Channels of channels
- go channels are bad and you should feel bad | jtolds.com
Source flow
- unbuffered channel은 sender와 receiver가 모두 준비되어 통신(communicate)이 이루어질 때까지 block된다. (예: wrapper.readyChan) [ref. 2]
pool, _ := tunny.CreatePool(numCPUs, func(object interface{}) interface{} { // this is job function }) --> // tunny.CreatePool for i := range pool.workers { newWorker := workerWrapper{ worker: &(tunnyDefaultWorker{&job}), } pool.workers[i] = &newWorker } --> pool.Open() --> // pool.Open() for i, workerWrapper := range pool.workers { workerWrapper.Open() // create cases for select // case <- workerWrapper.readyChan : pool.selects[i] = reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(workerWrapper.readyChan), } --> // workerWrapper.Open() go wrapper.Loop() --> // workerWrapper.Loop() wrapper.readyChan <- 1 --> // main() result, _ := pool.SendWork(input) --> // WorkPool.SendWork() /** select{ case <--worker.readyChan : { */ if chosen, _, ok := reflect.Select(pool.selects); ok && chosen >= 0 { pool.workers[chosen].jobChan <- jobData result, open := <-pool.workers[chosen].outputChan if !open { return nil, ErrWorkerClosed } return result, nil } --> // workerWrapper.Loop() wrapper.readyChan <- 1 // block until the select finishes processing the readyChan for data := range wrapper.jobChan { wrapper.outputChan <- wrapper.worker.TunnyJob(data) for !wrapper.worker.TunnyReady() { if atomic.LoadUint32(&wrapper.poolOpen) == 0 { break } time.Sleep(tout * time.Millisecond) } wrapper.readyChan <- 1 }
Full source
func main() { numCPUs := runtime.NumCPU() runtime.GOMAXPROCS(numCPUs + 1) // numCPUs hot threads + one for async tasks. pool, _ := tunny.CreatePool(numCPUs, func(object interface{}) interface{} { input, _ := object.([]byte) // Do something that takes a lot of work output := input return output }).Open() defer pool.Close() http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) { input, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, "Internal error", http.StatusInternalServerError) } // Send work to our pool result, _ := pool.SendWork(input) w.Write(result.([]byte)) }) http.ListenAndServe(":8080", nil) } func CreatePool(numWorkers int, job func(interface{}) interface{}) *WorkPool { pool := WorkPool{running: 0} pool.workers = make([]*workerWrapper, numWorkers) for i := range pool.workers { newWorker := workerWrapper{ worker: &(tunnyDefaultWorker{&job}), } pool.workers[i] = &newWorker } return &pool } //--------------------------------------------- WorkPool func (pool *WorkPool) Open() (*WorkPool, error) { pool.statusMutex.Lock() defer pool.statusMutex.Unlock() if !pool.isRunning() { pool.selects = make([]reflect.SelectCase, len(pool.workers)) for i, workerWrapper := range pool.workers { workerWrapper.Open() // create cases for select // case <- workerWrapper.readyChan : pool.selects[i] = reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(workerWrapper.readyChan), } } pool.setRunning(true) return pool, nil } return nil, ErrPoolAlreadyRunning } func (pool *WorkPool) SendWork(jobData interface{}) (interface{}, error) { pool.statusMutex.RLock() defer pool.statusMutex.RUnlock() if pool.isRunning() { /** select{ case <--worker.readyChan : { worker.jobChan <- jobData result, open := <- worker.outputChan if !open { return nil, ErrWorkerClosed } return result, nil } } return nil, ErrWorkerClosed */ if chosen, _, ok := reflect.Select(pool.selects); ok && chosen >= 0 { pool.workers[chosen].jobChan <- jobData result, open := 
<-pool.workers[chosen].outputChan if !open { return nil, ErrWorkerClosed } return result, nil } return nil, ErrWorkerClosed } return nil, ErrPoolNotRunning } //--------------------------------------------- workerWrapper func (wrapper *workerWrapper) Open() { if extWorker, ok := wrapper.worker.(TunnyExtendedWorker); ok { extWorker.TunnyInitialize() } wrapper.readyChan = make(chan int) wrapper.jobChan = make(chan interface{}) wrapper.outputChan = make(chan interface{}) atomic.SwapUint32(&wrapper.poolOpen, uint32(1)) go wrapper.Loop() } func (wrapper *workerWrapper) Loop() { // TODO: Configure? tout := time.Duration(5) for !wrapper.worker.TunnyReady() { // It's sad that we can't simply check if jobChan is closed here. if atomic.LoadUint32(&wrapper.poolOpen) == 0 { break } time.Sleep(tout * time.Millisecond) } wrapper.readyChan <- 1 // block until the select process the readyChan for data := range wrapper.jobChan { wrapper.outputChan <- wrapper.worker.TunnyJob(data) for !wrapper.worker.TunnyReady() { if atomic.LoadUint32(&wrapper.poolOpen) == 0 { break } time.Sleep(tout * time.Millisecond) } wrapper.readyChan <- 1 } close(wrapper.readyChan) close(wrapper.outputChan) }
댓글 없음:
댓글 쓰기