[컴][go] tunny goroutine 동작

thread pool / goroutine pool /routine pool 분석 / thread pool 분석 / 구현



Tunny goroutine pool




Channels of Channels




Source flow

  • unbuffered channel 에서는 sender 와 receiver 가 모두 준비되어 communicate 가 이루어질 때까지 send/receive 가 block 된다. (wrapper.readyChan)[ref. 2]

pool, _ := tunny.CreatePool(numCPUs, func(object interface{}) interface{} {
    // this is job function
})

--> 
// tunny.CreatePool
for i := range pool.workers {
    newWorker := workerWrapper{
        worker: &(tunnyDefaultWorker{&job}),
    }
    pool.workers[i] = &newWorker
}

-->
pool.Open()
-->
// pool.Open()
for i, workerWrapper := range pool.workers {
    workerWrapper.Open()

    // create cases for select
    // case <- workerWrapper.readyChan : 
    pool.selects[i] = reflect.SelectCase{
        Dir:  reflect.SelectRecv,
        Chan: reflect.ValueOf(workerWrapper.readyChan),
    }
-->
// workerWrapper.Open()
go wrapper.Loop()
-->
// workerWrapper.Loop()
wrapper.readyChan <- 1

-->
// main()
result, _ := pool.SendWork(input)

-->
// WorkPool.SendWork()
/**
  select{ 
    case <-worker.readyChan : {
*/
if chosen, _, ok := reflect.Select(pool.selects); ok && chosen >= 0 {
    pool.workers[chosen].jobChan <- jobData
    result, open := <-pool.workers[chosen].outputChan

    if !open {
        return nil, ErrWorkerClosed
    }
    return result, nil
}

-->
// workerWrapper.Loop()
wrapper.readyChan <- 1 // block until the select finishes processing the readyChan

for data := range wrapper.jobChan {
    wrapper.outputChan <- wrapper.worker.TunnyJob(data)
    for !wrapper.worker.TunnyReady() {
        if atomic.LoadUint32(&wrapper.poolOpen) == 0 {
            break
        }
        time.Sleep(tout * time.Millisecond)
    }
    wrapper.readyChan <- 1
}


Full source

// main starts an HTTP server on :8080 whose /work endpoint passes the request
// body through a tunny goroutine pool (one worker per CPU) and writes the
// worker's result back to the client.
func main() {
    numCPUs := runtime.NumCPU()
    runtime.GOMAXPROCS(numCPUs + 1) // numCPUs hot threads + one for async tasks.

    pool, err := tunny.CreatePool(numCPUs, func(object interface{}) interface{} {
        input, _ := object.([]byte)

        // Do something that takes a lot of work
        output := input

        return output
    }).Open()
    if err != nil {
        // Open returns a nil pool together with the error, so there is
        // nothing usable to serve requests with.
        panic(err)
    }

    defer pool.Close()

    http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
        input, err := ioutil.ReadAll(r.Body)
        if err != nil {
            http.Error(w, "Internal error", http.StatusInternalServerError)
            // Stop here: the error response has already been written.
            return
        }

        // Send work to our pool
        result, err := pool.SendWork(input)
        if err != nil {
            http.Error(w, "Internal error", http.StatusInternalServerError)
            return
        }

        // SendWork returns interface{}; check the dynamic type instead of
        // letting a failed assertion panic inside the handler.
        output, ok := result.([]byte)
        if !ok {
            http.Error(w, "Internal error", http.StatusInternalServerError)
            return
        }

        w.Write(output)
    })

    // ListenAndServe only returns when the server could not start or stopped
    // with an error, so surface that error instead of exiting silently.
    if err := http.ListenAndServe(":8080", nil); err != nil {
        panic(err)
    }
}



// CreatePool returns a new, not yet running WorkPool holding numWorkers
// worker wrappers. Every wrapper gets its own tunnyDefaultWorker, and all of
// them refer to the same job function.
func CreatePool(numWorkers int, job func(interface{}) interface{}) *WorkPool {
    workers := make([]*workerWrapper, numWorkers)
    for idx := 0; idx < numWorkers; idx++ {
        workers[idx] = &workerWrapper{
            worker: &tunnyDefaultWorker{&job},
        }
    }

    pool := &WorkPool{running: 0}
    pool.workers = workers
    return pool
}

