Маршрутизация задача в Celery
По умолчанию Celery направляет все задачи в одну очередь, и все обработчики, по умолчанию, используют эту очередь. С очередями Celery вы можете контролировать, какие задачи выполняются обработчиками Celery. Это может быть полезно, если у вас есть разные по скорости выполнения задачи, и вы хотите, чтобы медленные задачи не мешали выполнению быстрым. Или если вам нужно отправить какуюто задачу из одного микросервиса в другой.
Шаг 1: Настройте Celery через task_routes
Celery может быть настроен для каждой задачи в отдельности:
app = Celery(__name__) app.conf.update({ 'broker_url': os.environ['CELERY_BROKER_URL'], 'imports': ( 'tasks', ), 'task_routes': { 'fetch_bitcoin_price_index': {'queue': 'feeds'} 'calculate_moving_average': {'queue': 'filters'} }, 'task_serializer': 'json', 'result_serializer': 'json', 'accept_content': ['json']})
Шаг 2: Заставить worker подписаться на очередь
Мы запускаем один worker Celery, который подписывается на очередь каналов и обрабатывает задачи fetch_bitcoin_price_index.
celery worker --app=worker.app --hostname=worker.feeds@%h --queues=feeds
Другой worker подписывается на очередь фильтров и обрабатывает задачи calc_moving_average:
celery worker --app=worker.app --hostname=worker.filters@%h --queues=filters
Обратите внимание на аргументы --queues в командной строке. Ваши воркеры подписываются на определенные очереди. Для подписки более чем на одну очередь используйте запятую для разделения списока, например так --queues=feeds,filters. Для получения дополнительной информации ознакомьтесь с документацией Celery.
Шаг 3: Пробуем
Создайте стек docker-compose:
docker-compose up -d
И запустите скрипт example.py:
docker-compose exec worker-feeds python example.py --start_date=2018-01-01 --end_date=2018-05-29 --window=3
Здесь мы повторно используем цепочку задач Celery из нашего предыдущего поста. Внутри example.py мы вызываем цепочку из двух задач: fetch_bitcoin_price_index извлекает данные индекса цен Bicoin из API Coindesk через очередь каналов для воркера Celery каналов.
Когда задача успешно завершается, результат передается на calc_moving_average через очередь фильтров на рабочий-фильтры Celery worker.
import argparse from celery import chain from tasks import fetch_bitcoin_price_index, calculate_moving_average parser = argparse.ArgumentParser() parser.add_argument('--start_date') parser.add_argument('--end_date') parser.add_argument('--window', default=3) args = parser.parse_args() task = chain( fetch_bitcoin_price_index.s( start_date=args.start_date, end_date=args.end_date), calculate_moving_average.s(window=args.window) ).delay()
Проверьте журналы docker-compose, чтобы следить за потоком задач через двух рабочих:
docker-compose logs -f
Стек docker-compose.yml также поставляется с экземпляром flower. Flower это инструмент для мониторинга работы Celery и его задач. Проверьте результат в браузере перейдя по ссылке http: //localhost:5555.
Подведем итоги
В этой статье вы узнали, как настроить Celery для маршрутизации задач в выделенные очереди и как заставить Celery подписываться на определенные очереди.
Для этого вам нужно определить маршруты для каждой задачи. Этот подход хорошо работает для простой настройки. Однако он не очень хорошо масштабируется для приложения со многими задачами Celery или для среды micro-services/Docker, где несколько служб взаимодействуют через одного и того же брокера сообщений.
В следующей статье вы узнаете все о динамической маршрутизации задач, которая является программным и масштабируемым решением для преодоления ограничений, описанных выше.