[컴][웹][파이썬] celery beat 에서 schedule task 를 dynamically 추가/삭제

셀러리 비트 / celery beat how to add/remove scheduled task dynamically / 원하는 순간에 추가/삭제 / 다이내믹하게  /


celery beat 에서 schedule task 를 dynamically add/remove


git hub issue comment 에서 dynamic 하게 event 를 추가할 때 celery beat 가 새롭게 populate_heap() 을 하도록 하는 법이 나와 있다. 여기서 이야기하는 바는 아래 2가지이다.
  1. populate_heap() 을 schedule 이 변경될 때마다 해주는 것
  2. 그리고 DatabaseSchduler 를 사용해서 add/remove 를 하는 것(참고: Using custom scheduler classes / Celery 4.1.0 documentation)

이중에 1번째 내용은 이슈의 comment에 의하면 4.1 버전에 반영되었다고 한다.

실제로 확인을 해보니 반영이 되어 있다. Populate heap when periodic tasks are changed · celery/celery@6a24e00 · GitHub


Scheduler

celery.beat.PersistentScheduler

기본적으로 celery.beat.PersistentScheduler를 사용한다.
$celery -A proj beat -s /home/celery/var/run/celerybeat-schedule

celery.beat.PersistentScheduler 는 celerybeat-schedule file을 이용하는데 shelve database 파일이다. 아래 shelve database 의 '제한사항(retrictions)' 에도 나오지만 concurrent access 를 지원하지 않는다.

django-celery-beat

django-celery-beat 은 database 에 "periodic task schedule" 을 저장할 수 있도록 해준다. 하지만 이것은 Django 를 필요로 한다.

Django 없이 SQLAlchemy 를 이용하는 DatabaseScheduler

사용법은 간단한다.  SQLAlchemy 를 설치하고
$pip install sqlalchemy

아래 code 를 실행해서 DB table 을 생성한다.(참고로, 아래코드는 sqlite 을 사용하는 코드다)(다른 DB  urls 과 관련해서는 여기 를 참고하자.)

참고로 추후에 sqlalchemy_scheduler_models.py 의 engine  도 user/password 를 해줘야 한다. 예를 들면 아래와 같은 식으로 말이다.
  • engine = sqlalchemy.create_engine('mysql://root:root_password@localhost/db_name')




## sqlalchemy_scheduler_models.py
...
engine = sqlalchemy.create_engine('sqlite://')
Base = declarative_base(bind=engine)
...

from sqlalchemy import create_engine
from sqlalchemy_scheduler_models import Base


# Create an engine that stores data in the local directory's
# sqlalchemy_example.db file.
engine = create_engine('sqlite:///sqlalchemy_example2.db')
 
# Create all tables in the engine. This is equivalent to "Create Table"
# statements in raw SQL.
Base.metadata.create_all(engine)

그리고 celconfig.py 에 아래를 추가하자.
CELERYBEAT_SCHEDULER = 'yourproject.sqlalchemy_scheduler:DatabaseScheduler'


주의

주의할 점은 scheduler 가 바뀌는 것에 대해서는 새롭게 sync 를 하지만 celery_crontabs 내용이 바뀌는 것에 대해서는 반영되지 않는다. 이것이 왜 중요하냐면, scheduler 에서 시간 설정을 crontab_id 로 하기 때문이다.

즉, crontab 의 내용을 변경하는 경우(시간을 변경하는 것)라면 새롭게 celery beat 를 restart 해야 한다.

etc

수행조건

enabled, last_run_at

table 'celery_schedules' 에 있는 task 들은 해당되는 crontab_id 시간에 수행된다. 이때 "enabled" 가 '1' 이 되어 있어야 하고, 또 하나 "last_run_at" 이 현재 UTC 보다 이전이어야 한다.

참고로 last_run_at 이라고 가정하고, 동작한다. 그러기 때문에 insert 할 때도 UTC 를 insert 해야 한다. 그렇지 않으면 last_run_at 이 현재 UTC 보다 큰 값이 돼서 동작이 실행되지 않을 수 있다.

sync 시점

또는 date_changed 를 보고 이전의 date_changed 시간이후여야 한다. 그래야 task 를 resync 하게 된다.(참고 : sqlalchemy_scheduler.py: DatabaseScheduler.should_sync())

should_sync 가 발생빈도는 Scheduler 의 sync_every 변수를 수정하면 된다.
# beat.py
class Scheduler(object):
     ...
    #: How often to sync the schedule (3 minutes by default)
    sync_every = 3 * 60

그리고 또하나 주의할 점은 session 과 관련돼서, session 이 기본적으로 autoflush 를 하지 않기 때문에, 새롭게 update 된 내용이 session 을 통해 이루어지지 않았다면, DatabaseScheduler 가 새롭게 sync 하지 못한다. (예를 들면, DB client 에서 직접 수정한 경우등)

그렇기 때문에 되도록 schedule 을 추가하는 부분도 동일한 session 을 이용하도록 하는 것이 낫다.


See Also

  1. Don't keep important data in your Celery queue : celery 에 apply_async 를 사용하는 것보다는 주기적으로 task 를 확인하고 실행하도록 하는 것이 낫다는 내용의 글. expires option 에 대한 이야기도 있다.
  2. Celery 4.4.5- task_time_limit task_time_limit 은 default 가 No time limit 이다. 하지만, 주기적으로 task 를 돌릴 때 설정한 주기와 task 를 끝내는데 걸리는 시간은 고려해야 한다.


댓글 없음:

댓글 쓰기