//--------------------------------------------- WorkPool

// Open starts every worker of the pool and marks the pool as running.
//
// It returns the pool itself on success, or nil and ErrPoolAlreadyRunning
// when the pool is already running. The status mutex is write-locked for the
// whole call.
func (pool *WorkPool) Open() (*WorkPool, error) {
    pool.statusMutex.Lock()
    defer pool.statusMutex.Unlock()

    if pool.isRunning() {
        return nil, ErrPoolAlreadyRunning
    }

    pool.selects = make([]reflect.SelectCase, len(pool.workers))
    for idx := range pool.workers {
        w := pool.workers[idx]

        // The worker must be opened first: Open creates the readyChan that
        // the select case below refers to.
        w.Open()

        // Equivalent of `case <-w.readyChan:` for use with reflect.Select.
        pool.selects[idx] = reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(w.readyChan),
        }
    }

    pool.setRunning(true)
    return pool, nil
}

// SendWork hands jobData to the first worker that signals readiness and
// blocks until that worker delivers its result.
//
// It returns ErrPoolNotRunning when the pool is not running, and
// ErrWorkerClosed when the selected ready channel or the worker's output
// channel turns out to be closed. The status mutex is read-locked for the
// whole call.
func (pool *WorkPool) SendWork(jobData interface{}) (interface{}, error) {
    pool.statusMutex.RLock()
    defer pool.statusMutex.RUnlock()

    if !pool.isRunning() {
        return nil, ErrPoolNotRunning
    }

    // Dynamic equivalent of a select statement with one
    // `case <-worker.readyChan:` per worker; idx identifies the worker whose
    // ready signal was received.
    idx, _, ok := reflect.Select(pool.selects)
    if !ok || idx < 0 {
        return nil, ErrWorkerClosed
    }

    worker := pool.workers[idx]
    worker.jobChan <- jobData

    result, open := <-worker.outputChan
    if !open {
        return nil, ErrWorkerClosed
    }
    return result, nil
}

//--------------------------------------------- workerWrapper

// Open prepares the wrapper and starts its Loop in a new goroutine.
//
// If the wrapped worker implements TunnyExtendedWorker, its TunnyInitialize
// method is called first. Fresh unbuffered ready/job/output channels are then
// created and the poolOpen flag is set to 1 before the goroutine is started.
func (wrapper *workerWrapper) Open() {
    initializer, isExtended := wrapper.worker.(TunnyExtendedWorker)
    if isExtended {
        initializer.TunnyInitialize()
    }

    wrapper.outputChan = make(chan interface{})
    wrapper.jobChan = make(chan interface{})
    wrapper.readyChan = make(chan int)

    atomic.SwapUint32(&wrapper.poolOpen, 1)

    go wrapper.Loop()
}

// Loop is the worker goroutine body. It announces readiness on readyChan,
// then for every value received from jobChan sends the result of TunnyJob on
// outputChan and announces readiness again. Once jobChan is closed it closes
// readyChan and outputChan and returns.
func (wrapper *workerWrapper) Loop() {

    // TODO: Configure?
    const pollInterval = 5 * time.Millisecond

    // waitUntilReady polls the worker until TunnyReady reports true, giving
    // up early when the poolOpen flag has been cleared.
    waitUntilReady := func() {
        for !wrapper.worker.TunnyReady() {
            // It's sad that we can't simply check if jobChan is closed here.
            if atomic.LoadUint32(&wrapper.poolOpen) == 0 {
                return
            }
            time.Sleep(pollInterval)
        }
    }

    waitUntilReady()

    // readyChan is unbuffered, so this send blocks until a receiver takes
    // the ready signal.
    wrapper.readyChan <- 1

    for data := range wrapper.jobChan {
        wrapper.outputChan <- wrapper.worker.TunnyJob(data)
        waitUntilReady()
        wrapper.readyChan <- 1
    }

    close(wrapper.readyChan)
    close(wrapper.outputChan)

}






References

  1. Jeffail/tunny: A goroutine pool for golang
  2. Golang channels tutorial | Alexander Guz's blog






댓글 없음:

댓글 쓰기