DevGang
Авторизоваться

Упрощение кода Python для проектов по инженерии данных

Необработанные данные поступают из разных источников и форматов. Прежде чем данные станут доступны для ответа на важные бизнес-вопросы, потребуются значительные усилия и время для выполнения обработки данных. Хотя базовая инфраструктура данных может различаться в зависимости от объема данных, скорости и требований к аналитике, некоторые фундаментальные методы проектирования кода по-прежнему актуальны для упрощения и оптимизации различных задач с течением времени.

В этой статье будут рассмотрены различные важные части общих проектов по инженерии данных, от приема данных до тестирования конвейера. Python — наиболее широко используемый язык программирования для проектирования данных, и мы научимся справляться с этими вариантами использования, используя встроенные функции и эффективные библиотеки Python.

Представьте, что у вас есть розничный интернет-магазин, в котором продаются уникальные подарки на все случаи жизни. Интернет-магазин настолько популярен, что каждую минуту и ​​секунду в нем совершается большой объем транзакций. У вас есть стремление удовлетворить потребности большего числа текущих клиентов и обслуживать больше новых клиентов путем анализа покупательских привычек в отношении текущих транзакций, поэтому это мотивирует вас погрузиться в обработку данных записей транзакций в качестве подготовки.

#0 Фиктивные данные

Сначала мы помещаем некоторые данные транзакции в файл, используя текстовый формат JSON Lines (JSONL), где каждая строка представляет собой отдельный объект JSON. Этот формат подходит для потоковой передачи данных в таких областях, как аналитика веб-приложений и управление журналами.

В нашем файле поля данных относятся к различным типам данных. Они включают идентификаторы клиента и продукта (в формате целого числа/массива), способ оплаты (в строковом формате) и общую сумму транзакции (в формате с плавающей запятой).

import json
import random
import numpy as np
import datetime

# Remove existing 'retail_transactions.jsonl' file, if any
! rm -f /p/a/t/h retail_transactions.jsonl

# Set the no of transactions
no_of_iteration = 500000

# Open a file in write mode
with open('retail_transactions.jsonl', 'w') as f:
  for num in range(no_of_iteration):
    if (random.randint(1, 10000) != 5000):
      # Create a valid transaction
      new_txn = {
        'orderID': num,
        'customerID': random.randint(1, 100000),
        'productID': np.random.randint(10000, size=random.randint(1, 5)).tolist(),
        'paymentMthd': random.choice(['Credit card', 'Debit card', 'Digital wallet', 'Cash on delivery', 'Cryptocurrency']),
        'totalAmt': round(random.random() * 5000, 2),
        'invoiceTime': datetime.datetime.now().isoformat()
      }
    else:
      # Create an invalid transaction
      new_txn = {
        'orderID': "",
        'customerID': "",
        'productID': "",
        'paymentMthd': "",
        'totalAmt': "",
        'invoiceTime': ""
       }
     
     # Write the transaciton as a JSON line to the file
     f.write(json.dumps(new_txn) + "\n")

Вы можете обнаружить несколько одиночных транзакций с пустыми полями данных, записанными в файл. Это имитирует проблему отсутствия данных, как одну из проблем качества данных, часто встречающихся в реальном мире.

#1 Прием данных — доходность

Чтобы прочитать записи транзакций из файла, один из самых простых подходов — преобразовать набор данных в список, а затем преобразовать его в DataFrame Pandas.

Этот метод будет работать как чудо для 500 000 транзакций в нашем демонстрационном наборе данных. Но что, если реальные наборы данных содержат от миллионов до даже миллиардов строк? Мы можем долго сидеть на месте, дожидаясь завершения всех вычислений, что может привести к проблемам с памятью.

Иногда нас не волнуют все результаты, и вместо этого мы хотим обработать первоначальные результаты до загрузки последней записи. В таких случаях мы можем альтернативно использовать выход для управления потоком работы генератора.

Генератор не сохраняет в памяти все записи. Вместо этого он выдает значение по одному и приостанавливает выполнение функции до тех пор, пока не будет запрошено следующее значение.

Существуют процедуры чередования пользовательских кодов с библиотечными кодами. Он также обеспечивает соблюдение последовательности, что означает, что вы не можете получить доступ ко второй записи, пока не достигнете первой. Вы можете узнать больше об этой концепции в видеоролике Pydata, в котором дается подробное объяснение.

Оператор доходности имеет разные практические применения. Например, мы можем просмотреть каждую строку файла и получить только непустые записи. Ниже показано, как мы можем выполнять фильтрацию данных в реальном времени:

