[컴] Celery 동작과 사용법

셀러리 / Celery 사용법


Celery

Celery 는 분산메시지전달(distributed message passing) 을 기반으로 한 비동기 작업 큐(asynchronous task queue) 이다.[ref. 1]

이 녀석은 하나의 일만 하는 서버로 보면 된다. 이렇게 하나의 일을 하는 thread 나 process 를 worker(일꾼) 라고 부르는데 이런 worker 의 하나라고 보면 된다.

이 일꾼은 일꾼으로서 완전한 동작을 하기 위해 아래 2가지 동작이 가능하다.
  1. 일을 전달 받는 법
  2. 어떤 일을 할 것인지 정의

일을 전달 받는 법

이 celery 가 일을 전달받는 방법은 queue 를 바라보는 것이다. 다만 queue 는 celery 가 가지고 있지 않다. RebbitMQ 등의 queue 를 따로 실행시켜서 사용해야 한다. Celery 에서는 이 queue 를 Broker 라고 부른다. 우리도 일을 중간에서 연결해주는 녀석을 broker(브로커) 라고 부르니 대충 의미를 짐작할 수 있을 것이다.


그러면 이 broker 를 Celery 에게 알려주긴 해야 한다. 그래서 Celery 를 시작할 때 parameter 로 알려준다. 아니면 config file 에서 BROKER_URL 을 설정할 수도 있다. 참고로 default 값이 'amqp://' 이다.(참고)
app = Celery('tasks', broker='amqp://guest@localhost//')

어떤 일을 할 것인가.

이것은 Celery 를 이용한 coding 을 해주면 된다. RebbitMQ 를 먼저 설치하고 실행하자.(설치하면 기본적으로 실행된다.)

pip install Celery

#test.py
from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y


c:..> python
>>> from test import add
>>> add.delay(3,4)
<AsyncResult: 716c4b58-e17b-4cad-8378-842c259c15cd>
>>> r = add.delay(3,4)
>>> r
<AsyncResult: 131dff4a-e54e-43cd-a415-00cf69a006f5>

return value

Celery 로 수행한 일에 대한 return value 을 하고 싶으면, 이녀석의 return 결과를 어딘가에 잠깐 저장해 놔야 한다. 왜냐하면 이녀석은 asynchronous 하게 일을 처리하기 때문에 계속 자신의 return value 를 가지고 있지 않는다.

config file 

config 관련 자세한 설명은 여기에서 확인할 수 있다.

여기서는 config 사용법을 볼 것이다. 일단 아래처럼 celeryconfig.py 를 만들자.

# celeryconfig.py
## Broker settings.
BROKER_URL = 'amqp://guest:guest@localhost:5672//'

# List of modules to import when celery starts.
CELERY_IMPORTS = ('myapp.tasks', )

## Using the database to store task state and results.
CELERY_RESULT_BACKEND = 'db+sqlite:///results.db'

CELERY_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}

이것을 Celery instance 에 적용하기 위해서는 config_from_object() 를 이용하면 된다.
from celery import Celery
 
celery = Celery('tasks')
celery.config_from_object('celeryconfig')

command line

command line 에서 실행할 때는 --config option 을 사용하면 된다.
>celery -A pworker worker -l info --config=celconfig.py

dedicated module

celery 부분을 하나의 module 로 작성할 수 있다. 여기서 자세한 사항을 확인할 수 있다. 아래 github 에 example 을 올려놨다.


주의할 점

celery.py 가 있어야만 한다. 기본적으로 command line 에서 celery 를 실행하면 module 의 celery 를 import 하게 된다.

Scheduling[ref. 5]

주기적인 작업을 위해 celery beat 를 제공한다. celery beat 이 scheduler 이다.
CELERYBEAT_SCHEDULE 를 set 해서 주기를 설정할 수 있다. 이 설정을 DB 에 놓을 수도 있다고 한다.

celery beat 은 한개만 띄우라고 한다. scheduler 가 여러개라면 동기화의 문제등이 생기게 된다.

