Celery
Celery 는 분산메시지전달(distributed message passing) 을 기반으로 한 비동기 작업 큐(asynchronous task queue) 이다.[ref. 1]이 녀석은 하나의 일만 하는 서버로 보면 된다. 이렇게 하나의 일을 하는 thread 나 process 를 worker(일꾼) 라고 부르는데 이런 worker 의 하나라고 보면 된다.
이 일꾼은 일꾼으로서 완전한 동작을 하기 위해 아래 2가지 동작이 가능하다.
- 일을 전달 받는 법
- 어떤 일을 할 것인지 정의
일을 전달 받는 법
이 celery 가 일을 전달받는 방법은 queue 를 바라보는 것이다. 다만 queue 는 celery 가 가지고 있지 않다. RebbitMQ 등의 queue 를 따로 실행시켜서 사용해야 한다. Celery 에서는 이 queue 를 Broker 라고 부른다. 우리도 일을 중간에서 연결해주는 녀석을 broker(브로커) 라고 부르니 대충 의미를 짐작할 수 있을 것이다.그러면 이 broker 를 Celery 에게 알려주긴 해야 한다. 그래서 Celery 를 시작할 때 parameter 로 알려준다. 아니면 config file 에서 BROKER_URL 을 설정할 수도 있다. 참고로 default 값이 'amqp://' 이다.(참고)
app = Celery('tasks', broker='amqp://guest@localhost//')
어떤 일을 할 것인가.
이것은 Celery 를 이용한 coding 을 해주면 된다. RebbitMQ 를 먼저 설치하고 실행하자.(설치하면 기본적으로 실행된다.)pip install Celery
#test.py from celery import Celery app = Celery('tasks', broker='amqp://guest@localhost//') @app.task def add(x, y): return x + y
c:..> python >>> from test import add >>> add.delay(3,4) <AsyncResult: 716c4b58-e17b-4cad-8378-842c259c15cd> >>> r = add.delay(3,4) >>> r <AsyncResult: 131dff4a-e54e-43cd-a415-00cf69a006f5>
return value
Celery 로 수행한 일에 대한 return value 을 하고 싶으면, 이녀석의 return 결과를 어딘가에 잠깐 저장해 놔야 한다. 왜냐하면 이녀석은 asynchronous 하게 일을 처리하기 때문에 계속 자신의 return value 를 가지고 있지 않는다.config file
config 관련 자세한 설명은 여기에서 확인할 수 있다.여기서는 config 사용법을 볼 것이다. 일단 아래처럼 celeryconfig.py 를 만들자.
# celeryconfig.py ## Broker settings. BROKER_URL = 'amqp://guest:guest@localhost:5672//' # List of modules to import when celery starts. CELERY_IMPORTS = ('myapp.tasks', ) ## Using the database to store task state and results. CELERY_RESULT_BACKEND = 'db+sqlite:///results.db' CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}
이것을 Celery instance 에 적용하기 위해서는 config_from_object() 를 이용하면 된다.
from celery import Celery celery = Celery('tasks') celery.config_from_object('celeryconfig')
command line
command line 에서 실행할 때는 --config option 을 사용하면 된다.>celery -A pworker worker -l info --config=celconfig.py
dedicated module
celery 부분을 하나의 module 로 작성할 수 있다. 여기서 자세한 사항을 확인할 수 있다. 아래 github 에 example 을 올려놨다.- github : https://github.com/i5on9i/celery_dedicated_module
- celery 설치 및 실행 예제 : 위의 github source 에 대한 글이다.
주의할 점
celery.py 가 있어야만 한다. 기본적으로 command line 에서 celery 를 실행하면 module 의 celery 를 import 하게 된다.Scheduling[ref. 5]
주기적인 작업을 위해 celery beat 를 제공한다. celery beat 이 scheduler 이다.CELERYBEAT_SCHEDULE 를 set 해서 주기를 설정할 수 있다. 이 설정을 DB 에 놓을 수도 있다고 한다.
celery beat 은 한개만 띄우라고 한다. scheduler 가 여러개라면 동기화의 문제등이 생기게 된다.
그리고 혹시 착각할 수 있어서 언급하면, celery beat 을 띄우고, celery worker 도 따로 띄워줘야 한다. 즉, celery beat 는 주기적으로 message 를 날리는 역할만 한다.
time zone
기본적으로 UTC time zone 을 사용한다. 이것은 CELERY_TIMEZONE 으로 설정을 변경할 수 있다. Django 를 사용하는 경우라면, Django 의 TIME_ZONE 설정이 사용된다. Celery 에 다른 time zone 을 적용하고 싶으면 CELERY_TIMEZONE 을 설정하면 된다.실행
command line 에서 실행할 때는 option 에 worker 대신에 beat 를 주면 된다.[ref. 5]>celery -A pworker beat -l info --config=celconfig.py (vir_mickey) c:\...\src>celery -A pworker beat -l info --config=celconfig.py celery beat v3.1.23 (Cipater) is starting. __ - ... __ - _ Configuration -> . broker -> amqp://guest:**@localhost:5672// . loader -> celery.loaders.app.AppLoader . scheduler -> celery.beat.PersistentScheduler . db -> celerybeat-schedule . logfile -> [stderr]@%INFO . maxinterval -> now (0s) [2016-07-21 16:26:39,206: INFO/MainProcess] beat: Starting... [2016-07-21 16:27:09,755: INFO/MainProcess] Scheduler: Sending due task add-every-30-seconds (tasks.add) [2016-07-21 16:27:45,121: INFO/MainProcess] Scheduler: Sending due task add-every-30-seconds (tasks.add) [2016-07-21 16:28:15,121: INFO/MainProcess] Scheduler: Sending due task add-every-30-seconds (tasks.add) [2016-07-21 16:28:45,121: INFO/MainProcess] Scheduler: Sending due task add-every-30-seconds (tasks.add) [2016-07-21 16:29:15,121: INFO/MainProcess] Scheduler: Sending due task add-every-30-seconds (tasks.add) [2016-07-21 16:29:45,121: INFO/MainProcess] Scheduler: Sending due task add-every-30-seconds (tasks.add)
schedule 저장
celery beat 는 기본적으로 celery.beat.PersistentScheduler 를 기본 scheduler 로 사용한다. 이 PersistentScheduler 가 마지막 실행시간들을(last run times) local database file에 기록한다.(default name 이 celerybeat-schedule 이다. shelve 를 참고하자.)그래서 현재 directory 에 대한 write 권한이 있어야 한다. 아니면 다른 곳을 지정해 줘야 한다. 이것은 -s option 으로 지정할 수 있다.
..>celery -A pworker beat -l info --config=celconfig.py -s .\mydb-scedule
djcelery.schedulers.DatabaseScheduler
django 에서 celery 를 사용하도록 해주는 django-celery 에서는 schedule 을 Django database 에 저장할 수 있도록 해주는 djcelery.schedulers.DatabaseScheduler 를 제공한다. 이 녀석을 사용하면 Django Admin 에서 periodic task 를 추가, 삭제, 수정을 할 수 있다.다른 scheduler 는 "-S option" 을 사용하거나 config file 에서 CELERYBEAT_SCHEDULER 를 이용해서 사용할 수 있다.
..>celery -A proj beat -S djcelery.schedulers.DatabaseScheduler
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
결과값 저장
결과값을 저장하는 방법은 대체로 3개정도이다.(memcache, db, message queue)
아래글에서 DB를 사용할때의 장,단점, message queue 를 사용할 때의 장,단점등을 알려준다.
logging
celery worker 에 log 를 찍고 싶은 경우 아래를 참고하자.windows 에서 test
기본적으로 4.0 부터는 windows 에 대한 지원을 하지 않는다. (참고 ) 그러므로 3.1.24 를 이용해야 한다.DB 사용 task
- python - how to setup sqlalchemy session in celery tasks with no global variable - Stack Overflow
- prschmid: Using SQLAlchemy with Celery Tasks
# proj.db.py from sqlalchemy import create_engine engine = create_engine( 'sqlite:///:memory:', convert_unicode=True, pool_recycle=3600, pool_size=10)
from proj.db import engine from sqlalchemy.orm import scoped_session from sqlalchemy.orm import sessionmaker class DBTask(Task): _session = None def after_return(self, *args, **kwargs): if self._session is not None: self._session.remove() @property def session(self): if self._session is None: _, self._session = scoped_session( sessionmaker(autocommit=False, autoflush=False, bind=engine)) return self._session ... @app.task(base=DBTask, bind=True) def do_stuff_with_db(self, conf, some_arg): self.conf = conf thing = self.session.query(Thing).filter_by(arg=some_arg).first()
Django + Celery
ref. 6의 내용을 가지고 정리 해 본다. 여기서 django project 의 이름은 proj 이다.
- pip install django-celery
- proj/proj/celery.py 만들기
- proj/proj/__init__.py 에 import 추가하기
- settings.py 에 INSTALLED_APPS 에 'djcelery' 추가하기.
- djcelery 에 대한 db migrate
- 실행
- RabbitMQ 실행
- Celery worker 실행
- Celery beat 실행
- Django server 실행
pip install django-celery
이녀석은 굳이 설치하지 않아도 된다.(참고)
django-celery 를 설치하면, schedule 을 django db 에 저장해서 사용할 수 있고, backend 를 django db 로 사용할 수 있다. 이런 목적이 없다면 굳이 설치하지 않아도 된다.
..> pip install django-celery
proj/proj/celery.py 만들기
아래와 같이 celery.py 를 만들자.proj/proj/celery.py
from __future__ import absolute_import import os # 이녀석이 없으면 celery.py 와 library 의 celery.py 를 혼동한다. from __future__ import absolute_import # set the default Django settings module for the 'celery' program. # 이 부분으로 celery command line program 이 Django project 의 위치를 알게 된다. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings') from django.conf import settings # noqa app = Celery('proj') # Using a string here means the worker will not have to # pickle the object when using Windows. # 이 부분으로 celery setting 을 해준다. app.config_from_object('django.conf:settings') # 원래 CELERY_IMPORT 에 설정해 주는데, autodiscover_tasks 를 사용하면 # 각 django app 에 tasks.py 로 celery task 들을 만들어 놓으면 알아서 찾는다. app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) # bind=True 를 이용하면, current instance (=self) 를 parameter 로 받을 수 있다. @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
proj/proj/__init__.py 에 import 추가
proj/proj/__init__.pyfrom __future__ import absolute_import # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app # noqa
settings.py 에 INSTALLED_APPS 에 'djcelery' 추가
INSTALLED_APPS( ... 'djcelery', ... )
djcelery 에 대한 db migrate
> python manage.py migrate djcelery
그러면 아래 같은 table 이 생긴다.
위의 db 를 사용한 backend 와 *schedule table 들을 이용하려면 아래처럼 설정해 주면 된다. 아래 설정은 proj/proj/settings.py 에 넣어주면 된다.
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
RabbitMQ 실행
windows 라면 service 에서 실행하면 된다. linux 라면 아래와 같이 하자.> sudo rabbitmq-server -detached
Celery worker 실행
> python manage.py celeryd --verbosity=2 --loglevel=DEBUG
Celery beat 실행
새로운 cmd 창을 하나 띄우고, 아래 처럼 celery beat 을 실행하자.> python manage.py celerybeat --verbosity=2 --loglevel=DEBUG
참고로 아래와 같이 띄울 수도 있다.
...proj>celery -A proj beat -l info
See Also
Reference
- First Steps with Celery — Celery 3.1.23 documentation
- Spoqa Tech Blog | Celery를 이용한 긴 작업 처리
- Celery를 이용한 분산처리 프로세스 작성하기 — Sunhyoup’s Story — Medium
- Celery Periodic Tasks: From Installation to Infinity | Metal Toad
- Periodic Tasks — Celery 3.1.23 documentation
- First steps with Django — Celery 3.1.23 documentation
- How to install Celery on Django and Create a Periodic Task
- Django - Celery - DB 사용 예제
댓글 없음:
댓글 쓰기