import json

def read_json_file(file_name):
  # Read the JSONL file
  with open(file_name) as f:
    for line in f:
      txn = json.loads(line)
      # Yield valid transactions only
      if (txn['orderID'] != ""):
        yield(txn)

txn_generator = read_json_file('retail_transactions.jsonl')

Результатом этих кодов является генератор Python — итератор особого типа. Вы можете использовать функцию next в цикле, чтобы возвращать последующие элементы один за другим. Помимо фильтрации данных в реальном времени, другая идея состоит в том, чтобы разработать функцию-генератор, которая предварительно обрабатывает данные и выдает их в заранее определенном размере пакета, который можно напрямую проанализировать, чтобы передать модель машинного обучения для обучения. Более того, мы можем использовать его для асинхронной обработки веб-запросов и ответов при сканировании веб-страниц.

#2 Проверка данных — Pydantic

Предположим, у вас есть список данных JSON, который содержит информацию о записях транзакций после приема данных. Вот пример транзакции:

{
 'orderID': 10000,
 'customerID': 48316,
 'productID': [5620],
 'paymentMthd': 'Cash on delivery',
 'totalAmt': 9301.2,
 'invoiceTime': '2024-06-10T23:30:29.608443',
 'price': -1
}

Мы хотим обеспечить проверку каждых входящих данных, в противном случае мы легко столкнемся с различными типами ошибок при запуске последующих функций обработки данных. Этого можно добиться с помощью библиотеки pydantic.

Сначала мы определяем схему наших полей данных с помощью модели Pydantic, затем проверяем наши данные JSON с помощью функции model_validate().
from datetime import datetime
from pydantic import BaseModel, ValidationError

# Define the data model for a transaction record
class TxnModel(BaseModel):
  orderID: int
  customerID: int
  productID: list[int]
  paymentMthd: str
  totalAmt: float
  invoiceTime: datetime

try:
  # Validate the sample case against the schema
  TxnModel.model_validate(sample_txn)
  print("Validated successfully!")
except ValidationError as exc:
  # Print error messages for any validation error
  print("Validation Error:")
  print(exc.errors())

# Output:
# Validated successfully

Иногда мы обнаруживаем необходимость применить более строгие правила проверки. Например, базовая модель Pydantic пытается, если это возможно, привести строковые данные к целому числу. Чтобы избежать этого, вы можете установить strict=True на уровне модели или поля.

Кроме того, мы можем применять к полям данных собственные правила проверки. Например, мы можем захотеть проверить, соответствует ли значение метода оплаты нашим ожиданиям. Чтобы облегчить тестирование, мы вручную установили способ оплаты в примере случая «Биткойн», которого нет в интернет-магазине, а затем использовали AfterValidator для встраивания функции для дальнейшей проверки.

from typing import Annotated
from pydantic.functional_validators import AfterValidator

# Customize the validation rule
def validate_payment_mthd(paymentMthd: str):
  possible_values = ['Credit card', 'Debit card', 'Digital wallet', 'Cash on delivery', 'Cryptocurrency']
  if paymentMthd not in possible_values:
    raise ValueError(f"Invalid paymentMthd, payment type must be one of {possible_values}")
  return storage

# Define the data model for a transaction record
class TxnModel(BaseModel):
  orderID: int = Field(strict=True)
  customerID: int
  productID: list[int]
  paymentMthd: Annotated[str, AfterValidator(validate_payment_mthd)]
  totalAmt: Annotated[float, Field(strict=True, gt=0)]
  invoiceTime: datetime

# Manually define a non-existent payment method
sample_txn['paymentMthd'] = 'Bitcoin'

try:
  # Validate the sample case against the schema
  TxnModel.model_validate(sample_txn)
  print("Validated successfully!")
except ValidationError as exc:
  # Print error messages for any validation error
  print("Validation Error:")
  print(exc.errors()[0]['ctx'])

# Output
# Validation Error:
# {'error': ValueError("Invalid paymentMthd, payment type must be one of ['Credit card', 'Debit card', 'Digital wallet', 'Cash on delivery', 'Cryptocurrency']")}

Валидатор успешно определяет, что способ оплаты не входит в список возможных значений. Это делается путем применения внутренней логики проверки Pydantic, за которой следуют пользовательские функции проверки. Код вызывает ValueError, который заполняет ValidationError.

При возникновении ошибки мы можем предпринять последующие действия по ее исправлению. Эти функции помогают устранить ошибки в данных, обеспечивая тем самым точность и полноту наших данных.