그리고 혹시 착각할 수 있어서 언급하면, celery beat 을 띄우고, celery worker 도 따로 띄워줘야 한다. 즉, celery beat 는 주기적으로 message 를 날리는 역할만 한다.

time zone

기본적으로 UTC time zone 을 사용한다. 이것은  CELERY_TIMEZONE 으로 설정을 변경할 수 있다. Django 를 사용하는 경우라면, Django 의 TIME_ZONE 설정이 사용된다. Celery 에 다른 time zone 을 적용하고 싶으면 CELERY_TIMEZONE 을 설정하면 된다.

실행

command line 에서 실행할 때는 option 에 worker 대신에 beat 를 주면 된다.[ref. 5]
>celery -A pworker beat -l info --config=celconfig.py
(vir_mickey) c:\...\src>celery -A pworker beat -l info --config=celconfig.py
celery beat v3.1.23 (Cipater) is starting.
__    -    ... __   -        _
Configuration ->
    . broker -> amqp://guest:**@localhost:5672//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> now (0s)
[2016-07-21 16:26:39,206: INFO/MainProcess] beat: Starting...
[2016-07-21 16:27:09,755: INFO/MainProcess] Scheduler: Sending due task add-every-30-seconds (tasks.add)
[2016-07-21 16:27:45,121: INFO/MainProcess] Scheduler: Sending due task add-every-30-seconds (tasks.add)
[2016-07-21 16:28:15,121: INFO/MainProcess] Scheduler: Sending due task add-every-30-seconds (tasks.add)
[2016-07-21 16:28:45,121: INFO/MainProcess] Scheduler: Sending due task add-every-30-seconds (tasks.add)
[2016-07-21 16:29:15,121: INFO/MainProcess] Scheduler: Sending due task add-every-30-seconds (tasks.add)
[2016-07-21 16:29:45,121: INFO/MainProcess] Scheduler: Sending due task add-every-30-seconds (tasks.add)

schedule 저장

celery beat 는 기본적으로 celery.beat.PersistentScheduler 를 기본 scheduler 로 사용한다. 이 PersistentScheduler 가 마지막 실행시간들을(last run times) local database file에 기록한다.(default name 이 celerybeat-schedule 이다. shelve 를 참고하자.)

그래서 현재 directory 에 대한 write 권한이 있어야 한다. 아니면 다른 곳을 지정해 줘야 한다. 이것은 -s option 으로 지정할 수 있다.
..>celery -A pworker beat -l info --config=celconfig.py -s .\mydb-scedule

djcelery.schedulers.DatabaseScheduler

django 에서 celery 를 사용하도록 해주는 django-celery 에서는 schedule 을 Django database 에 저장할 수 있도록 해주는 djcelery.schedulers.DatabaseScheduler 를 제공한다. 이 녀석을 사용하면 Django Admin 에서 periodic task 를 추가, 삭제, 수정을 할 수 있다.

다른 scheduler 는 "-S option" 을 사용하거나 config file 에서 CELERYBEAT_SCHEDULER 를 이용해서 사용할 수 있다.
..>celery -A proj beat -S djcelery.schedulers.DatabaseScheduler
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

결과값 저장

결과값을 저장하는 방법은 대체로 3개정도이다.(memcache, db, message queue)
아래글에서 DB를 사용할때의 장,단점, message queue 를 사용할 때의 장,단점등을 알려준다.

logging

celery worker 에 log 를 찍고 싶은 경우 아래를 참고하자.

windows 에서 test

기본적으로 4.0 부터는 windows 에 대한 지원을 하지 않는다. (참고 ) 그러므로 3.1.24 를 이용해야 한다.

DB 사용 task

  1. python - how to setup sqlalchemy session in celery tasks with no global variable - Stack Overflow
  2. prschmid: Using SQLAlchemy with Celery Tasks

# proj.db.py

from sqlalchemy import create_engine

engine = create_engine(
    'sqlite:///:memory:', convert_unicode=True,
    pool_recycle=3600, pool_size=10)

from proj.db import engine

from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker


