DevGang
Авторизоваться

Cоздание с нуля простой ORM на Python

ORM (Object Relational Mapper) является инструментом, который позволяет взаимодействовать с вашей базой данных с помощью объектно-ориентированной парадигмы. Поэтому ORM обычно реализуются в виде библиотек на языках, поддерживающих объектно-ориентированное программирование.

Вот базовый пример с Django ORM:

# SQL: SELECT * FROM employees WHERE grade='L2' AND salary < 10000;
employees = Employee.objects.filter(grade='L2', salary__lt=10000)

Существует несколько ORM с открытым исходным кодом на разных языках программирования, таких как Django ORM (Python), Laravel Eloquent (PHP), Sequelize (JavaScript) и т.д.

Эти ORM реализуют очень продвинутые функции, и их исходный код может быть немного пугающим, когда вы новичок, поэтому в этой статье мы реализуем простую ORM с Python с нуля, чтобы демистифицировать процесс и дать общее представление о том, как он работает.

Определение использования

Поскольку мы создаем ORM с нуля, перед тем, как приступить к написанию кода, мы должны определить, как он будет использоваться. Это позволит нам получить хорошее представление о том, что мы пытаемся разработать.

Например, рассмотрим этот запрос:

SELECT * FROM employees WHERE salary = 10000;

С Django ORM, чтобы выполнить этот запрос, мы можем написать так:

employees = Employee.objects.filter(salary=10000)

Но это потому, что Django решил реализовать свой ORM, чтобы вести себя подобным образом. Вот несколько примеров того, как это можно было бы сделать иначе:

(1) Employee.get_objects(where={'salary': 10000})
(2) Employee.objects.select('*', condition=Condition(salary=10000))

Так что разработчик системы должен решить, как его система будет вести себя.

В нашем случае мы попытаемся разработать ORM, который позволит нам выполнять базовые операции создания, чтения, обновления и удаления с использованием, определенным ниже:

# SQL: SELECT salary, grade FROM employees;
employees = Employee.objects.select('salary', 'grade')  # employees: List[Employee]


# SQL: INSERT INTO employees (first_name, last_name, salary)
#  	VALUES ('Yan', 'KIKI', 10000), ('Yoweri', 'ALOH', 15000);
employees_data = [
    {"first_name": "Yan", "last_name": "KIKI", "salary": 10000},
    {"first_name": "Yoweri", "last_name": "ALOH", "salary": 15000}
]
Employee.objects.bulk_insert(rows=employees_data)


# SQL: UPDATE employees SET salary = 17000, grade = 'L2';
Employee.objects.update(
    new_data={'salary': 17000, 'grade': 'L2'}
)


# SQL: DELETE FROM employees;
Employee.objects.delete()

Поняв, как этого можно достичь, вы сможете добавить больше функций и вариантов использования.

Исполнение

Состав

Первым шагом нашей реализации ORM будет реализация структуры, которая позволит нам получить ожидаемое использование. Этого можно добиться с помощью следующего кода:

# ------------ Manager (Model objects handler) ------------ #
class BaseManager:

    def __init__(self, model_class):
        self.model_class = model_class

    def select(self, *field_names):
        pass

    def bulk_insert(self, rows: list):
        pass

    def update(self, new_data: dict):
        pass

    def delete(self):
        pass


# ----------------------- Model ----------------------- #
class MetaModel(type):
    manager_class = BaseManager

    def _get_manager(cls):
        return cls.manager_class(model_class=cls)

    @property
    def objects(cls):
        return cls._get_manager()


class BaseModel(metaclass=MetaModel):
    table_name = ""


# ----------------------- Usage ----------------------- #
class Employee(BaseModel):
    manager_class = BaseManager
    table_name = "employees"


