Tunny goroutine pool
Channels of Channels
- Effective Go > Channels
- Effective Go > Channels of channels
- go channels are bad and you should feel bad | jtolds.com
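For context, here is a minimal sketch of the "channel of channels" request pattern that the Effective Go section above describes: a request carries its own reply channel, so the answer travels back over a channel that was itself sent through a channel. The Request type and worker function are invented for illustration; they are not part of tunny.

package main

import "fmt"

// Request carries its own reply channel.
type Request struct {
    args   int
    result chan int
}

func worker(requests chan *Request) {
    for req := range requests {
        req.result <- req.args * 2 // reply on the embedded channel
    }
}

func main() {
    requests := make(chan *Request)
    go worker(requests)

    req := &Request{args: 21, result: make(chan int)}
    requests <- req
    fmt.Println(<-req.result) // 42
}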
Source flow
- An unbuffered channel blocks until both the sender and the receiver are ready to communicate; this is exactly how wrapper.readyChan is used. [ref. 2]
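A quick standalone demo of that blocking behavior (not tunny code; the channel name just mirrors readyChan):

package main

import (
    "fmt"
    "time"
)

func main() {
    ready := make(chan int) // unbuffered, like wrapper.readyChan

    go func() {
        time.Sleep(100 * time.Millisecond) // receiver is not ready yet
        <-ready                            // now the send below can complete
    }()

    start := time.Now()
    ready <- 1 // blocks here until the goroutine receives
    fmt.Println("send completed after", time.Since(start)) // ~100ms
}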
pool, _ := tunny.CreatePool(numCPUs, func(object interface{}) interface{} {
    // this is the job function
})
-->
// tunny.CreatePool
for i := range pool.workers {
    newWorker := workerWrapper{
        worker: &(tunnyDefaultWorker{&job}),
    }
    pool.workers[i] = &newWorker
}
-->
pool.Open()
-->
// pool.Open()
for i, workerWrapper := range pool.workers {
    workerWrapper.Open()
    // create a case for the select:
    //   case <-workerWrapper.readyChan:
    pool.selects[i] = reflect.SelectCase{
        Dir:  reflect.SelectRecv,
        Chan: reflect.ValueOf(workerWrapper.readyChan),
    }
}
-->
// workerWrapper.Open()
go wrapper.Loop()
-->
// workerWrapper.Loop()
wrapper.readyChan <- 1
-->
// main()
result, _ := pool.SendWork(input)
-->
// WorkPool.SendWork()
/*
    equivalent to:
    select {
    case <-worker.readyChan:
*/
if chosen, _, ok := reflect.Select(pool.selects); ok && chosen >= 0 {
    pool.workers[chosen].jobChan <- jobData
    result, open := <-pool.workers[chosen].outputChan
    if !open {
        return nil, ErrWorkerClosed
    }
    return result, nil
}
-->
// workerWrapper.Loop()
wrapper.readyChan <- 1 // blocks until SendWork's select receives from readyChan
for data := range wrapper.jobChan {
    wrapper.outputChan <- wrapper.worker.TunnyJob(data)
    for !wrapper.worker.TunnyReady() {
        if atomic.LoadUint32(&wrapper.poolOpen) == 0 {
            break
        }
        time.Sleep(tout * time.Millisecond)
    }
    wrapper.readyChan <- 1
}
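Putting the pieces together, here is a toy-sized version of the same handshake: each worker announces readiness on an unbuffered readyChan, and the dispatcher uses reflect.Select to wait for whichever worker signals first, then exchanges a job and a result over that worker's channels. The miniWorker type and all names are invented for illustration; this is not tunny's actual code.

package main

import (
    "fmt"
    "reflect"
)

// miniWorker mirrors the three channels a tunny workerWrapper owns.
type miniWorker struct {
    readyChan  chan int
    jobChan    chan interface{}
    outputChan chan interface{}
}

func (w *miniWorker) loop() {
    w.readyChan <- 1 // announce readiness; blocks until the dispatcher selects this worker
    for data := range w.jobChan {
        w.outputChan <- fmt.Sprintf("processed %v", data)
        w.readyChan <- 1 // ready for the next job
    }
}

func main() {
    workers := make([]*miniWorker, 3)
    selects := make([]reflect.SelectCase, len(workers))

    for i := range workers {
        workers[i] = &miniWorker{
            readyChan:  make(chan int),
            jobChan:    make(chan interface{}),
            outputChan: make(chan interface{}),
        }
        go workers[i].loop()

        // one receive case per worker, just like pool.selects
        selects[i] = reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(workers[i].readyChan),
        }
    }

    for job := 0; job < 5; job++ {
        // block until some worker signals on its readyChan
        chosen, _, ok := reflect.Select(selects)
        if !ok {
            break
        }
        workers[chosen].jobChan <- job
        fmt.Println(<-workers[chosen].outputChan)
    }
}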
Full source
func main() {
    numCPUs := runtime.NumCPU()
    runtime.GOMAXPROCS(numCPUs + 1) // numCPUs hot threads + one for async tasks.

    pool, _ := tunny.CreatePool(numCPUs, func(object interface{}) interface{} {
        input, _ := object.([]byte)

        // Do something that takes a lot of work
        output := input

        return output
    }).Open()
    defer pool.Close()

    http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
        input, err := ioutil.ReadAll(r.Body)
        if err != nil {
            http.Error(w, "Internal error", http.StatusInternalServerError)
            return
        }

        // Send work to our pool
        result, _ := pool.SendWork(input)

        w.Write(result.([]byte))
    })

    http.ListenAndServe(":8080", nil)
}
func CreatePool(numWorkers int, job func(interface{}) interface{}) *WorkPool {
    pool := WorkPool{running: 0}

    pool.workers = make([]*workerWrapper, numWorkers)
    for i := range pool.workers {
        newWorker := workerWrapper{
            worker: &(tunnyDefaultWorker{&job}),
        }
        pool.workers[i] = &newWorker
    }

    return &pool
}
//--------------------------------------------- WorkPool
func (pool *WorkPool) Open() (*WorkPool, error) {
    pool.statusMutex.Lock()
    defer pool.statusMutex.Unlock()

    if !pool.isRunning() {
        pool.selects = make([]reflect.SelectCase, len(pool.workers))

        for i, workerWrapper := range pool.workers {
            workerWrapper.Open()
            // create a case for the select:
            //   case <-workerWrapper.readyChan:
            pool.selects[i] = reflect.SelectCase{
                Dir:  reflect.SelectRecv,
                Chan: reflect.ValueOf(workerWrapper.readyChan),
            }
        }

        pool.setRunning(true)
        return pool, nil
    }
    return nil, ErrPoolAlreadyRunning
}
func (pool *WorkPool) SendWork(jobData interface{}) (interface{}, error) {
    pool.statusMutex.RLock()
    defer pool.statusMutex.RUnlock()

    if pool.isRunning() {
        /*
            equivalent to:
            select {
            case <-worker.readyChan:
                worker.jobChan <- jobData
                result, open := <-worker.outputChan
                if !open {
                    return nil, ErrWorkerClosed
                }
                return result, nil
            }
            return nil, ErrWorkerClosed
        */
        if chosen, _, ok := reflect.Select(pool.selects); ok && chosen >= 0 {
            pool.workers[chosen].jobChan <- jobData
            result, open := <-pool.workers[chosen].outputChan
            if !open {
                return nil, ErrWorkerClosed
            }
            return result, nil
        }
        return nil, ErrWorkerClosed
    }
    return nil, ErrPoolNotRunning
}
//--------------------------------------------- workerWrapper
func (wrapper *workerWrapper) Open() {
    if extWorker, ok := wrapper.worker.(TunnyExtendedWorker); ok {
        extWorker.TunnyInitialize()
    }

    wrapper.readyChan = make(chan int)
    wrapper.jobChan = make(chan interface{})
    wrapper.outputChan = make(chan interface{})

    atomic.SwapUint32(&wrapper.poolOpen, uint32(1))

    go wrapper.Loop()
}
func (wrapper *workerWrapper) Loop() {
    // TODO: Configure?
    tout := time.Duration(5)

    for !wrapper.worker.TunnyReady() {
        // It's sad that we can't simply check if jobChan is closed here.
        if atomic.LoadUint32(&wrapper.poolOpen) == 0 {
            break
        }
        time.Sleep(tout * time.Millisecond)
    }

    wrapper.readyChan <- 1 // blocks until SendWork's select receives from readyChan

    for data := range wrapper.jobChan {
        wrapper.outputChan <- wrapper.worker.TunnyJob(data)
        for !wrapper.worker.TunnyReady() {
            if atomic.LoadUint32(&wrapper.poolOpen) == 0 {
                break
            }
            time.Sleep(tout * time.Millisecond)
        }
        wrapper.readyChan <- 1
    }

    close(wrapper.readyChan)
    close(wrapper.outputChan)
}