Упрощение кода на Python для проектов по обработке данных
Необработанные данные поступают из различных источников и в различных форматах. Прежде чем эти данные станут доступны для ответов на критически важные вопросы бизнеса, необходимо приложить значительные усилия и потратить время на проектирование данных. Хотя базовая инфраструктура данных может меняться в зависимости от объема данных, скорости их обработки и требований к аналитике, некоторые фундаментальные методы проектирования кода по-прежнему актуальны для упрощения и оптимизации различных задач во времени.
В этой статье мы рассмотрим различные критически важные части общих проектов по обработке данных, начиная от ввода данных и заканчивая тестированием конвейера. Python — самый распространенный язык программирования для инженерии данных, и мы узнаем, как решать эти задачи с помощью встроенных функций и эффективных библиотек Python.
Представьте, что у вас есть интернет-магазин, в котором продаются уникальные подарки на все случаи жизни. Интернет-магазин настолько популярен, что в нем ежеминутно и ежесекундно совершается большое количество сделок. У вас есть желание удовлетворить больше потребностей текущих клиентов и обслужить больше новых клиентов, анализируя покупательские привычки текущих транзакций, и это побуждает вас погрузиться в обработку данных записей транзакций в качестве подготовки.
#0 Макетные данные
Для начала мы сформируем некоторые данные о транзакциях в файл, используя текстовый формат JSON Lines (JSONL), где каждая строка представляет собой отдельный JSON-объект. Этот формат привлекателен для потоковой передачи данных в таких областях, как аналитика веб-приложений и ведение журналов.
В нашем файле поля данных относятся к различным типам данных. Это идентификаторы клиента и товара (в формате целого числа/массива), способ оплаты (в формате строки) и общая сумма транзакции (в виде числа с плавающей запятой).
import json
import random
import numpy as np
import datetime
# Remove existing 'retail_transactions.jsonl' file, if any
! rm -f /p/a/t/h retail_transactions.jsonl
# Set the no of transactions
no_of_iteration = 500000
# Open a file in write mode
with open('retail_transactions.jsonl', 'w') as f:
for num in range(no_of_iteration):
if (random.randint(1, 10000) != 5000):
# Create a valid transaction
new_txn = {
'orderID': num,
'customerID': random.randint(1, 100000),
'productID': np.random.randint(10000, size=random.randint(1, 5)).tolist(),
'paymentMthd': random.choice(['Credit card', 'Debit card', 'Digital wallet', 'Cash on delivery', 'Cryptocurrency']),
'totalAmt': round(random.random() * 5000, 2),
'invoiceTime': datetime.datetime.now().isoformat()
}
else:
# Create an invalid transaction
new_txn = {
'orderID': "",
'customerID': "",
'productID': "",
'paymentMthd': "",
'totalAmt': "",
'invoiceTime': ""
}
# Write the transaciton as a JSON line to the file
f.write(json.dumps(new_txn) + "\n")
Вы можете обнаружить несколько отдельных транзакций с пустыми полями данных, записанными в файл. Это имитирует проблему отсутствия данных, как одну из проблем качества данных, часто встречающихся в реальном мире.
#1 Ввод данных – Yield
Для чтения записей о транзакциях из файла одним из самых простых подходов является циклическое преобразование набора данных в список, а затем преобразование его в Pandas DataFrame.
Этот метод будет работать как шарм для 500 000 транзакций в нашем демонстрационном наборе данных. Но что, если реальные наборы данных содержат от миллионов до миллиардов строк? Мы можем долго ждать завершения всех вычислений, что приведет к проблемам с памятью.
Иногда нам не важны все результаты, и мы хотим обработать начальные результаты до того, как будет загружена последняя запись. В таких случаях мы можем альтернативно использовать yield
для управления потоком генератора.
Существуют процедуры чередования пользовательских кодов с библиотечными. Кроме того, обеспечивается последовательность, что означает, что вы не можете получить доступ ко второй записи, не дойдя до первой. Подробнее об этой концепции вы можете узнать из видеоролика Pydata talk, в котором дается подробное объяснение.
Оператор yield
имеет различные практические применения. Например, мы можем пройтись по каждой строке файла и выдать только непустые записи. Ниже показано, как мы можем выполнить фильтрацию данных в режиме реального времени:
import json
def read_json_file(file_name):
# Read the JSONL file
with open(file_name) as f:
for line in f:
txn = json.loads(line)
# Yield valid transactions only
if (txn['orderID'] != ""):
yield(txn)
txn_generator = read_json_file('retail_transactions.jsonl')
На выходе этих кодов получается генератор Python, особый тип итератора. Вы можете использовать функцию next
в цикле, чтобы возвращать последующие элементы один за другим. Помимо фильтрации данных в реальном времени, еще одна идея — разработать функцию-генератор, которая предварительно обрабатывает данные и выдаёт их в заранее заданном объеме, который можно легко разобрать, чтобы передать модели машинного обучения для тренировки. Более того, мы можем использовать её для асинхронной обработки веб-запросов и ответов при просмотре веб-страниц.
#2 Проверка данных - Pydantic
Предположим, что у вас есть список данных JSON, который содержит информацию о записях транзакций после занесения данных. Вот пример транзакции:
{
'orderID': 10000,
'customerID': 48316,
'productID': [5620],
'paymentMthd': 'Cash on delivery',
'totalAmt': 9301.2,
'invoiceTime': '2024-06-10T23:30:29.608443',
'price': -1
}
Для каждой входящей информации мы хотим обеспечить её проверку, иначе при выполнении последующих функций обработки данных мы легко столкнемся с различными типами ошибок. Этого можно добиться с помощью библиотеки Pydantic.
Сначала мы определяем схему полей наших данных с помощью модели Pydantic, а затем проверяем наши JSON-данные с помощью функции model_validate()
.
from datetime import datetime
from pydantic import BaseModel, ValidationError
# Define the data model for a transaction record
class TxnModel(BaseModel):
orderID: int
customerID: int
productID: list[int]
paymentMthd: str
totalAmt: float
invoiceTime: datetime
try:
# Validate the sample case against the schema
TxnModel.model_validate(sample_txn)
print("Validated successfully!")
except ValidationError as exc:
# Print error messages for any validation error
print("Validation Error:")
print(exc.errors())
# Output:
# Validated successfully
Иногда возникает необходимость применять более строгие правила валидации. Например, базовая модель Pydantic пытается по возможности привести строковые данные к целому числу. Чтобы избежать этого, вы можете установить strict=True
на уровне модели или поля.
Кроме того, мы можем применять к полям данных пользовательские правила валидации. Например, мы можем захотеть проверить, соответствует ли значение способа оплаты нашим ожиданиям. Чтобы облегчить тестирование, мы вручную установили метод оплаты в примере на «Bitcoin», который является несуществующей опцией в интернет-магазине, а затем с помощью AfterValidator
встроили функцию для дальнейшей проверки.
from typing import Annotated
from pydantic.functional_validators import AfterValidator
# Customize the validation rule
def validate_payment_mthd(paymentMthd: str):
possible_values = ['Credit card', 'Debit card', 'Digital wallet', 'Cash on delivery', 'Cryptocurrency']
if paymentMthd not in possible_values:
raise ValueError(f"Invalid paymentMthd, payment type must be one of {possible_values}")
return storage
# Define the data model for a transaction record
class TxnModel(BaseModel):
orderID: int = Field(strict=True)
customerID: int
productID: list[int]
paymentMthd: Annotated[str, AfterValidator(validate_payment_mthd)]
totalAmt: Annotated[float, Field(strict=True, gt=0)]
invoiceTime: datetime
# Manually define a non-existent payment method
sample_txn['paymentMthd'] = 'Bitcoin'
try:
# Validate the sample case against the schema
TxnModel.model_validate(sample_txn)
print("Validated successfully!")
except ValidationError as exc:
# Print error messages for any validation error
print("Validation Error:")
print(exc.errors()[0]['ctx'])
# Output
# Validation Error:
# {'error': ValueError("Invalid paymentMthd, payment type must be one of ['Credit card', 'Debit card', 'Digital wallet', 'Cash on delivery', 'Cryptocurrency']")}
Валидатор успешно определяет, что метод оплаты не входит в список возможных значений. Для этого применяется внутренняя логика валидации Pydantic, а затем пользовательские функции валидации. Код вызывает ошибку ValueError
, которая заполняет ValidationError
.
Когда ошибка срабатывает, мы можем выполнить последующие действия для ее исправления. Эти функции помогают устранить ошибки в данных, тем самым обеспечивая точность и полноту наших данных.
#3 Обработка данных
Декоратор Python
После проверки данных мы начинаем работать с функциями, требующими больших объемов данных. По мере усложнения обработки данных велика вероятность столкнуться с длительным временем выполнения. Мы хотим выявить первопричину и оптимизировать временные характеристики функций. Один из простых методов — сбор двух временных меток в начале и конце каждой функции, а затем поочередное вычисление разницы во времени.
Чтобы код был менее загроможден на протяжении всей обработки данных, мы можем использовать декоратор Python.
Сначала мы разработаем декоратор Python, который измеряет время выполнения. Затем мы аннотируем любую функцию, которой требуется эта характеристика.
Например, можно измерить время, затрачиваемое на категоризацию цен для всех транзакций.
import time
# Measure the excution time of a given function
def time_decorator(func):
def wrapper(*args, **kwargs):
begin_time = time.time()
output = func(*args, **kwargs)
end_time = time.time()
print(f"Execution time of function {func.__name__}: {round(end_time - begin_time, 2)} seconds.")
return output
return wrapper
# Categorize the total amount of each transaction
@time_decorator
def group_txn_price(data):
for txn in data:
price = txn['totalAmt']
if 0 <= price <= 1500:
txn['totalAmtCat'] = 'Low'
elif 1500 < price <= 3500:
txn['totalAmtCat'] = 'Moderate'
elif 3500 < price:
txn['totalAmtCat'] = 'High'
return data
txn_list = group_txn_price(txn_list)
# Output
# Execution time of function group_txn_price: 0.26 seconds.
Декораторный подход делает код многоразовым без изменения исходного кода наших оригинальных функций. Аналогичным образом мы можем применить идеи декораторов для протоколирования завершения функций или оповещения по электронной почте при возникновении сбоев в работе.
Map, reduce, filter
Это широко используемые методы массивов Python, с которыми многие разработчики могут быть знакомы. Но я всё же считаю, что их стоит упомянуть по нескольким причинам:
- Неизменяемость – функции не изменяют значения исходных списков;
- Гибкость цепочки – можно применять комбинацию функций одновременно;
- Краткость и читабельность – всего одна строка кода.
Предположим, у нас есть список JSON-объектов, содержащий всего два ключа: способ оплаты и общая сумма. Давайте рассмотрим несколько примеров работы этих функций.
Map: Выполнить одну и ту же операцию над всеми элементами списка (например, добавить суффикс к значениям метода оплаты).
updated_txn_list = list(map(lambda x: {
'paymentMthd': f"{x['paymentMthd']}_2024",
"totalAmt": x["totalAmt"]
}, txn_list))
print(updated_txn_list)
# Output
# [{'paymentMthd': 'Cryptocurrency_2024', 'totalAmt': 3339.85},
# {'paymentMthd': 'Cash on delivery_2024', 'totalAmt': 872.52},
# ...]
Filter: Получение подмножества элементов, удовлетворяющих определенному условию (например, только записи с криптовалютой в качестве способа оплаты).
updated_txn_list = list(map(lambda x: x, filter(lambda y: y["paymentMthd"] == "Cryptocurrency", txn_list)))
print(updated_txn_list)
# Output
# [{'paymentMthd': 'Cryptocurrency', 'totalAmt': 3339.85},
# {'paymentMthd': 'Cryptocurrency', 'totalAmt': 576.15},
# ...]
Reduce: Получение результата с одним значением (например, суммирование или умножение всех элементов).
from functools import reduce
total_amt_crypto = reduce(lambda acc, x: acc + x["totalAmt"], updated_txn_list, 0)
print(total_amt_crypto)
# Output
# 250353984.67000002
Мы можем использовать эти функции на этапах преобразования данных в проектах по науке о данных. Например, используйте map()
для масштабирования или нормализации данных, filter()
для удаления провалов и нерелевантных точек данных, а reduce()
для получения сводной статистики.
#4 Тестирование конвейера данных - Pytest
Конвейеры данных часто включают в себя операции по вводу данных, их очистке и извлечению-трансформированию-загрузке (ETL). Сфера потенциальных ошибок может быть очень широкой, и их легко упустить из виду, особенно если поток модели и результат трудно интерпретировать пользователям. Это приводит к тому, что команда разработчиков в большей степени полагается на тестирование.
Обычно проводится модульное тестирование, чтобы убедиться, что каждый компонент системы машинного обучения работает так, как ожидается.
Одним из самых популярных фреймворков для тестирования на Python является Pytest. Представьте, что мы хотим обеспечить высокое качество преобразованных данных, которым могут доверять как технические специалисты, так и лица, принимающие решения. Мы можем протестировать пройденную нами функцию категоризации цен на сделки. Для этого нам нужно подготовить два файла Python:
feature_engineering.py
: Файл, содержащий ранее созданную функцию.
# Categorize the total amount of each transaction
def add_features(sample_cases):
for txn in sample_cases:
price = txn[‘totalAmt’]
if 0 <= price <= 1500:
txn[‘totalAmtCat’] = ‘Low’
elif 1500 < price <= 3500:
txn[‘totalAmtCat’] = ‘Moderate’
elif 3500 < price:
txn[‘totalAmtCat’] = ‘High’
return sample_cases
test_feature_engineering.py
: Файл с префиксом «test_
», который Pytest будет распознавать только для целей тестирования.
from feature_engineering import add_features
def test_add_features():
sample_cases = [{
'orderID': 1,
'customerID': 36536,
'productID': [2209, 2262, 4912, 3162, 5734],
'paymentMthd': 'Cryptocurrency',
'totalAmt': 576.15,
'invoiceTime': '2024–06–10T23:53:25.329928'
}]
# Call the function with the sample cases
sample_cases = add_features(sample_cases)
# Check the assertations
for txn in sample_cases:
assert 'totalAmtCat' in list(txn.keys())
assert len(txn) == 7
assert len(txn['totalAmtCat']) != 0
Вышеприведенные утверждения гарантируют, что новое поле данных «totalAmtCat
» будет добавлено с непустым значением, а исходные поля данных не будут затронуты. Выполнив команду Pytest
, мы можем узнать, что наш тест пройден!
Для более сложного случая предположим, что у нас есть три функции со следующими последовательностями: load_data
, clean_data
и add_features
. Как мы должны спроектировать тестовый файл, чтобы проверить вывод этих функций по очереди?
import pytest
import json
from feature_engineering import load_data, clean_data, add_features
# Set up a temporary JSONL file
@pytest.fixture
def jsonl_file(tmp_path):
sample_cases = [{'orderID': 10000,
'customerID': 48316,
'productID': [5620],
'paymentMthd': 'Cash on delivery',
'totalAmt': 9301.2,
'invoiceTime': '2024-06-10T23:30:29.608443',
'price': -1
}]
file_path = tmp_path + "/test_transactions.jsonl"
with open(file_path, 'w') as f:
for txn in sample_cases:
f.write(json.dumps(txn) + "\n")
return file_path
# Test function to validate the `load_data` function
def test_load_data(jsonl_file):
data = load_data(jsonl_file)
# assert statements here
# Test function to validate the `clean_data` function
def test_clean_data(jsonl_file):
data = load_data(jsonl_file)
data = clean_data(data)
# assert statements here
# Test function to validate the `add_features` function
def test_add_features(jsonl_file):
data = load_data(jsonl_file)
data = clean_data(data)
data = add_features(data)
# assert statements here
Мы должны определить фиксированный базовый уровень для инициализации, например, файл JSON Lines с примерами тестовых случаев. Здесь мы используем декоратор @pytest.fixtured
, который работает аналогично декоратору time_decorator
, который мы обсуждали ранее в разделе о декораторах Python. Этот декоратор помогает избежать повторной инициализации файлов примеров. В остальных кодах мы задействуем несколько тестовых функций для запуска конвейерных функций и используем утверждения для обнаружения логических ошибок.
Завершение работы
Мы столкнулись с несколькими важными аспектами проектов по разработке данных и изучили, как упростить и оптимизировать код на Python для повышения эффективности и читабельности:
- Ввод данных с помощью
yield
для обработки больших массивов данных с эффективным использованием памяти. - Проверка данных с помощью Pydantic для проверки полей данных на основе схемы и настраиваемых шаблонов значений.
- Обработка данных с помощью декораторов Python и встроенных библиотек, позволяющих реализовать дополнительные функции без повторения кода.
- Тестирование конвейера с помощью Pytest для обеспечения высокого качества вывода функций на протяжении всего рабочего процесса.
Благодарю за прочтение!
Источник доступен по ссылке.