Динамическая маршрутизации задач в Celery
В нашем предыдущем посте в блоге мы рассмотрели пользовательские очереди и маршрутизацию задач. Для каждой задачи нам приходилось настраивать, к какой очереди мы хотим, чтобы Celery поставил маршрут. Хотя этот подход хорошо работает для простой настройки, он не подходит для приложений и микросервисов, где многие задачи Celery необходимо направлять в различные рабочие очереди.
Шаг 1: Конфигурация Celery task_routes
Вместо того, чтобы конфигурировать task_routes для задачи, в какую очередь вы хотите направить задачу, вы можете указать Celery использовать собственный класс, указав путь к указанному классу:
app = Celery(__name__) app.conf.update({ 'broker_url': os.environ['CELERY_BROKER_URL'], 'imports': ( 'tasks', ), 'task_routes': ('task_router.TaskRouter',), 'task_serializer': 'json', 'result_serializer': 'json', 'accept_content': ['json']})
Шаг 2: Определите класс TaskRouter
В соответствии с нашим значением task_routes выше, нам нужно определить пользовательский класс TaskRouterclass в модуле task_router.py. Celery ожидает метод route_for_task, который передает имя задачи в качестве первого аргумента. Обратите внимание, как метод возвращает dict, который выглядит точно так же, как тот, который используется для ручной маршрутизации задач.
class TaskRouter: def route_for_task(self, task, *args, **kwargs): if ':' not in task: return {'queue': 'default'} namespace, _ = task.split(':') return {'queue': namespace}
Наша идея состоит в том, чтобы выполнить задачу по ее имени, в частности, мы предпологаем, что имена наших задач следуют шаблону queue: taskname. В нашем предыдущем посте у нас была задача fetch_bitcoin_price_index, которую мы хотели перенаправить в очередь, называемую feeds. Мы переименовываем эту задачу в фиды: fetch_bitcoin_price_index.
@app.task(bind=True, name='feeds:fetch_bitcoin_price_index') def fetch_bitcoin_price_index(self, start_date, end_date): ... @app.task(bind=True, name='filters:calculate_moving_average') def calculate_moving_average(self, args, window): ...
Нам нужно запустить два воркера из Celery. Один подписывается на каналы, другой на очередь фильтров:
celery worker --app=worker.app --hostname=worker.feeds@%h --queues=feeds celery worker --app=worker.app --hostname=worker.filters@%h --queues=filters
Обратите внимание на аргументы командной строки --queues. Ваши воркеры подписываются на определенные очереди. Для подписки более чем на одну очередь используйте запятую для разделения списка, например, так --queues = feeds, filters. Для получения дополнительной информации ознакомьтесь с документами по Celery.
Шаг 3: Готов к действию
Создайте стек docker-compose, и запустите скрипт example.py:
docker-compose up -d
Сценарий вызывает цепочку Celery, состоящую из двух задач: fetch_bitcoin_price_index извлекает данные индекса цен Bicoin из API Coindesk через очередь каналов для воркеров Celery. Когда задача успешно завершается, результат передается на calc_moving_average через очередь фильтров на worker-filters Celery.
Проверьте журналы docker-compose, чтобы следить за потоком задач через двух воркеров:
Стек docker-compose.yml также поставляется с экземпляром flower. Flower инструмент для мониторинга работы сельдерея и задач. Проверьте ваш браузер на http://localhost:5555.
В этом посте вы узнали, как настроить Celery для маршрутизации задач с помощью настраиваемого маршрутизатора задач. Это решение хорошо масштабируется при использовании множества задач в разных очередях и воркерах.