DevGang
Авторизоваться

Маршрутизация задача в Celery

По умолчанию Celery направляет все задачи в одну очередь, и все обработчики, по умолчанию, используют эту очередь. С очередями Celery вы можете контролировать, какие задачи выполняются обработчиками Celery. Это может быть полезно, если у вас есть разные по скорости выполнения задачи, и вы хотите, чтобы медленные задачи не мешали выполнению быстрым. Или если вам нужно отправить какуюто задачу из одного микросервиса в другой.

Шаг 1: Настройте Celery через task_routes

Celery может быть настроен для каждой задачи в отдельности:

app = Celery(__name__)
app.conf.update({
    'broker_url': os.environ['CELERY_BROKER_URL'],
    'imports': (
        'tasks',
    ),
    'task_routes': {
        'fetch_bitcoin_price_index': {'queue': 'feeds'}
        'calculate_moving_average': {'queue': 'filters'}
    },
    'task_serializer': 'json',
    'result_serializer': 'json',
    'accept_content': ['json']})

Шаг 2: Заставить worker подписаться на очередь

Мы запускаем один worker Celery, который подписывается на очередь каналов и обрабатывает задачи fetch_bitcoin_price_index.

celery worker --app=worker.app --hostname=worker.feeds@%h --queues=feeds

Другой worker подписывается на очередь фильтров и обрабатывает задачи calc_moving_average:

celery worker --app=worker.app --hostname=worker.filters@%h --queues=filters

Обратите внимание на аргументы --queues в командной строке. Ваши воркеры подписываются на определенные очереди. Для подписки более чем на одну очередь используйте запятую для разделения списока, например так --queues=feeds,filters. Для получения дополнительной информации ознакомьтесь с документацией Celery.

Шаг 3: Пробуем

Создайте стек docker-compose:

docker-compose up -d

И запустите скрипт example.py:

docker-compose exec worker-feeds python example.py --start_date=2018-01-01 --end_date=2018-05-29 --window=3

Здесь мы повторно используем цепочку задач Celery из нашего предыдущего поста. Внутри example.py мы вызываем цепочку из двух задач: fetch_bitcoin_price_index извлекает данные индекса цен Bicoin из API Coindesk через очередь каналов для воркера Celery каналов.

Когда задача успешно завершается, результат передается на calc_moving_average через очередь фильтров на рабочий-фильтры Celery worker.

import argparse
from celery import chain
from tasks import fetch_bitcoin_price_index, calculate_moving_average
        
parser = argparse.ArgumentParser()
parser.add_argument('--start_date')
parser.add_argument('--end_date')
parser.add_argument('--window', default=3)
args = parser.parse_args()
task = chain(
    fetch_bitcoin_price_index.s(
        start_date=args.start_date,
        end_date=args.end_date),
    calculate_moving_average.s(window=args.window)
).delay()

Проверьте журналы docker-compose, чтобы следить за потоком задач через двух рабочих:

docker-compose logs -f

Стек docker-compose.yml также поставляется с экземпляром flower. Flower это инструмент для мониторинга работы Celery и его задач. Проверьте результат в браузере перейдя по ссылке http: //localhost:5555.

Подведем итоги

В этой статье вы узнали, как настроить Celery для маршрутизации задач в выделенные очереди и как заставить Celery подписываться на определенные очереди.

Для этого вам нужно определить маршруты для каждой задачи. Этот подход хорошо работает для простой настройки. Однако он не очень хорошо масштабируется для приложения со многими задачами Celery или для среды micro-services/Docker, где несколько служб взаимодействуют через одного и того же брокера сообщений.

В следующей статье вы узнаете все о динамической маршрутизации задач, которая является программным и масштабируемым решением для преодоления ограничений, описанных выше.

#Flask #Python #Celery
Комментарии
Чтобы оставить комментарий, необходимо авторизоваться

Присоединяйся в тусовку

В этом месте могла бы быть ваша реклама

Разместить рекламу