DevGang
Авторизоваться

Динамическая маршрутизации задач в Celery

В нашем предыдущем посте в блоге мы рассмотрели пользовательские очереди и маршрутизацию задач. Для каждой задачи нам приходилось настраивать, к какой очереди мы хотим, чтобы Celery поставил маршрут. Хотя этот подход хорошо работает для простой настройки, он не подходит для приложений и микросервисов, где многие задачи Celery необходимо направлять в различные рабочие очереди.

Шаг 1: Конфигурация Celery task_routes

Вместо того, чтобы конфигурировать task_routes для задачи, в какую очередь вы хотите направить задачу, вы можете указать Celery использовать собственный класс, указав путь к указанному классу:

app = Celery(__name__)
app.conf.update({
    'broker_url': os.environ['CELERY_BROKER_URL'],
    'imports': (
        'tasks',
    ),
    'task_routes': ('task_router.TaskRouter',),
    'task_serializer': 'json',
    'result_serializer': 'json',
    'accept_content': ['json']})

Шаг 2: Определите класс TaskRouter

В соответствии с нашим значением task_routes выше, нам нужно определить пользовательский класс TaskRouterclass в модуле task_router.py. Celery ожидает метод route_for_task, который передает имя задачи в качестве первого аргумента. Обратите внимание, как метод возвращает dict, который выглядит точно так же, как тот, который используется для ручной маршрутизации задач.

class TaskRouter:
    def route_for_task(self, task, *args, **kwargs):
        if ':' not in task:
            return {'queue': 'default'}

        namespace, _ = task.split(':')
        return {'queue': namespace}

Наша идея состоит в том, чтобы выполнить задачу по ее имени, в частности, мы предпологаем, что имена наших задач следуют шаблону queue: taskname. В нашем предыдущем посте у нас была задача fetch_bitcoin_price_index, которую мы хотели перенаправить в очередь, называемую feeds. Мы переименовываем эту задачу в фиды: fetch_bitcoin_price_index.

@app.task(bind=True, name='feeds:fetch_bitcoin_price_index')
def fetch_bitcoin_price_index(self, start_date, end_date):
   ...


@app.task(bind=True, name='filters:calculate_moving_average')
def calculate_moving_average(self, args, window):
    ...

Нам нужно запустить два воркера из Celery. Один подписывается на каналы, другой на очередь фильтров:

celery worker --app=worker.app --hostname=worker.feeds@%h --queues=feeds
celery worker --app=worker.app --hostname=worker.filters@%h --queues=filters

Обратите внимание на аргументы командной строки --queues. Ваши воркеры подписываются на определенные очереди. Для подписки более чем на одну очередь используйте запятую для разделения списка, например, так --queues = feeds, filters. Для получения дополнительной информации ознакомьтесь с документами по Celery.

Шаг 3: Готов к действию

Создайте стек docker-compose, и запустите скрипт example.py:

docker-compose up -d

Сценарий вызывает цепочку Celery, состоящую из двух задач: fetch_bitcoin_price_index извлекает данные индекса цен Bicoin из API Coindesk через очередь каналов для воркеров Celery. Когда задача успешно завершается, результат передается на calc_moving_average через очередь фильтров на worker-filters Celery.

Проверьте журналы docker-compose, чтобы следить за потоком задач через двух воркеров:

Стек docker-compose.yml также поставляется с экземпляром flower. Flower инструмент для мониторинга работы сельдерея и задач. Проверьте ваш браузер на http://localhost:5555.

В этом посте вы узнали, как настроить Celery для маршрутизации задач с помощью настраиваемого маршрутизатора задач. Это решение хорошо масштабируется при использовании множества задач в разных очередях и воркерах.

#Flask #Python #Celery
Комментарии
Чтобы оставить комментарий, необходимо авторизоваться