#3 Обработка данных

(1) Декоратор Python

После проверки данных мы начинаем работать с функциями, интенсивно использующими данные. Существует высокая вероятность увеличения времени выполнения, поскольку конвейер данных становится сложным. Мы хотим выявить первопричину и оптимизировать время выполнения функций. Один простой метод — собрать две временные метки в начале и конце каждой функции, а затем одну за другой вычислить разницу во времени.

Чтобы гарантировать, что код будет менее загроможден во всем конвейере данных, мы можем использовать декоратор Python.

Сначала мы создадим декоратор Python, который измеряет время выполнения. После этого мы аннотируем любую функцию, которой требуется эта функция.

Например, вы можете измерить время, необходимое для классификации цен для всех транзакций.

import time

# Measure the excution time of a given function
def time_decorator(func):
  def wrapper(*args, **kwargs):
    begin_time = time.time()
    output = func(*args, **kwargs)
    end_time = time.time()
    print(f"Execution time of function {func.__name__}: {round(end_time - begin_time, 2)} seconds.")
    return output
  return wrapper

# Categorize the total amount of each transaction
@time_decorator
def group_txn_price(data):
  for txn in data:
    price = txn['totalAmt']
    if 0 <= price <= 1500:
      txn['totalAmtCat'] = 'Low'
    elif 1500 < price <= 3500:
      txn['totalAmtCat'] = 'Moderate'
    elif 3500 < price:
      txn['totalAmtCat'] = 'High'
    return data

txn_list = group_txn_price(txn_list)

# Output
# Execution time of function group_txn_price: 0.26 seconds.

Подход декоратора делает код пригодным для повторного использования без изменения исходного кода наших исходных функций. Аналогичным образом мы можем применить идеи декоратора для регистрации завершения функций или оповещения по электронной почте, когда задания сталкиваются с сбоем.

(2) Сопоставить, уменьшить, отфильтровать

Это широко используемые методы массивов Python, с которыми могут быть знакомы многие разработчики. Но я все же считаю, что их стоит упомянуть по нескольким причинам:

(1) immutable — функции не изменяют значения исходных списков

(2) chain flexibility — можно одновременно применять комбинацию функций

(3) concise и readable — всего с одной строкой кода

Предположим, у нас есть список объектов JSON всего с двумя ключами: способ оплаты и общая сумма. Давайте рассмотрим несколько примеров того, как работают эти функции.

Map: выполните одну и ту же операцию со всеми элементами в списке (например, добавьте суффикс к значениям способа оплаты).

updated_txn_list = list(map(lambda x: {
                      'paymentMthd': f"{x['paymentMthd']}_2024",
                      "totalAmt": x["totalAmt"]
                   }, txn_list))

print(updated_txn_list)

# Output
# [{'paymentMthd': 'Cryptocurrency_2024', 'totalAmt': 3339.85},
# {'paymentMthd': 'Cash on delivery_2024', 'totalAmt': 872.52},
# ...]

Filter: получите подмножество элементов, соответствующих определенным условиям (например, только записи с криптовалютой в качестве способа оплаты).

updated_txn_list = list(map(lambda x: x, filter(lambda y: y["paymentMthd"] == "Cryptocurrency", txn_list)))

print(updated_txn_list)

# Output
# [{'paymentMthd': 'Cryptocurrency', 'totalAmt': 3339.85},
# {'paymentMthd': 'Cryptocurrency', 'totalAmt': 576.15},
# ...]

Reduce: получить однозначный результат (например, суммирование или умножение всех элементов).

from functools import reduce

total_amt_crypto = reduce(lambda acc, x: acc + x["totalAmt"], updated_txn_list, 0)

print(total_amt_crypto)

# Output
# 250353984.67000002

Мы можем использовать эти функции на этапах преобразования в проектах по науке о данных. Например, используйте Map() для масштабирования или нормализации данных, используйте filter() для удаления выбросов и нерелевантных точек данных и используйте Reduc() для создания сводной статистики.

#4 Тестирование конвейера данных — Pytest

Конвейеры данных часто включают в себя прием данных, очистку данных и операции извлечения-преобразования-загрузки (ETL). Объем потенциальных ошибок может быть широким, и их легко упустить из виду, особенно потому, что пользователям трудно интерпретировать ход модели и результаты. Это приводит к большей зависимости от усилий команды разработчиков по тестированию.

