У вас включен AdBlock или иной блокировщик рекламы.

Пожалуйста, отключите его, доход от рекламы помогает развитию сайта и появлению новых статей.

Спасибо за понимание.

В другой раз
DevGang блог о програмировании
Авторизоваться

Тестирование цепочек Celery

Цепочки Celery позволяют вам модульно оформить ваше приложение и повторно использовать общие задачи. Классическим примером использования является система рыночных данных.

Цель состоит в том, чтобы использовать рыночные данные от разных поставщиков данных, таких как Bloomberg или Reuters. API-интерфейсы у разных сервисов, как полагается могут быть разных видов и форм, но в конечном итоге данные должны быть преобразованы к одному формату и сохранены в таблице базы данных.

При объединении в цепочку задач Celery нам нужно только создать небольшое количество специализированных задач подачи и преобразования данных для каждого поставщика, но мы можем повторно использовать общие задачи, такие как десериализация и запись данных.

Звучит вероятно сложно, но на деле все гораздо проще чем кажется.

Пример

Допустим, мы хотим загрузить данные временных рядов криптовалюты из ряда различных API и вычислить средние значение для каждого из этих временных рядов.

Например, одним из таких временных рядов будет индекс цен на биткойны, доступный через Coindesk API.

После тщательного рассмотрения возможности повторного использования и разделения проблем мы решили реализовать общую задачу Celery для вычисления среднего значения, ожидая в качестве параметров список объектов - [{"date": "2018-05-01": "value": 1000.0}, {"date": "2018-05-02": "value": 1003.5}, ...] - и количество дней, в течение которых мы хотим рассчитать среднее значение:

import pandas as pd
import numpy as np
from celery import chain
from worker import app

@app.task(bind=True, name='calculate_moving_average')
def calculate_moving_average(self, timeseries, window):
    df = pd.DataFrame(timeseries)
    df['ma'] = df['value'].rolling(window=window, center=False).mean()
    return list(df.replace(np.nan, '', regex=True).T.to_dict().values())

Теперь мы можем реализовать любую задачу Celery для любого заданного API; до тех пор, пока эта задача возвращает список объектов ожидаемого формата [{"date": "2018-05-01": "value": 1000.0}, {"date": "2018-05-02": "value ": 1003.5}, ...], мы можем просто упорядочить эти задачи, чтобы иметь возможность вычислить среднее значение для любого результата API. Например, задача Celery для фида Coindesk Bitcoin Price Index выглядит так:

import requests
from worker import app

@app.task(bind=True, name='fetch_bitcoin_price_index')
def fetch_bitcoin_price_index(self, start_date, end_date):
    url = f'https://api.coindesk.com/v1/bpi/historical/close.json?start={start_date}&end={end_date}'
    response = requests.get(url)
    if not response.ok:
        raise ValueError(f'Unexpected status code: {response.status_code}')
    return [{'date': key, 'value': value} for key, value in response.json()['bpi'].items()]

Как мы сделаем тестирование цепочек Celery? Как обычно, есть больше, чем один ответ. Давайте посмотрим на две разные стратегии и обсудим, какая из них имеет смысл в каком контексте.

Делаем “мок” цепочек Celery

Нам нужно проверить, где бы мы ни вызывали цепочку Celery, отдельные задачи вызываются в правильном порядке с правильными аргументами. Что бы ни происходило внутри задачи, мы не заботимся об этом, поскольку это уже охвачено модульным тестом.

from flask import Flask, Response, request, jsonify
from tasks import fetch_bitcoin_price_index, calculate_moving_average
from celery import chain
        
app = Flask(__name__)

@app.route('/', methods=['POST'])
def index():
    chain(
        fetch_bitcoin_price_index.s(
            start_date=request.json['start_date'],
            end_date=request.json['end_date']),
        calculate_moving_average.s(window=request.json['window'])
    ).delay()
    return '', 201

Цепочка вызывается в этом представлении Flask. Следовательно, это то, для чего нам нужно написать тест (еще один распространенный подход - реализовать специальный метод, который вызывает цепочку Celery и писать тест для этого метода).

