Тестирование цепочек Celery
Цепочки Celery позволяют вам модульно оформить ваше приложение и повторно использовать общие задачи. Классическим примером использования является система рыночных данных.
Цель состоит в том, чтобы использовать рыночные данные от разных поставщиков данных, таких как Bloomberg или Reuters. API-интерфейсы у разных сервисов, как полагается могут быть разных видов и форм, но в конечном итоге данные должны быть преобразованы к одному формату и сохранены в таблице базы данных.
При объединении в цепочку задач Celery нам нужно только создать небольшое количество специализированных задач подачи и преобразования данных для каждого поставщика, но мы можем повторно использовать общие задачи, такие как десериализация и запись данных.
Звучит вероятно сложно, но на деле все гораздо проще чем кажется.
Пример
Допустим, мы хотим загрузить данные временных рядов криптовалюты из ряда различных API и вычислить средние значение для каждого из этих временных рядов.
Например, одним из таких временных рядов будет индекс цен на биткойны, доступный через Coindesk API.
После тщательного рассмотрения возможности повторного использования и разделения проблем мы решили реализовать общую задачу Celery для вычисления среднего значения, ожидая в качестве параметров список объектов - [{"date": "2018-05-01": "value": 1000.0}, {"date": "2018-05-02": "value": 1003.5}, ...] - и количество дней, в течение которых мы хотим рассчитать среднее значение:
import pandas as pd import numpy as np from celery import chain from worker import app @app.task(bind=True, name='calculate_moving_average') def calculate_moving_average(self, timeseries, window): df = pd.DataFrame(timeseries) df['ma'] = df['value'].rolling(window=window, center=False).mean() return list(df.replace(np.nan, '', regex=True).T.to_dict().values())
Теперь мы можем реализовать любую задачу Celery для любого заданного API; до тех пор, пока эта задача возвращает список объектов ожидаемого формата [{"date": "2018-05-01": "value": 1000.0}, {"date": "2018-05-02": "value ": 1003.5}, ...], мы можем просто упорядочить эти задачи, чтобы иметь возможность вычислить среднее значение для любого результата API. Например, задача Celery для фида Coindesk Bitcoin Price Index выглядит так:
import requests from worker import app @app.task(bind=True, name='fetch_bitcoin_price_index') def fetch_bitcoin_price_index(self, start_date, end_date): url = f'https://api.coindesk.com/v1/bpi/historical/close.json?start={start_date}&end={end_date}' response = requests.get(url) if not response.ok: raise ValueError(f'Unexpected status code: {response.status_code}') return [{'date': key, 'value': value} for key, value in response.json()['bpi'].items()]
Как мы сделаем тестирование цепочек Celery? Как обычно, есть больше, чем один ответ. Давайте посмотрим на две разные стратегии и обсудим, какая из них имеет смысл в каком контексте.
Делаем “мок” цепочек Celery
Нам нужно проверить, где бы мы ни вызывали цепочку Celery, отдельные задачи вызываются в правильном порядке с правильными аргументами. Что бы ни происходило внутри задачи, мы не заботимся об этом, поскольку это уже охвачено модульным тестом.
from flask import Flask, Response, request, jsonify from tasks import fetch_bitcoin_price_index, calculate_moving_average from celery import chain app = Flask(__name__) @app.route('/', methods=['POST']) def index(): chain( fetch_bitcoin_price_index.s( start_date=request.json['start_date'], end_date=request.json['end_date']), calculate_moving_average.s(window=request.json['window']) ).delay() return '', 201
Цепочка вызывается в этом представлении Flask. Следовательно, это то, для чего нам нужно написать тест (еще один распространенный подход - реализовать специальный метод, который вызывает цепочку Celery и писать тест для этого метода).
import app from unittest import TestCase, mock class Tests(TestCase): def setUp(self): app.app.config['TESTING'] = True self.client = app.app.test_client() @mock.patch('app.chain') @mock.patch('app.fetch_bitcoin_price_index') @mock.patch('app.calculate_moving_average') def test_mocked_chain(self, mock_calculate_moving_average, mock_fetch_bitcoin_price_index, mock_chain): response = self.client.post('/', json={'start_date': '2018-05-01', 'end_date': '2018-05-08', 'window': 3}) self.assertEqual(response.status_code, 201) mock_chain.assert_called_once_with( mock_fetch_bitcoin_price_index.s(start_date='2018-05-01', end_date='2018-05-08'), mock_calculate_moving_average.s(window=3))
Синхронное тестирование цепочек
Есть альтернативный подход. Мы можем просто протестировать всю партию за один раз, синхронно вызывая цепочку задач (так же, как мы делали это для модульного тестирования отдельных задач Celery).
import unittest import responses import tasks from unittest import TestCase from celery import chain class Tests(TestCase): @responses.activate def test_chain(self): responses.add(responses.GET, 'https://api.coindesk.com/v1/bpi/historical/close.json?start=2018-05-01&end=2018-05-08', body='{"bpi":{"2018-05-01":9067.715,"2018-05-02":9219.8638,"2018-05-03":9734.675,"2018-05-04":9692.7175,"2018-05-05":9826.5975,"2018-05-06":9619.1438,"2018-05-07":9362.5338,"2018-05-08":9180.1588},"disclaimer":"This data was produced from the CoinDesk Bitcoin Price Index. BPI value data returned as USD.","time":{"updated":"May 9, 2018 00:03:00 UTC","updatedISO":"2018-05-09T00:03:00+00:00"}}', status=200) task = chain( tasks.fetch_bitcoin_price_index.s(start_date='2018-05-01', end_date='2018-05-08'), tasks.calculate_moving_average.s(window=2)).apply() self.assertEqual(task.status, 'SUCCESS') self.assertEqual(task.result, [ {'date': '2018-05-01', 'ma': '', 'value': 9067.715}, {'date': '2018-05-02', 'ma': 9143.7894, 'value': 9219.8638}, {'date': '2018-05-03', 'ma': 9477.2694, 'value': 9734.675}, {'date': '2018-05-04', 'ma': 9713.69625, 'value': 9692.7175}, {'date': '2018-05-05', 'ma': 9759.657500000001, 'value': 9826.5975}, {'date': '2018-05-06', 'ma': 9722.87065, 'value': 9619.1438}, {'date': '2018-05-07', 'ma': 9490.8388, 'value': 9362.5338}, {'date': '2018-05-08', 'ma': 9271.346300000001, 'value': 9180.1588} ])
Здесь мы используем ответы, чтобы смоделировать вызов запросов, но в остальном мы позволяем нашему тесту фактически выполнить цепочку задач. Если вы выполняете (и должны делать) модульное тестирование своих задач Celery, вы в итоге получаете избыточные тесты, которые, в свою очередь, затягивают процесс разработки. Однако эта тестовая установка может иметь смысл, если у вас есть задачи Celery, которые всегда вызываются как часть цепочки.
Как применить это
Асинхронное связывание задач Celery через цепочки задач является мощным строительным блоком для построения сложных рабочих процессов (вспомним Lego). Тестирование цепочек Celery так же важно, как и тестирование отдельных задач Celery. Мок над цепочкой Celery и связанными задачами - это простой и эффективный способ оставаться на вершине рабочего процесса Celery, каким бы сложным он ни был.