# # SQL: SELECT salary, grade FROM employees;
# employees = Employee.objects.select('salary', 'grade')  # employees: List[Employee]
#
#
# # SQL: INSERT INTO employees (first_name, last_name, salary)
# #  	VALUES ('Yan', 'KIKI', 10000), ('Yoweri', 'ALOH', 15000);
# employees_data = [
#     {"first_name": "Yan", "last_name": "KIKI", "salary": 10000},
#     {"first_name": "Yoweri", "last_name": "ALOH", "salary": 15000}
# ]
# Employee.objects.bulk_insert(rows=employees_data)
#
#
# # SQL: UPDATE employees SET salary = 17000, grade = 'L2';
# Employee.objects.update(
#     new_data={'salary': 17000, 'grade': 'L2'}
# )
#
#
# # SQL: DELETE FROM employees;
# Employee.objects.delete()

Теперь, когда у нас есть общая структура, мы можем продолжить и реализовать наши функции (SELECT, INSERT, UPDATE и DELETE), как описано выше. Но перед этим давайте настроим базу данных, чтобы иметь возможность протестировать нашу ORM.

Настройка базы данных

Большинство популярных ORM поддерживают множество систем управления базами данных, но здесь, для простоты, давайте просто реализуем поддержку PostgreSQL.

Следовательно, чтобы иметь возможность протестировать нашу ORM, нам понадобится база данных Postgres. Вот как будут выглядеть настройки нашей базы данных.

DB_SETTINGS = {
    'host': '127.0.0.1',
    'port': '5432',
    'database': 'ormify',
    'user': 'yank',
    'password': 'yank'
}

Для того, чтобы следовать за использованием ORM определенном выше, после создания базы данных, мы должны были бы создать таблицу employees с полями first_namelast_namesalary, и grade.

Вы можете использовать приведенный ниже сценарий, чтобы создать эту таблицу и заполнить ее некоторыми данными (после обновления DB_SETTINGS с вашими собственными учетными данными).

import psycopg2

DB_SETTINGS = {
    'host': '127.0.0.1',
    'port': '5432',
    'database': 'ormify',
    'user': 'yank',
    'password': 'yank'
}

connection = psycopg2.connect(**DB_SETTINGS)
cursor = connection.cursor()

# Create employees table
query = """
    CREATE TABLE employees (
        id SERIAL PRIMARY KEY,
        first_name varchar(255),
        last_name varchar(255),
        salary numeric(10, 2),
        grade varchar(10)
    )
"""
cursor.execute(query)

# Insert some data
query = """
    INSERT INTO employees (first_name, last_name, salary, grade)
        VALUES
            ('Renaud', 'Lemec', 13000, 'L2'),
            ('Junior', 'Racio', 16000, 'L3');
"""
cursor.execute(query)

connection.commit()

Для этого скрипта требуется пакет psycopg2. Вы можете установить его, выполнив команду:

pip install psycopg2-binary

Если у вас уже есть существующая база данных с некоторыми таблицами в ней, вы также можете создать свои собственные модели (как наша модель Employee) и протестировать их.

Хорошо, если сейчас все настроено, мы можем начать с первой функции: SELECT

Функция SELECT

Напоминаем, что мы хотим выполнять функцию SELECT следующим образом:

# SQL: SELECT salary, grade FROM employees;
employees = Employee.objects.select('salary', 'grade')

employees должен содержать список объектов Employee. Этого можно добиться, обновив код реализации ORM следующим образом:

import psycopg2


