Celery 설치 및 실행
celery 를 한 번 사용해 보자.Windows
4.0 부터는 공식적으로 windows 지원을 하지 않는다.(참고)개인적으로 ValueError 때문에 pip install celery==3.1.24 를 설치했다.
python 3.7.3 에서는 celery 3.1.* 은 제대로 동작하지 않았다. celery==4.2.0 은 잘 동작한다. (참고)
RabbitMQ 설치 및 실행 :
- download : https://www.rabbitmq.com/download.html
- download 를 하면 자동으로 service 에 등록되고 실행된다.
- celery 설치
- pip install celery
- 예제 download 및 .bat 실행 :
- github : https://github.com/i5on9i/celery_dedicated_module
- Practices : 실제 어떻게 code 를 작성해서 사용하는지에 대한 내용
- unittest
- 이 소스 를 참고하자. python -m unittest discover test 를 하면 실행할 수 있다.
Linux
- RabbitMQ 설치 및 실행
- sudo apt-get install rabbitmq-server
- invoke-rc.d rabbitmq-server stop/start/etc
- celery 설치
- pip install celery
동작
celery beat 이 주기적(예제에서는 30초마다)으로 특정 task 를 실행 시키라고 RabbitMQ queue 에 message 를 보낸다. 그러면 celery worker 가 RabbitMQ 를 바라보고 있다고 queue 에 message 가 있으면 가져와서 task 를 실행시킨다.참고로, celery 가 run 하고 있어도, RabbitMQ 를 restart 할 수 있다. celery 가 RabbitMQ 를 주기적으로 connect 하기 때문에, RabbitMQ 가 다시 실행되면, 알아서 celery 가 RabbitMQ 에 connect 한다.
여러개의 worker 를 실행할 때
여러개의 worker 를 실행할 때는 각 worker 마다 다른 queue 를 둬야 한다. 이것을 자동으로 해줄 수도 있고, 직접 설정할 수도 있다. 자세한 내용은 아래를 참고하자.자동으로 설정하는 방법
- CELERY_ROUTES 로 task 가 어떤 queue 를 사용할 지 정해주고,
- 그 queue 를 사용하는 worker 를 띄우면 된다.
CELERY_ROUTES = {'pworker.tasks.import_feed': {'queue': 'feeds'}}
"feeds" 라는 이름의 queue 를 처리하는 worker 를 실행할 때
...> celery -A proj worker -Q feeds
예제 활용
위의 예제로 2개의 worker 와 1개의 beat 를 띄운다고 하면, 1개의 cmd 창을 추가로 뛰우고, 아래 명령어로 worker 를 추가해 주면 된다....> celery -A proj worker -Q feeds
여기서 feeds queue 로 message 를 보내기 위해서 CELERYBEAT_SCHEDULE 에 schedule 하나만 추가해 주면 된다.
CELERYBEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'pworker.tasks.add',
'schedule': timedelta(seconds=30),
'args': (16, 16)
},
'add-every-20-seconds': {
'task': 'pworker.tasks.import_feed',
'schedule': timedelta(seconds=20),
'args': (16, 16)
},
}
Daemon 으로 실행
updated, 2020-08-08
sudo pip3 install celery==4.4.2
4.4.4 부터는 이유는 모르지만, init.d 로 celery 가 실행되지 않는다. 아마도 이제 init.d 를 이용해서 띄우는 것을 권장하지 않아서 별로 신경을 안쓰는 듯 하다.
아래 systemd 를 활용하는 법이 있다.
celery worker
linux 에서 background process 로 실행하는 것은 어렵지 않다. multi 를 사용하면 된다. 또는 celery worker --detach 로도 가능하다. 둘다 기본적으로 3개의 process 를 띄운다.django_proj$ celery multi start worker1 -A proj -Q feeds -l info --pidfile="./celery_worker.pid"
django_proj$ celery multi restart worker1 -A proj -Q feeds -l info --pidfile="./celery_worker.pid"
django_proj$ celery multi stopwait worker1 -A proj -Q feeds -l info --pidfile="./celery_worker.pid"
- celery.bin.multi — Celery 3.1.23 documentation : celery multi command 관련 examples
init service 등록
- 그 외에도 init service 로 등록할 수도 있는데, 이부분은 여기를 참고하자.
- 여기 서 celeryd file 을 받고, 이것을 /etc/init.d/ 에 넣어놓으면 된다. /etc/default/celeryd 가 configuration file 이다.
- /etc/default/celeryd 예제파일
celerybeat
celerybeat multi start ... --beat 로도 가능하다. 그런데 여러 process 가 뜬다. 이부분은 조정을 해야할 듯 한데, concurrency option 으로는 process 가 줄어들지 않는다. 이것 multi 가 기본적으로 여러개의 process 를 실행하기 위한 것이기 때문인 듯 하다.(참고.)
그래서 여기서는 init.d 에 등록하는 것을 이야기한다.
그래서 여기서는 init.d 에 등록하는 것을 이야기한다.
- celerybeat 을 /etc/init.d 에 copy (https://github.com/celery/celery/tree/master/extra/generic-init.d)
- celerybeat config file 을 /etc/default/celerybeat 에 copy
- sudo /etc/init.d/celerybeat {start|stop|restart}
/etc/init.d/celerybeat
configuration file
configuration file 을 하나 만들어야 한다. 경로는 아래 2가지중 하나를 택하면 된다. root 계정을 이용하자.- /etc/default/celerybeat
- /etc/default/celeryd : celerybeat 용 config 를 따로 두지 않고, celeryd 의 config 와 함께 사용할 수 있다.
BASE='/home/myuser' # For Django export DJANGO_SETTINGS_MODULE="settings" CELERYD_CHDIR="${BASE}/app/proj" # Absolute or relative path to the 'celery' command: CELERY_BIN="${BASE}/env/proj/bin/celery" # App instance to use # comment out this line if you don't use an app CELERY_APP="proj" # -A 에서 설정해준 proj 를 적으면 된다. # or fully qualified: #CELERY_APP="proj.tasks:app" # Where to chdir at start. CELERYBEAT_CHDIR="${BASE}/app/proj" # User to run beat as. Default is current user. CELERYBEAT_USER="myuser" CELERYBEAT_GROUP="myuser" CELERYBEAT_PID_FILE="${BASE}/var/run/celeryd.pid" # default is /var/run/celeryd.pid. CELERYBEAT_LOG_FILE="${BASE}/var/log/celery/proj/celeryd.log" # Extra arguments to celerybeat CELERYBEAT_OPTS="--schedule=${BASE}/var/run/celery/celerybeat-schedule"
위 config file 에서 만들어져 있지 않은 directory 가 있으면 새롭게 만들어준 후에 실행을 해야 한다.
위처럼 Django 를 이용하는 경우에 settings 에 그 밖의 설정을 넣어서 사용하면 된다.
systemd에서 사용
- Daemonization — Celery 4.2.0 documentation
- https://github.com/celery/celery/issues/4304 : celery beat 을 systemd 로
Celery email 설정
celery 에서 error 발생시 email 을 보내도록 설정할 수 있다. 이 설정도 django settings.py 안에 넣으면 된다.- Configuration and defaults — Celery 3.1.23 documentation : configuration 예제를 볼 수 있다.
위의 configuration 예제에서 django settings 에 이미 설정이 되어 있는 녀석은 다시 설정하지 않아도 된다.
Celery email 설정 - app.task
app.task 에 error_whitelist=[] 를 arg 로 줘야 한다.(참고 : Error emails never sent · Issue #931 · celery/celery) 그렇지 않으면 error 가 발생해도 email 이 날아가지 않았다.
@app.task(error_whitelist=[]) def testEmail(): raise Exception('namh test')
실행
실행은 아래 command 를 이용하면 된다.sudo /etc/init.d/celerybeat {start|stop|restart}
myuser$ sudo /etc/init.d/celerybeat start celery init v10.1. Using configuration: , /etc/default/celerybeat Starting celerybeat... myuser$ ps aux | grep cel rootbel+ 15316 0.3 2.0 83960 20884 ? S 14:58 0:00 /home/myuser/env/beluga/bin/python /home/myuser/env/beluga/bin/celery beat --schedule=/home/myuser/var/run/celery/celerybeat-schedule -f /home/myuser/var/log/celery/beluga/celeryd.log -l INFO --workdir=/home/myuser/app/beluga --detach --pidfile=/home/myuser/var/run/celeryd.pid
자세한 내용은 여기를 참고하자.
일정 시간동안 stop 후 다시 실행할 때
celery beat 를 한참 꺼 놨다가 다시 시작할 때 주의할 점은 celery_beat 가 이전에 해야했던 작업(task) 에 대한 signal 를 보낸다는 것이다.celery beat 가 celerybeat-schedule 에 마지막에 어느부분까지 수행했는지를 저장해 놓기 때문에
- --schedule=${BASE}/var/run/celery/celerybeat-schedule
그러니 celery beat 가 멈춰있던 시간에 대한 일을 수행하길 원하지 않는다면, 다시 시작하기 전에 celerybeat-schedule 파일을 지우고 다시 시작하자.
Django + Celery
Django 와 함께 celery 를 사용해 보자. 일단 간단하게 위의 예제에서 celconfig.py 를 config 로 사용했지만 이것을 djanngo settings.py 를 이용하도록 해보자.절차
순서는 아래와 같다. django 의 project 이름을 'proj' 라고 하자.- pip install celery
- proj/proj/celery.py 추가 : pworker 대신에 proj 로 수정
- celconfig.py 내용을 proj/settings module 에 copy
- proj/myapp/tasks.py 추가
- 실행
- RabbitMQ 실행
- proj> celery -A proj worker -l info
- proj> celery -A proj beat -l info
pip install celery
celery 를 설치하자.proj/proj/celery.py 추가
"celery -A proj worker" 를 실행하게 되면 celery.py 를 주기적으로 수행하게 된다.(__main__ 을 실행하지는 않느다.) 그래서 proj/proj/celery.py 를 넣어줘야 한다.# coding=utf-8 from __future__ import absolute_import from celery import Celery # djano 에서 쓰일 setting 지정 아래의 경우 proj/settings.py 를 사용한다는 뜻 import os os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings') from django.conf import settings # noqa # app = Celery('proj', # broker='amqp://guest@localhost//', # backend='amqp://', # include=['proj.tasks']) app = Celery('proj') # Optional configuration, see the application user guide.
# django.conf:settings 로 django setting 을 celery 의 config 로 불러온다. app.config_from_object('django.conf:settings') # INSTALLED_APPS 안에 있는 tasks.py 들을 알아서 import 해 준다. app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) app.conf.update( CELERY_TASK_RESULT_EXPIRES=3600, ) if __name__ == '__main__': app.start()
command line 에서 settings 설정
command line 에서 settings 를 결정하려면 아래처럼 setting 변수를 설정해 주면 된다.windows
SET DJANGO_SETTINGS_MODULE=proj.settings.local
linux
export DJANGO_SETTINGS_MODULE=proj.settings.remote
proj/myapp/tasks.py 추가
위에서 "app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) " 를 사용하기 때문에 tasks.py 가 INSTALLED_APPS 아래 있어야 한다.여기서는 myapp 이라는 application 에 넣는다고 가정한다. 주의할 점은 경로이다. app 의 path 를 알맞게 변경해 주자.
proj/myapp/tasks.py
# coding=utf-8
from __future__ import absolute_import
from proj.celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
celconfig.py 내용을 proj/settings module 에 copy
위에서 지정한 "proj.settings" (proj/proj/settings.py) 에 celconfig.py 내용을 옮겨 놓자. 이때 task 등의 path 만 제대로 설정하자.그리고 CELERY_IMPORTS 부분만 삭제하자.
# #### Celery CONFIGURATION ## Broker settings. BROKER_URL = 'amqp://guest:guest@localhost//' # List of modules to import when celery starts. # 아래 부분은 app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) 때문에 필요없다. # CELERY_IMPORTS = ('myapp.tasks', ) ## Using the database to store task state and results. CELERY_RESULT_BACKEND = 'amqp://' CELERY_ANNOTATIONS = {'myapp.tasks.add': {'rate_limit': '10/s'}} from datetime import timedelta CELERYBEAT_SCHEDULE = { 'add-every-30-seconds': { 'task': 'myapp.tasks.add', 'schedule': timedelta(seconds=30), 'args': (16, 16) }, } # #### END Celery CONFIGURATION
crontab
위의 CELERYBEAT_SCHEDULE 에서 timedelta 대신에 crontab 도 사용할 수 있다. 이 때 주의할 점은 TIME_ZONE 이다.default 값은 UTC 로 되어 있다. 특별히 지정하고 싶으면 아래처럼 하면 된다.
CELERY_TIMEZONE = 'Europe/London'
만약 Django 를 사용하면, 기본적으로 Django 의 TIME_ZONE 이 사용된다. 하지만, CELERY_TIMEZONE 을 이용해서 celery 의 timezone 을 따로 설정할 수 있다.
TIME_ZONE 이 재설정되고 나서 celery 가 자동으로 TIME_ZONE 을 변경하지 않는다. 그때는 아래 command 를 사용하면 된다.(또는 재시작을 해도 된다.)
$ python manage.py shell >>> from djcelery.models import PeriodicTask >>> PeriodicTask.objects.update(last_run_at=None)
from celery.schedules import crontab CELERYBEAT_SCHEDULE = { # celery's default Time Zone is UTC # :see http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html # Executes every weekday at 15:30(6:30+0:00) 'add-every-day-after-martket-close': { 'task': 'mfactors.tasks.kospiK200WdateUpdater', 'schedule': crontab(hour=6, minute=30, day_of_week='mon,tue,wed,thu,fri'), # 'args': (16, 16), }, }
실행
project folder 에서 아래처럼 실행하면 된다.worker
proj> celery -A proj worker -l info
beat
proj> celery -A proj beat -l info
django-celery
django-celery 를 설치하면, schedule 을 django db 에 저장해서 사용할 수 있고, backend 를 django db 로 사용할 수 있다. 이런 목적이 없다면 굳이 설치하지 않아도 된다.(참고)
..> pip install django-celery
django-celery 를 설치해서 사용한 예는 ref. 1에서 확인할 수 있다
덕분에 환경설정 잘 했습니다. 감사합니다.
답글삭제