celery beat 에서 schedule task 를 dynamically add/remove
git hub issue comment 에서 dynamic 하게 event 를 추가할 때 celery beat 가 새롭게 populate_heap() 을 하도록 하는 법이 나와 있다. 여기서 이야기하는 바는 아래 2가지이다.
- populate_heap() 을 schedule 이 변경될 때마다 해주는 것
- 그리고 DatabaseSchduler 를 사용해서 add/remove 를 하는 것(참고: Using custom scheduler classes / Celery 4.1.0 documentation)
이중에 1번째 내용은 이슈의 comment에 의하면 4.1 버전에 반영되었다고 한다.
실제로 확인을 해보니 반영이 되어 있다. Populate heap when periodic tasks are changed · celery/celery@6a24e00 · GitHub
Scheduler
celery.beat.PersistentScheduler
기본적으로 celery.beat.PersistentScheduler를 사용한다.$celery -A proj beat -s /home/celery/var/run/celerybeat-schedule
celery.beat.PersistentScheduler 는 celerybeat-schedule file을 이용하는데 shelve database 파일이다. 아래 shelve database 의 '제한사항(retrictions)' 에도 나오지만 concurrent access 를 지원하지 않는다.
django-celery-beat
django-celery-beat 은 database 에 "periodic task schedule" 을 저장할 수 있도록 해준다. 하지만 이것은 Django 를 필요로 한다.Django 없이 SQLAlchemy 를 이용하는 DatabaseScheduler
사용법은 간단한다. SQLAlchemy 를 설치하고
$pip install sqlalchemy
아래 code 를 실행해서 DB table 을 생성한다.(참고로, 아래코드는 sqlite 을 사용하는 코드다)(다른 DB urls 과 관련해서는 여기 를 참고하자.)
참고로 추후에 sqlalchemy_scheduler_models.py 의 engine 도 user/password 를 해줘야 한다. 예를 들면 아래와 같은 식으로 말이다.
참고로 추후에 sqlalchemy_scheduler_models.py 의 engine 도 user/password 를 해줘야 한다. 예를 들면 아래와 같은 식으로 말이다.
- engine = sqlalchemy.create_engine('mysql://root:root_password@localhost/db_name')
## sqlalchemy_scheduler_models.py
...
engine = sqlalchemy.create_engine('sqlite://')
Base = declarative_base(bind=engine)
...
from sqlalchemy import create_engine
from sqlalchemy_scheduler_models import Base
# Create an engine that stores data in the local directory's
# sqlalchemy_example.db file.
engine = create_engine('sqlite:///sqlalchemy_example2.db')
# Create all tables in the engine. This is equivalent to "Create Table"
# statements in raw SQL.
Base.metadata.create_all(engine)
그리고 celconfig.py 에 아래를 추가하자.
CELERYBEAT_SCHEDULER = 'yourproject.sqlalchemy_scheduler:DatabaseScheduler'
주의
주의할 점은 scheduler 가 바뀌는 것에 대해서는 새롭게 sync 를 하지만 celery_crontabs 내용이 바뀌는 것에 대해서는 반영되지 않는다. 이것이 왜 중요하냐면, scheduler 에서 시간 설정을 crontab_id 로 하기 때문이다.즉, crontab 의 내용을 변경하는 경우(시간을 변경하는 것)라면 새롭게 celery beat 를 restart 해야 한다.
etc
수행조건
enabled, last_run_at
table 'celery_schedules' 에 있는 task 들은 해당되는 crontab_id 시간에 수행된다. 이때 "enabled" 가 '1' 이 되어 있어야 하고, 또 하나 "last_run_at" 이 현재 UTC 보다 이전이어야 한다.참고로 last_run_at 이라고 가정하고, 동작한다. 그러기 때문에 insert 할 때도 UTC 를 insert 해야 한다. 그렇지 않으면 last_run_at 이 현재 UTC 보다 큰 값이 돼서 동작이 실행되지 않을 수 있다.
sync 시점
또는 date_changed 를 보고 이전의 date_changed 시간이후여야 한다. 그래야 task 를 resync 하게 된다.(참고 : sqlalchemy_scheduler.py: DatabaseScheduler.should_sync())should_sync 가 발생빈도는 Scheduler 의 sync_every 변수를 수정하면 된다.
# beat.py class Scheduler(object): ... #: How often to sync the schedule (3 minutes by default) sync_every = 3 * 60
그리고 또하나 주의할 점은 session 과 관련돼서, session 이 기본적으로 autoflush 를 하지 않기 때문에, 새롭게 update 된 내용이 session 을 통해 이루어지지 않았다면, DatabaseScheduler 가 새롭게 sync 하지 못한다. (예를 들면, DB client 에서 직접 수정한 경우등)
그렇기 때문에 되도록 schedule 을 추가하는 부분도 동일한 session 을 이용하도록 하는 것이 낫다.
See Also
- Don't keep important data in your Celery queue : celery 에 apply_async 를 사용하는 것보다는 주기적으로 task 를 확인하고 실행하도록 하는 것이 낫다는 내용의 글. expires option 에 대한 이야기도 있다.
- Celery 4.4.5- task_time_limit task_time_limit 은 default 가 No time limit 이다. 하지만, 주기적으로 task 를 돌릴 때 설정한 주기와 task 를 끝내는데 걸리는 시간은 고려해야 한다.
댓글 없음:
댓글 쓰기