python 3 에서 Future 의 동작 정리
python 3 의 Future 라는 녀석이 있다. 이녀석은 그냥 data 를 나르는 class 정도로 여기면 된다. c 에서 이야기하는 structure 같은 느낌으로 말이다.
그런데 python 의 code 에서 이녀석이 동작하는 방식이 library 안으로 들어가 있어서 언뜻 잘 이해가 안되다.
그래서 동작하는 순서대로 소스를 정리해봤다.
thread 와 future 의 flow
간단한 예제
간단한 아래 예제 소스를 한 번 보자.(출처 : Python: A quick introduction to the concurrent.futures module | Abu Ashraf Masnun)
아래 소스는 간단하다. pool.submit 을 하면 thread 가 만들어지고, thread 가 만들어지면서 future 를 return 해준다. 그러면 thread 가 동작을 하다가 동작을 완료하면 state 를 변경하고, future.set_result() 를 통해 결과를 future 에 넣어준다. 그러면 그 결과를 future.result() 를 통해 가져오게 되는 것이다.
future 를 share 해서 다른 thread 의 결과값을 가져온다고 생각하면 될 것 같다. 자세한 동작은 다음을 보자.
from concurrent.futures import ThreadPoolExecutor
from time import sleep
def return_after_5_secs(message):
sleep(5)
return message
pool = ThreadPoolExecutor(3)
future = pool.submit(return_after_5_secs, ("hello"))
print(future.done())
sleep(5)
print(future.done())
print(future.result())
동작
아래 소스를 쭉 따라가보면 최종적으로 _WorkItem.run() 에서 self.future.set_result(result) 를 해서 future 에 result 를 넣어주게 된다.
그래서 우리는 이 future 로 현재 동작하고 있는 thread 의 결과를 쉽게 얻어오게 된다. 이것은 특별한 방법은 아니지만, worker thread 와 main thread 와의 data 전달을 하는 방법을 future 라는 것으로 규격화했다. 그래서 개념만 잘 익힌다면, programming 을 쉽게(?) 할 수 있다.
이런 pattern 이전에 썼던 방법들---queue 를 이용하던지, 아니면 공유되는 variable 을 사용하던지 하는 방법등---을 생각해 보면 이해가 쉬울 듯 하다.
# thread.py class ThreadPoolExecutor(_base.Executor): def __init__(self, max_workers): ... self._work_queue = queue.Queue() ... def submit(self, fn, *args, **kwargs): with self._shutdown_lock: if self._shutdown: raise RuntimeError('cannot schedule new futures after shutdown') f = _base.Future() w = _WorkItem(f, fn, args, kwargs) self._work_queue.put(w) self._adjust_thread_count() return f def _adjust_thread_count(self): # When the executor gets lost, the weakref callback will wake up # the worker threads. def weakref_cb(_, q=self._work_queue): q.put(None) # TODO(bquinlan): Should avoid creating new threads if there are more # idle threads than items in the work queue. if len(self._threads) < self._max_workers: t = threading.Thread(target=_worker, args=(weakref.ref(self, weakref_cb), self._work_queue)) t.daemon = True t.start() self._threads.add(t) _threads_queues[t] = self._work_queue class _WorkItem(object): def __init__(self, future, fn, args, kwargs): self.future = future self.fn = fn self.args = args self.kwargs = kwargs def run(self): if not self.future.set_running_or_notify_cancel(): return try: result = self.fn(*self.args, **self.kwargs) except BaseException as e: self.future.set_exception(e) else: self.future.set_result(result) def _worker(executor_reference, work_queue): try: while True: work_item = work_queue.get(block=True) if work_item is not None: work_item.run() # Delete references to object. See issue16284 del work_item continue executor = executor_reference() # Exit if: # - The interpreter is shutting down OR # - The executor that owns the worker has been collected OR # - The executor that owns the worker has been shutdown. if _shutdown or executor is None or executor._shutdown: # Notice other workers work_queue.put(None) return del executor except BaseException: _base.LOGGER.critical('Exception in worker', exc_info=True) # threading.py class Thread: ... def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None): ... self._target = target ... def start(self): ... _start_new_thread(self._bootstrap, ()) ... self._started.wait() def _bootstrap(self): try: self._bootstrap_inner() except: if self._daemonic and _sys is None: return raise def _bootstrap_inner(self): try: ... try: self.run() except SystemExit: ... def run(self): try: if self._target: self._target(*self._args, **self._kwargs) finally: del self._target, self._args, self._kwargs # thread.py
댓글 없음:
댓글 쓰기