[컴] celery 설치 및 실행 예제 / django + celery

django 에서 celery 사용하기 / django 와 celery 사용 / 장고 셀러리



Celery 설치 및 실행

celery 를 한 번 사용해 보자.

Windows

4.0 부터는 공식적으로 windows 지원을 하지 않는다.(참고)
개인적으로 ValueError 때문에 pip install celery==3.1.24 를 설치했다.

RabbitMQ 설치 및 실행 :
    1. download : https://www.rabbitmq.com/download.html
    2. download 를 하면 자동으로 service 에 등록되고 실행된다.
  1. celery 설치
    1. pip install celery
    2. 예제 download 및 .bat 실행 :
      1. github : https://github.com/i5on9i/celery_dedicated_module

Linux

  1. RabbitMQ 설치 및 실행
    1. sudo apt-get install rabbitmq-server
    2. invoke-rc.d rabbitmq-server stop/start/etc
  2. celery 설치
    1. pip install celery



동작

celery beat 이 주기적(예제에서는 30초마다)으로 특정 task 를 실행 시키라고 RabbitMQ queue 에 message 를 보낸다. 그러면 celery worker 가 RabbitMQ 를 바라보고 있다고 queue 에 message 가 있으면 가져와서 task 를 실행시킨다.

참고로, celery 가 run 하고 있어도, RabbitMQ 를 restart 할 수 있다. celery 가 RabbitMQ 를 주기적으로 connect 하기 때문에, RabbitMQ 가 다시 실행되면, 알아서 celery 가 RabbitMQ 에 connect 한다.











Django + Celery

Django 와 함께 celery 를 사용해 보자. 일단 간단하게 위의 예제에서 celconfig.py 를 config 로 사용했지만 이것을 djanngo settings.py 를 이용하도록 해보자.

절차

순서는 아래와 같다. django 의 project 이름을 'proj' 라고 하자.
  1. pip install celery
  2. proj/proj/celery.py 추가 : pworker 대신에 proj 로 수정
  3. celconfig.py 내용을 proj/settings module 에  copy
  4. proj/myapp/tasks.py 추가
  5. 실행
    1. RabbitMQ 실행
    2. proj> celery -A proj worker -l info
    3. proj> celery -A proj beat -l info


pip install celery

celery 를 설치하자.

proj/proj/celery.py 추가


"celery -A proj worker" 를 실행하게 되면 celery.py 를 주기적으로 수행하게 된다.(__main__ 을 실행하지는 않느다.) 그래서 proj/proj/celery.py 를 넣어줘야 한다.

# coding=utf-8

from __future__ import absolute_import

from celery import Celery


# djano 에서 쓰일 setting 지정 아래의 경우 proj/settings.py 를 사용한다는 뜻
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
from django.conf import settings  # noqa

# app = Celery('proj',
#              broker='amqp://guest@localhost//',
#              backend='amqp://',
#              include=['proj.tasks'])

app = Celery('proj')

# Optional configuration, see the application user guide.
# django.conf:settings 로 django setting 을 celery 의 config 로 불러온다.
app.config_from_object('django.conf:settings')
# INSTALLED_APPS 안에 있는 tasks.py 들을 알아서 import 해 준다.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
)

if __name__ == '__main__':
    app.start()


command line 에서 settings 설정

command line 에서 settings 를 결정하려면 아래처럼 setting 변수를 설정해 주면 된다.

windows
SET DJANGO_SETTINGS_MODULE=proj.settings.local

linux
export DJANGO_SETTINGS_MODULE=proj.settings.remote



proj/myapp/tasks.py 추가

위에서 "app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) " 를 사용하기 때문에 tasks.py 가 INSTALLED_APPS 아래 있어야 한다.

여기서는 myapp 이라는 application 에 넣는다고 가정한다. 주의할 점은 경로이다. app 의 path 를 알맞게 변경해 주자.

proj/myapp/tasks.py
# coding=utf-8

from __future__ import absolute_import
from proj.celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)



celconfig.py 내용을 proj/settings module 에  copy

위에서 지정한 "proj.settings" (proj/proj/settings.py) 에 celconfig.py 내용을 옮겨 놓자. 이때 task 등의 path 만 제대로 설정하자.

그리고 CELERY_IMPORTS 부분만 삭제하자.


# #### Celery CONFIGURATION
## Broker settings.
BROKER_URL = 'amqp://guest:guest@localhost//'

# List of modules to import when celery starts.
# 아래 부분은 app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) 때문에 필요없다.
# CELERY_IMPORTS = ('myapp.tasks', )

## Using the database to store task state and results.
CELERY_RESULT_BACKEND = 'amqp://'

