python 3 에서 Future 의 동작 정리
python 3 의 Future 라는 녀석이 있다. 이녀석은 그냥 data 를 나르는 class 정도로 여기면 된다. c 에서 이야기하는 structure 같은 느낌으로 말이다.
그런데 python 의 code 에서 이녀석이 동작하는 방식이 library 안으로 들어가 있어서 언뜻 잘 이해가 안되다.
그래서 동작하는 순서대로 소스를 정리해봤다.
thread 와 future 의 flow
간단한 예제
간단한 아래 예제 소스를 한 번 보자.(출처 : Python: A quick introduction to the concurrent.futures module | Abu Ashraf Masnun)
아래 소스는 간단하다. pool.submit 을 하면 thread 가 만들어지고, thread 가 만들어지면서 future 를 return 해준다. 그러면 thread 가 동작을 하다가 동작을 완료하면 state 를 변경하고, future.set_result() 를 통해 결과를 future 에 넣어준다. 그러면 그 결과를 future.result() 를 통해 가져오게 되는 것이다.
future 를 share 해서 다른 thread 의 결과값을 가져온다고 생각하면 될 것 같다. 자세한 동작은 다음을 보자.
from concurrent.futures import ThreadPoolExecutor
from time import sleep
def return_after_5_secs(message):
sleep(5)
return message
pool = ThreadPoolExecutor(3)
future = pool.submit(return_after_5_secs, ("hello"))
print(future.done())
sleep(5)
print(future.done())
print(future.result())
동작
아래 소스를 쭉 따라가보면 최종적으로 _WorkItem.run() 에서 self.future.set_result(result) 를 해서 future 에 result 를 넣어주게 된다.
그래서 우리는 이 future 로 현재 동작하고 있는 thread 의 결과를 쉽게 얻어오게 된다. 이것은 특별한 방법은 아니지만, worker thread 와 main thread 와의 data 전달을 하는 방법을 future 라는 것으로 규격화했다. 그래서 개념만 잘 익힌다면, programming 을 쉽게(?) 할 수 있다.
이런 pattern 이전에 썼던 방법들---queue 를 이용하던지, 아니면 공유되는 variable 을 사용하던지 하는 방법등---을 생각해 보면 이해가 쉬울 듯 하다.
# thread.py
class ThreadPoolExecutor(_base.Executor):
def __init__(self, max_workers):
...
self._work_queue = queue.Queue()
...
def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')
f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
self._work_queue.put(w)
self._adjust_thread_count()
return f
def _adjust_thread_count(self):
# When the executor gets lost, the weakref callback will wake up
# the worker threads.
def weakref_cb(_, q=self._work_queue):
q.put(None)
# TODO(bquinlan): Should avoid creating new threads if there are more
# idle threads than items in the work queue.
if len(self._threads) < self._max_workers:
t = threading.Thread(target=_worker,
args=(weakref.ref(self, weakref_cb),
self._work_queue))
t.daemon = True
t.start()
self._threads.add(t)
_threads_queues[t] = self._work_queue
class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
def run(self):
if not self.future.set_running_or_notify_cancel():
return
try:
result = self.fn(*self.args, **self.kwargs)
except BaseException as e:
self.future.set_exception(e)
else:
self.future.set_result(result)
def _worker(executor_reference, work_queue):
try:
while True:
work_item = work_queue.get(block=True)
if work_item is not None:
work_item.run()
# Delete references to object. See issue16284
del work_item
continue
executor = executor_reference()
# Exit if:
# - The interpreter is shutting down OR
# - The executor that owns the worker has been collected OR
# - The executor that owns the worker has been shutdown.
if _shutdown or executor is None or executor._shutdown:
# Notice other workers
work_queue.put(None)
return
del executor
except BaseException:
_base.LOGGER.critical('Exception in worker', exc_info=True)
# threading.py
class Thread:
...
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, *, daemon=None):
...
self._target = target
...
def start(self):
...
_start_new_thread(self._bootstrap, ())
...
self._started.wait()
def _bootstrap(self):
try:
self._bootstrap_inner()
except:
if self._daemonic and _sys is None:
return
raise
def _bootstrap_inner(self):
try:
...
try:
self.run()
except SystemExit:
...
def run(self):
try:
if self._target:
self._target(*self._args, **self._kwargs)
finally:
del self._target, self._args, self._kwargs
# thread.py
댓글 없음:
댓글 쓰기