# ------------ Manager (Model objects handler) ------------ #
class BaseManager:
    database_settings = {}

    def __init__(self, model_class):
        self.model_class = model_class

    def select(self, *field_names, chunk_size=2000):
        # Build SELECT query
        fields_format = ', '.join(field_names)
        query = f"SELECT {fields_format} FROM {self.model_class.table_name}"

        # Execute query
        connection = psycopg2.connect(**self.database_settings)
        cursor = connection.cursor()
        cursor.execute(query)

        # Fetch data obtained with the previous query execution
        # and transform it into `model_class` objects.
        # The fetching is done by batches of `chunk_size` to
        # avoid to run out of memory.
        model_objects = list()
        is_fetching_completed = False
        while not is_fetching_completed:
            result = cursor.fetchmany(size=chunk_size)
            for row_values in result:
                keys, values = field_names, row_values
                row_data = dict(zip(keys, values))
                model_objects.append(self.model_class(**row_data))
            is_fetching_completed = len(result) < chunk_size

        return model_objects

    def bulk_insert(self, rows: list):
        pass

    def update(self, new_data: dict):
        pass

    def delete(self):
        pass


# ----------------------- Model ----------------------- #
class MetaModel(type):
    manager_class = BaseManager

    def _get_manager(cls):
        return cls.manager_class(model_class=cls)

    @property
    def objects(cls):
        return cls._get_manager()


class BaseModel(metaclass=MetaModel):
    table_name = ""

    def __init__(self, **row_data):
        for field_name, value in row_data.items():
            setattr(self, field_name, value)
    
    def __repr__(self):
        attrs_format = ", ".join([f'{field}={value}' for field, value in self.__dict__.items()])
        return f"<{self.__class__.__name__}: ({attrs_format})>"


# ----------------------- Setup ----------------------- #
DB_SETTINGS = {
    'host': '127.0.0.1',
    'port': '5432',
    'database': 'ormify',
    'user': 'yank',
    'password': 'yank'
}

BaseManager.database_settings = DB_SETTINGS


# ----------------------- Usage ----------------------- #
class Employee(BaseModel):
    manager_class = BaseManager
    table_name = "employees"


# SQL: SELECT salary, grade FROM employees;
employees = Employee.objects.select('salary', 'grade')  # employees: List[Employee]
print(employees)

Вы можете использовать свой любимый инструмент редактирования кода или Diffchecker, чтобы легко проверить изменения на различных этапах реализации.

Давайте протестируем этот код, чтобы увидеть, что происходит.

Если все прошло хорошо, вы должны увидеть такой вывод:

Если это так, поздравляю 😎, вы реализовали ORM с Python, который может выполнять базовые запросы SELECT к вашей базе данных. Теперь давайте добавим функции INSERT, UPDATE и DELETE.

Функции insert, update, delete

Чтобы реализовать эти функции, мы можем обновить код следующим образом:

import psycopg2


# ------------ Manager (Model objects handler) ------------ #
class BaseManager:
    connection = None

    @classmethod
    def set_connection(cls, database_settings):
        connection = psycopg2.connect(**database_settings)
        connection.autocommit = True  # https://www.psycopg.org/docs/connection.html#connection.commit
        cls.connection = connection

    @classmethod
    def _get_cursor(cls):
        return cls.connection.cursor()

    @classmethod
    def _execute_query(cls, query, params=None):
        cursor = cls._get_cursor()
        cursor.execute(query, params)

    def __init__(self, model_class):
        self.model_class = model_class

    def select(self, *field_names, chunk_size=2000):
        # Build SELECT query
        fields_format = ', '.join(field_names)
        query = f"SELECT {fields_format} FROM {self.model_class.table_name}"

        # Execute query
        cursor = self._get_cursor()
        cursor.execute(query)

        # Fetch data obtained with the previous query execution
        # and transform it into `model_class` objects.
        # The fetching is done by batches of `chunk_size` to
        # avoid to run out of memory.
        model_objects = list()
        is_fetching_completed = False
        while not is_fetching_completed:
            result = cursor.fetchmany(size=chunk_size)
            for row_values in result:
                keys, values = field_names, row_values
                row_data = dict(zip(keys, values))
                model_objects.append(self.model_class(**row_data))
            is_fetching_completed = len(result) < chunk_size

        return model_objects

    def bulk_insert(self, rows: list):
        # Build INSERT query and params:
        field_names = rows[0].keys()
        assert all(row.keys() == field_names for row in rows[1:])  # confirm that all rows have the same fields

        fields_format = ", ".join(field_names)
        values_placeholder_format = ", ".join([f'({", ".join(["%s"] * len(field_names))})'] * len(rows))  # https://www.psycopg.org/docs/usage.html#passing-parameters-to-sql-queries

        query = f"INSERT INTO {self.model_class.table_name} ({fields_format}) " \
                f"VALUES {values_placeholder_format}"

        params = list()
        for row in rows:
            row_values = [row[field_name] for field_name in field_names]
            params += row_values

        # Execute query
        self._execute_query(query, params)

    def update(self, new_data: dict):
        # Build UPDATE query and params
        field_names = new_data.keys()
        placeholder_format = ', '.join([f'{field_name} = %s' for field_name in field_names])
        query = f"UPDATE {self.model_class.table_name} SET {placeholder_format}"
        params = list(new_data.values())

        # Execute query
        self._execute_query(query, params)

    def delete(self):
        # Build DELETE query
        query = f"DELETE FROM {self.model_class.table_name} "

        # Execute query
        self._execute_query(query)