CELERY_ANNOTATIONS = {'myapp.tasks.add': {'rate_limit': '10/s'}}

from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'myapp.tasks.add',
        'schedule': timedelta(seconds=30),
        'args': (16, 16)
    },
}
# #### END Celery CONFIGURATION


crontab

위의 CELERYBEAT_SCHEDULE 에서 timedelta 대신에 crontab 도 사용할 수 있다. 이 때 주의할 점은 TIME_ZONE 이다.
default 값은 UTC 로 되어 있다. 특별히 지정하고 싶으면 아래처럼 하면 된다.
CELERY_TIMEZONE = 'Europe/London'

만약 Django 를 사용하면, 기본적으로 Django 의 TIME_ZONE 이 사용된다. 하지만, CELERY_TIMEZONE 을 이용해서 celery 의 timezone 을 따로 설정할 수 있다.

TIME_ZONE 이 재설정되고 나서 celery 가 자동으로 TIME_ZONE 을 변경하지 않는다. 그때는 아래 command 를 사용하면 된다.(또는 재시작을 해도 된다.)
$ python manage.py shell
>>> from djcelery.models import PeriodicTask
>>> PeriodicTask.objects.update(last_run_at=None)


from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
    # celery's default Time Zone is UTC
    # :see http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html
    # Executes every weekday at 15:30(6:30+0:00)
    'add-every-day-after-martket-close': {
        'task': 'mfactors.tasks.kospiK200WdateUpdater',
        'schedule': crontab(hour=6, minute=30, day_of_week='mon,tue,wed,thu,fri'),
        # 'args': (16, 16),
    },
}






실행

project folder 에서 아래처럼 실행하면 된다.

worker
proj> celery -A proj worker -l info

beat
proj> celery -A proj beat -l info



django-celery

django-celery 를 설치하면, schedule 을 django db 에 저장해서 사용할 수 있고, backend 를 django db 로 사용할 수 있다. 이런 목적이 없다면 굳이 설치하지 않아도 된다.(참고

..> pip install django-celery

django-celery 를 설치해서 사용한 예는 ref. 1에서 확인할 수 있다





여러개의 worker 를 실행할 때

여러개의 worker 를 실행할 때는 각 worker 마다 다른 queue 를 둬야 한다. 이것을 자동으로 해줄 수도 있고, 직접 설정할 수도 있다. 자세한 내용은 아래를 참고하자.


자동으로 설정하는 방법

  1. CELERY_ROUTES 로 task 가 어떤 queue 를 사용할 지 정해주고,
  2. 그 queue 를 사용하는 worker 를 띄우면 된다.

import_feed 라는 task 는 "feeds" 라는 queue 에서 message 를 받아서 처리하도록 한다.
CELERY_ROUTES = {'pworker.tasks.import_feed': {'queue': 'feeds'}}


"feeds" 라는 이름의 queue 를 처리하는 worker 를 실행할 때
...> celery -A proj worker -Q feeds


예제 활용

위의 예제로 2개의 worker 와 1개의 beat 를 띄운다고 하면, 1개의 cmd 창을 추가로 뛰우고, 아래 명령어로 worker 를 추가해 주면 된다.
...> celery -A proj worker -Q feeds

여기서 feeds queue 로 message 를 보내기 위해서 CELERYBEAT_SCHEDULE 에 schedule 하나만 추가해 주면 된다.
CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'pworker.tasks.add',
        'schedule': timedelta(seconds=30),
        'args': (16, 16)
    },
    'add-every-20-seconds': {
        'task': 'pworker.tasks.import_feed',
        'schedule': timedelta(seconds=20),
        'args': (16, 16)
    },
}



Daemon 으로 실행

celery worker

linux 에서 background process 로 실행하는 것은 어렵지 않다. multi 를 사용하면 된다. 또는 celery worker --detach 로도 가능하다. 둘다 기본적으로 3개의 process 를 띄운다.

django_proj$ celery multi start worker1 -A proj -Q feeds -l info --pidfile="./celery_worker.pid"

django_proj$ celery multi restart worker1 -A proj -Q feeds -l info --pidfile="./celery_worker.pid"

django_proj$ celery multi stopwait worker1 -A proj -Q feeds -l info --pidfile="./celery_worker.pid"

그 외에도 init service 로 등록할 수도 있는데, 이부분은 여기를 참고하자.


celerybeat

celerybeat multi start ... --beat 로도 가능하다. 그런데 여러 process 가 뜬다. 이부분은 조정을 해야할 듯 한데, concurrency option 으로는 process 가 줄어들지 않는다. 이것 multi 가 기본적으로 여러개의 process 를 실행하기 위한 것이기 때문인 듯 하다.(참고.)