Обычно проводится модульное тестирование, чтобы убедиться, что каждый компонент системы машинного обучения работает должным образом.

Одна из самых популярных сред тестирования Python — Pytest. Представьте, что мы хотим обеспечить высокое качество преобразованных данных, которому могут доверять как технические группы, так и лица, принимающие решения. Мы можем проверить функцию, которую мы использовали для классификации цен транзакций. Для этого нам нужно подготовить два файла Python:

  • Feature_engineering.py: файл, содержащий ранее созданную функцию.
# Categorize the total amount of each transaction
def add_features(sample_cases):
  for txn in sample_cases:
    price = txn[‘totalAmt’]
  if 0 <= price <= 1500:
    txn[‘totalAmtCat’] = ‘Low’
  elif 1500 < price <= 3500:
    txn[‘totalAmtCat’] = ‘Moderate’
  elif 3500 < price:
    txn[‘totalAmtCat’] = ‘High’

return sample_cases
  • test_feature_engineering.py: файл с префиксом test_, который Pytest распознает только в целях тестирования.
from feature_engineering import add_features

def test_add_features():
  sample_cases = [{
      'orderID': 1,
      'customerID': 36536,
      'productID': [2209, 2262, 4912, 3162, 5734],
      'paymentMthd': 'Cryptocurrency',
      'totalAmt': 576.15,
      'invoiceTime': '2024–06–10T23:53:25.329928'
    }]

  # Call the function with the sample cases
  sample_cases = add_features(sample_cases)
 
  # Check the assertations
  for txn in sample_cases: 
    assert 'totalAmtCat' in list(txn.keys())
    assert len(txn) == 7
    assert len(txn['totalAmtCat']) != 0

Приведенные выше операторы утверждения гарантируют, что новое поле данных totalAmtCat будет добавлено с непустым значением, а исходные поля данных не будут затронуты. Выполнив команду Pytest, мы узнаем, что наш тест пройден!

В более сложном случае предположим, что у нас есть три функции со следующими последовательностями: load_data, clean_data и add_features. Как нам следует разработать тестовый файл для проверки результатов выполнения этих функций одну за другой?

import pytest
import json
from feature_engineering import load_data, clean_data, add_features

# Set up a temporary JSONL file
@pytest.fixture
def jsonl_file(tmp_path):
  sample_cases = [{'orderID': 10000,
    'customerID': 48316,
    'productID': [5620],
    'paymentMthd': 'Cash on delivery',
    'totalAmt': 9301.2,
    'invoiceTime': '2024-06-10T23:30:29.608443',
    'price': -1
  }]

  file_path = tmp_path + "/test_transactions.jsonl"

  with open(file_path, 'w') as f:
    for txn in sample_cases:
        f.write(json.dumps(txn) + "\n")

  return file_path

# Test function to validate the `load_data` function
def test_load_data(jsonl_file):
  data = load_data(jsonl_file)
  # assert statements here

# Test function to validate the `clean_data` function
def test_clean_data(jsonl_file):
  data = load_data(jsonl_file)
  data = clean_data(data)
  # assert statements here

# Test function to validate the `add_features` function
def test_add_features(jsonl_file):
  data = load_data(jsonl_file)
  data = clean_data(data)
  data = add_features(data)
  # assert statements here

Нам следует определить фиксированную базовую линию для инициализации, например файл JSON Lines с примерами тестовых случаев. Здесь мы используем @pytest.fixturedecorator, который работает аналогично time_decorator, который мы обсуждали ранее в разделе «Декоратор Python». Этот декоратор помогает предотвратить повторную инициализацию файлов примеров. Для остальных кодов мы задействуем несколько тестовых функций для запуска функций конвейера и используем операторы утверждения для обнаружения логических ошибок.

Заключение

Мы столкнулись с несколькими важными аспектами проектов по инженерии данных и изучили, как упростить и оптимизировать код Python для повышения эффективности и читабельности:

Прием данных с использованием дохода для обработки больших наборов данных с эффективным использованием памяти.

Проверка данных за счет использования Pydantic для проверки полей данных на основе схемы и настраиваемых шаблонов значений.

Обработка данных путем применения декоратора Python и встроенных библиотек для включения дополнительных функций без повторного кода.

Конвейерное тестирование с использованием Pytest для обеспечения высококачественного вывода функций на протяжении всего рабочего процесса.

Источник

#Python
Комментарии
Чтобы оставить комментарий, необходимо авторизоваться

Присоединяйся в тусовку

В этом месте могла бы быть ваша реклама

Разместить рекламу