# ----------------------- Model ----------------------- #
class MetaModel(type):
    manager_class = BaseManager

    def _get_manager(cls):
        return cls.manager_class(model_class=cls)

    @property
    def objects(cls):
        return cls._get_manager()


class BaseModel(metaclass=MetaModel):
    table_name = ""

    def __init__(self, **row_data):
        for field_name, value in row_data.items():
            setattr(self, field_name, value)

    def __repr__(self):
        attrs_format = ", ".join([f'{field}={value}' for field, value in self.__dict__.items()])
        return f"<{self.__class__.__name__}: ({attrs_format})>\n"


# ----------------------- Setup ----------------------- #
DB_SETTINGS = {
    'host': '127.0.0.1',
    'port': '5432',
    'database': 'ormify',
    'user': 'yank',
    'password': 'yank'
}

BaseManager.set_connection(database_settings=DB_SETTINGS)


# ----------------------- Usage ----------------------- #
class Employee(BaseModel):
    manager_class = BaseManager
    table_name = "employees"


# SQL: SELECT first_name, last_name, salary, grade FROM employees;
employees = Employee.objects.select('first_name', 'last_name', 'salary', 'grade')  # employees: List[Employee]

print(f"First select result:\n {employees} \n")


# SQL: INSERT INTO employees (first_name, last_name, salary)
#  	VALUES ('Yan', 'KIKI', 10000), ('Yoweri', 'ALOH', 15000);
employees_data = [
    {"first_name": "Yan", "last_name": "KIKI", "salary": 10000},
    {"first_name": "Yoweri", "last_name": "ALOH", "salary": 15000}
]
Employee.objects.bulk_insert(rows=employees_data)

employees = Employee.objects.select('first_name', 'last_name', 'salary', 'grade')
print(f"Select result after bulk insert:\n {employees} \n")


# SQL: UPDATE employees SET salary = 17000, grade = 'L2';
Employee.objects.update(
    new_data={'salary': 17000, 'grade': 'L2'}
)

employees = Employee.objects.select('first_name', 'last_name', 'salary', 'grade')
print(f"Select result after update:\n {employees} \n")


# SQL: DELETE FROM employees;
Employee.objects.delete()

employees = Employee.objects.select('first_name', 'last_name', 'salary', 'grade')
print(f"Select result after delete:\n {employees} \n")

После запуска этого скрипта у вас должен быть такой вывод:

Yaaaay!! Мы можем гордиться 😎 😎, теперь у нас есть функциональная ORM, которая поддерживает все основные запросы.

Источник:

#PostgreSQL #Python
Комментарии
Чтобы оставить комментарий, необходимо авторизоваться

Присоединяйся в тусовку

В этом месте могла бы быть ваша реклама

Разместить рекламу