그래서 여기서는 init.d 에 등록하는 것을 이야기한다.
  1. celerybeat 을 /etc/init.d 에 copy (https://github.com/celery/celery/tree/master/extra/generic-init.d)
  2. celerybeat config file 을 /etc/default/celerybeat 에 copy
  3. sudo /etc/init.d/celerybeat {start|stop|restart}

/etc/init.d/celerybeat
celerybeat 를 /etc/init.d 에 넣어놓자. root 계정을 이용하면 된다.


configuration file
configuration file 을 하나 만들어야 한다. 경로는 아래 2가지중 하나를 택하면 된다. root 계정을 이용하자.
  • /etc/default/celerybeat
  • /etc/default/celeryd


BASE='/home/myuser'

# For Django
export DJANGO_SETTINGS_MODULE="settings"
CELERYD_CHDIR="${BASE}/app/proj"

# Absolute or relative path to the 'celery' command:
CELERY_BIN="${BASE}/env/proj/bin/celery"
 
# App instance to use
# comment out this line if you don't use an app
CELERY_APP="proj"  # -A 에서 설정해준 proj 를 적으면 된다.
# or fully qualified:
#CELERY_APP="proj.tasks:app"

# Where to chdir at start.
CELERYBEAT_CHDIR="${BASE}/app/proj"

# User to run beat as. Default is current user.
CELERYBEAT_USER="myuser"
CELERYBEAT_GROUP="myuser"

CELERYBEAT_PID_FILE="${BASE}/var/run/celeryd.pid" # default is /var/run/celeryd.pid.
CELERYBEAT_LOG_FILE="${BASE}/var/log/celery/proj/celeryd.log"

# Extra arguments to celerybeat
CELERYBEAT_OPTS="--schedule=${BASE}/var/run/celery/celerybeat-schedule"

위 config file 에서 만들어져 있지 않은 directory 가 있으면 새롭게 만들어준 후에 실행을 해야 한다.

위처럼 Django 를 이용하는 경우에 settings 에 그 밖의 설정을 넣어서 사용하면 된다.

systemd에서 사용

Celery email 설정
celery 에서 error 발생시 email 을 보내도록 설정할 수 있다. 이 설정도 django settings.py 안에 넣으면 된다.
위의 configuration 예제에서 django settings 에 이미 설정이 되어 있는 녀석은 다시 설정하지 않아도 된다.

Celery email 설정 - app.task
app.task 에 error_whitelist=[] 를 arg 로 줘야 한다.(참고 : Error emails never sent · Issue #931 · celery/celery) 그렇지 않으면 error 가 발생해도 email 이 날아가지 않았다.
@app.task(error_whitelist=[])
def testEmail():
        raise Exception('namh test')


실행
실행은 아래 command 를 이용하면 된다.
sudo /etc/init.d/celerybeat {start|stop|restart}

myuser$ sudo /etc/init.d/celerybeat start
celery init v10.1.
Using configuration: , /etc/default/celerybeat
Starting celerybeat...

myuser$ ps aux | grep cel
rootbel+ 15316  0.3  2.0  83960 20884 ?        S    14:58   0:00 /home/myuser/env/beluga/bin/python /home/myuser/env/beluga/bin/celery beat --schedule=/home/myuser/var/run/celery/celerybeat-schedule -f /home/myuser/var/log/celery/beluga/celeryd.log -l INFO --workdir=/home/myuser/app/beluga --detach --pidfile=/home/myuser/var/run/celeryd.pid


자세한 내용은 여기를 참고하자.



일정 시간동안 stop 후 다시 실행할 때
celery beat 를 한참 꺼 놨다가 다시 시작할 때 주의할 점은 celery_beat 가 이전에 해야했던 작업(task) 에 대한 signal 를 보낸다는 것이다.

celery beat 가 celerybeat-schedule 에 마지막에 어느부분까지 수행했는지를 저장해 놓기 때문에

  • --schedule=${BASE}/var/run/celery/celerybeat-schedule

한동안 꺼져있다가 다시 celery beat 를 실행하면, 마지막 수행시점부터 현재시점까지 해야했던 일들을 하게 된다.

그러니 celery beat 가 멈춰있던 시간에 대한 일을 수행하길 원하지 않는다면, 다시 시작하기 전에 celerybeat-schedule 파일을 지우고 다시 시작하자.



See Also




Reference

  1. How to install Celery on Django and Create a Periodic Task

댓글 1개:

  1. 덕분에 환경설정 잘 했습니다. 감사합니다.

    답글삭제