class DBTask(Task):
    _session = None

    def after_return(self, *args, **kwargs):
        if self._session is not None:
            self._session.remove()

    @property
    def session(self):
        if self._session is None:
            _, self._session = scoped_session(
                sessionmaker(autocommit=False, autoflush=False, bind=engine))

        return self._session

...

@app.task(base=DBTask, bind=True)
def do_stuff_with_db(self, conf, some_arg):
    self.conf = conf
    thing = self.session.query(Thing).filter_by(arg=some_arg).first()




Django + Celery


ref. 6의 내용을 가지고 정리 해 본다. 여기서 django project 의 이름은 proj 이다.
  1. pip install django-celery
  2. proj/proj/celery.py 만들기
  3. proj/proj/__init__.py 에 import 추가하기
  4. settings.py 에 INSTALLED_APPS 에 'djcelery' 추가하기.
  5. djcelery 에 대한 db migrate
  6. 실행
    1. RabbitMQ 실행
    2. Celery worker 실행
    3. Celery beat 실행
    4. Django server 실행


pip install django-celery

이녀석은 굳이 설치하지 않아도 된다.(참고

django-celery 를 설치하면, schedule 을 django db 에 저장해서 사용할 수 있고, backend 를 django db 로 사용할 수 있다. 이런 목적이 없다면 굳이 설치하지 않아도 된다.

..> pip install django-celery


proj/proj/celery.py 만들기

아래와 같이 celery.py 를 만들자.

proj/proj/celery.py
from __future__ import absolute_import

import os

# 이녀석이 없으면 celery.py 와 library 의 celery.py 를 혼동한다.
from __future__ import absolute_import

# set the default Django settings module for the 'celery' program.
# 이 부분으로 celery command line program 이 Django project 의 위치를 알게 된다.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

from django.conf import settings  # noqa

app = Celery('proj')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
# 이 부분으로 celery setting 을 해준다.
app.config_from_object('django.conf:settings')
# 원래 CELERY_IMPORT 에 설정해 주는데, autodiscover_tasks 를 사용하면 
# 각 django app 에 tasks.py 로 celery task 들을 만들어 놓으면 알아서 찾는다.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

# bind=True 를 이용하면, current instance (=self) 를 parameter 로 받을 수 있다.
@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))


proj/proj/__init__.py 에 import 추가

proj/proj/__init__.py
from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app  # noqa



settings.py 에 INSTALLED_APPS 에 'djcelery' 추가

INSTALLED_APPS(
    ...
    'djcelery',
    ...
)


djcelery 에 대한 db migrate

> python manage.py migrate djcelery

그러면 아래 같은 table 이 생긴다.

위의 db 를 사용한 backend 와 *schedule table 들을 이용하려면 아래처럼 설정해 주면 된다. 아래 설정은 proj/proj/settings.py 에 넣어주면 된다.
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'


RabbitMQ 실행

windows 라면 service 에서 실행하면 된다. linux 라면 아래와 같이 하자.
> sudo rabbitmq-server -detached


Celery worker 실행

> python manage.py celeryd --verbosity=2 --loglevel=DEBUG



Celery beat 실행

새로운 cmd 창을 하나 띄우고, 아래 처럼 celery beat 을 실행하자.

> python manage.py celerybeat --verbosity=2 --loglevel=DEBUG

참고로 아래와 같이  띄울 수도 있다.
...proj>celery -A proj beat -l info




See Also

  1. celery beat 에서 schedule task 를 dynamically 추가/삭제
  2. [컴] celery 설치 및 실행 예제 / django + celery

Reference

  1. First Steps with Celery — Celery 3.1.23 documentation
  2. Spoqa Tech Blog | Celery를 이용한 긴 작업 처리
  3. Celery를 이용한 분산처리 프로세스 작성하기 — Sunhyoup’s Story — Medium
  4. Celery Periodic Tasks: From Installation to Infinity | Metal Toad
  5. Periodic Tasks — Celery 3.1.23 documentation
  6. First steps with Django — Celery 3.1.23 documentation
  7. How to install Celery on Django and Create a Periodic Task
  8. Django - Celery - DB 사용 예제

댓글 없음:

댓글 쓰기