import app
from unittest import TestCase, mock
        
class Tests(TestCase):
    
    def setUp(self):
        app.app.config['TESTING'] = True
        self.client = app.app.test_client()

    @mock.patch('app.chain')
    @mock.patch('app.fetch_bitcoin_price_index')
    @mock.patch('app.calculate_moving_average')
    def test_mocked_chain(self, mock_calculate_moving_average, mock_fetch_bitcoin_price_index, mock_chain):
        response = self.client.post('/', json={'start_date': '2018-05-01', 'end_date': '2018-05-08', 'window': 3})
        self.assertEqual(response.status_code, 201)
        mock_chain.assert_called_once_with(
            mock_fetch_bitcoin_price_index.s(start_date='2018-05-01', end_date='2018-05-08'),
            mock_calculate_moving_average.s(window=3))

Синхронное тестирование цепочек

Есть альтернативный подход. Мы можем просто протестировать всю партию за один раз, синхронно вызывая цепочку задач (так же, как мы делали это для модульного тестирования отдельных задач Celery).

import unittest
import responses
import tasks

from unittest import TestCase
from celery import chain


class Tests(TestCase):

    @responses.activate
    def test_chain(self):
        responses.add(responses.GET, 'https://api.coindesk.com/v1/bpi/historical/close.json?start=2018-05-01&end=2018-05-08', body='{"bpi":{"2018-05-01":9067.715,"2018-05-02":9219.8638,"2018-05-03":9734.675,"2018-05-04":9692.7175,"2018-05-05":9826.5975,"2018-05-06":9619.1438,"2018-05-07":9362.5338,"2018-05-08":9180.1588},"disclaimer":"This data was produced from the CoinDesk Bitcoin Price Index. BPI value data returned as USD.","time":{"updated":"May 9, 2018 00:03:00 UTC","updatedISO":"2018-05-09T00:03:00+00:00"}}', status=200)
        task = chain(
            tasks.fetch_bitcoin_price_index.s(start_date='2018-05-01', end_date='2018-05-08'),
            tasks.calculate_moving_average.s(window=2)).apply()
        self.assertEqual(task.status, 'SUCCESS')
        self.assertEqual(task.result, [
            {'date': '2018-05-01', 'ma': '', 'value': 9067.715},
            {'date': '2018-05-02', 'ma': 9143.7894, 'value': 9219.8638},
            {'date': '2018-05-03', 'ma': 9477.2694, 'value': 9734.675},
            {'date': '2018-05-04', 'ma': 9713.69625, 'value': 9692.7175},
            {'date': '2018-05-05', 'ma': 9759.657500000001, 'value': 9826.5975},
            {'date': '2018-05-06', 'ma': 9722.87065, 'value': 9619.1438},
            {'date': '2018-05-07', 'ma': 9490.8388, 'value': 9362.5338},
            {'date': '2018-05-08', 'ma': 9271.346300000001, 'value': 9180.1588}
        ])

Здесь мы используем ответы, чтобы смоделировать вызов запросов, но в остальном мы позволяем нашему тесту фактически выполнить цепочку задач. Если вы выполняете (и должны делать) модульное тестирование своих задач Celery, вы в итоге получаете избыточные тесты, которые, в свою очередь, затягивают процесс разработки. Однако эта тестовая установка может иметь смысл, если у вас есть задачи Celery, которые всегда вызываются как часть цепочки.

Как применить это

Асинхронное связывание задач Celery через цепочки задач является мощным строительным блоком для построения сложных рабочих процессов (вспомним Lego). Тестирование цепочек Celery так же важно, как и тестирование отдельных задач Celery. Мок над цепочкой Celery и связанными задачами - это простой и эффективный способ оставаться на вершине рабочего процесса Celery, каким бы сложным он ни был.

#Flask #Python #Celery

Присоеденяйся в тусовку

Поделитесь своим опытом, расскажите о новом инструменте, библиотеке или фреймворке. Для этого не обязательно становится постоянным автором.

Попробовать