Cоздание с нуля простой ORM на Python
ORM (Object Relational Mapper) является инструментом, который позволяет взаимодействовать с вашей базой данных с помощью объектно-ориентированной парадигмы. Поэтому ORM обычно реализуются в виде библиотек на языках, поддерживающих объектно-ориентированное программирование.
Вот базовый пример с Django ORM:
# SQL: SELECT * FROM employees WHERE grade='L2' AND salary < 10000;
employees = Employee.objects.filter(grade='L2', salary__lt=10000)
Существует несколько ORM с открытым исходным кодом на разных языках программирования, таких как Django ORM (Python), Laravel Eloquent (PHP), Sequelize (JavaScript) и т.д.
Эти ORM реализуют очень продвинутые функции, и их исходный код может быть немного пугающим, когда вы новичок, поэтому в этой статье мы реализуем простую ORM с Python с нуля, чтобы демистифицировать процесс и дать общее представление о том, как он работает.
Определение использования
Поскольку мы создаем ORM с нуля, перед тем, как приступить к написанию кода, мы должны определить, как он будет использоваться. Это позволит нам получить хорошее представление о том, что мы пытаемся разработать.
Например, рассмотрим этот запрос:
SELECT * FROM employees WHERE salary = 10000;
С Django ORM, чтобы выполнить этот запрос, мы можем написать так:
employees = Employee.objects.filter(salary=10000)
Но это потому, что Django решил реализовать свой ORM, чтобы вести себя подобным образом. Вот несколько примеров того, как это можно было бы сделать иначе:
(1) Employee.get_objects(where={'salary': 10000})
(2) Employee.objects.select('*', condition=Condition(salary=10000))
Так что разработчик системы должен решить, как его система будет вести себя.
В нашем случае мы попытаемся разработать ORM, который позволит нам выполнять базовые операции создания, чтения, обновления и удаления с использованием, определенным ниже:
# SQL: SELECT salary, grade FROM employees;
employees = Employee.objects.select('salary', 'grade') # employees: List[Employee]
# SQL: INSERT INTO employees (first_name, last_name, salary)
# VALUES ('Yan', 'KIKI', 10000), ('Yoweri', 'ALOH', 15000);
employees_data = [
{"first_name": "Yan", "last_name": "KIKI", "salary": 10000},
{"first_name": "Yoweri", "last_name": "ALOH", "salary": 15000}
]
Employee.objects.bulk_insert(rows=employees_data)
# SQL: UPDATE employees SET salary = 17000, grade = 'L2';
Employee.objects.update(
new_data={'salary': 17000, 'grade': 'L2'}
)
# SQL: DELETE FROM employees;
Employee.objects.delete()
Поняв, как этого можно достичь, вы сможете добавить больше функций и вариантов использования.
Исполнение
Состав
Первым шагом нашей реализации ORM будет реализация структуры, которая позволит нам получить ожидаемое использование. Этого можно добиться с помощью следующего кода:
# ------------ Manager (Model objects handler) ------------ #
class BaseManager:
def __init__(self, model_class):
self.model_class = model_class
def select(self, *field_names):
pass
def bulk_insert(self, rows: list):
pass
def update(self, new_data: dict):
pass
def delete(self):
pass
# ----------------------- Model ----------------------- #
class MetaModel(type):
manager_class = BaseManager
def _get_manager(cls):
return cls.manager_class(model_class=cls)
@property
def objects(cls):
return cls._get_manager()
class BaseModel(metaclass=MetaModel):
table_name = ""
# ----------------------- Usage ----------------------- #
class Employee(BaseModel):
manager_class = BaseManager
table_name = "employees"
# # SQL: SELECT salary, grade FROM employees;
# employees = Employee.objects.select('salary', 'grade') # employees: List[Employee]
#
#
# # SQL: INSERT INTO employees (first_name, last_name, salary)
# # VALUES ('Yan', 'KIKI', 10000), ('Yoweri', 'ALOH', 15000);
# employees_data = [
# {"first_name": "Yan", "last_name": "KIKI", "salary": 10000},
# {"first_name": "Yoweri", "last_name": "ALOH", "salary": 15000}
# ]
# Employee.objects.bulk_insert(rows=employees_data)
#
#
# # SQL: UPDATE employees SET salary = 17000, grade = 'L2';
# Employee.objects.update(
# new_data={'salary': 17000, 'grade': 'L2'}
# )
#
#
# # SQL: DELETE FROM employees;
# Employee.objects.delete()
Теперь, когда у нас есть общая структура, мы можем продолжить и реализовать наши функции (SELECT, INSERT, UPDATE и DELETE), как описано выше. Но перед этим давайте настроим базу данных, чтобы иметь возможность протестировать нашу ORM.
Настройка базы данных
Большинство популярных ORM поддерживают множество систем управления базами данных, но здесь, для простоты, давайте просто реализуем поддержку PostgreSQL.
Следовательно, чтобы иметь возможность протестировать нашу ORM, нам понадобится база данных Postgres. Вот как будут выглядеть настройки нашей базы данных.
DB_SETTINGS = {
'host': '127.0.0.1',
'port': '5432',
'database': 'ormify',
'user': 'yank',
'password': 'yank'
}
Для того, чтобы следовать за использованием ORM определенном выше, после создания базы данных, мы должны были бы создать таблицу employees
с полями first_name
, last_name
, salary
, и grade
.
Вы можете использовать приведенный ниже сценарий, чтобы создать эту таблицу и заполнить ее некоторыми данными (после обновления DB_SETTINGS
с вашими собственными учетными данными).
import psycopg2
DB_SETTINGS = {
'host': '127.0.0.1',
'port': '5432',
'database': 'ormify',
'user': 'yank',
'password': 'yank'
}
connection = psycopg2.connect(**DB_SETTINGS)
cursor = connection.cursor()
# Create employees table
query = """
CREATE TABLE employees (
id SERIAL PRIMARY KEY,
first_name varchar(255),
last_name varchar(255),
salary numeric(10, 2),
grade varchar(10)
)
"""
cursor.execute(query)
# Insert some data
query = """
INSERT INTO employees (first_name, last_name, salary, grade)
VALUES
('Renaud', 'Lemec', 13000, 'L2'),
('Junior', 'Racio', 16000, 'L3');
"""
cursor.execute(query)
connection.commit()
Для этого скрипта требуется пакет psycopg2. Вы можете установить его, выполнив команду:
pip install psycopg2-binary
Если у вас уже есть существующая база данных с некоторыми таблицами в ней, вы также можете создать свои собственные модели (как наша модель Employee
) и протестировать их.
Хорошо, если сейчас все настроено, мы можем начать с первой функции: SELECT
Функция SELECT
Напоминаем, что мы хотим выполнять функцию SELECT следующим образом:
# SQL: SELECT salary, grade FROM employees;
employees = Employee.objects.select('salary', 'grade')
employees
должен содержать список объектов Employee
. Этого можно добиться, обновив код реализации ORM следующим образом:
import psycopg2
# ------------ Manager (Model objects handler) ------------ #
class BaseManager:
database_settings = {}
def __init__(self, model_class):
self.model_class = model_class
def select(self, *field_names, chunk_size=2000):
# Build SELECT query
fields_format = ', '.join(field_names)
query = f"SELECT {fields_format} FROM {self.model_class.table_name}"
# Execute query
connection = psycopg2.connect(**self.database_settings)
cursor = connection.cursor()
cursor.execute(query)
# Fetch data obtained with the previous query execution
# and transform it into `model_class` objects.
# The fetching is done by batches of `chunk_size` to
# avoid to run out of memory.
model_objects = list()
is_fetching_completed = False
while not is_fetching_completed:
result = cursor.fetchmany(size=chunk_size)
for row_values in result:
keys, values = field_names, row_values
row_data = dict(zip(keys, values))
model_objects.append(self.model_class(**row_data))
is_fetching_completed = len(result) < chunk_size
return model_objects
def bulk_insert(self, rows: list):
pass
def update(self, new_data: dict):
pass
def delete(self):
pass
# ----------------------- Model ----------------------- #
class MetaModel(type):
manager_class = BaseManager
def _get_manager(cls):
return cls.manager_class(model_class=cls)
@property
def objects(cls):
return cls._get_manager()
class BaseModel(metaclass=MetaModel):
table_name = ""
def __init__(self, **row_data):
for field_name, value in row_data.items():
setattr(self, field_name, value)
def __repr__(self):
attrs_format = ", ".join([f'{field}={value}' for field, value in self.__dict__.items()])
return f"<{self.__class__.__name__}: ({attrs_format})>"
# ----------------------- Setup ----------------------- #
DB_SETTINGS = {
'host': '127.0.0.1',
'port': '5432',
'database': 'ormify',
'user': 'yank',
'password': 'yank'
}
BaseManager.database_settings = DB_SETTINGS
# ----------------------- Usage ----------------------- #
class Employee(BaseModel):
manager_class = BaseManager
table_name = "employees"
# SQL: SELECT salary, grade FROM employees;
employees = Employee.objects.select('salary', 'grade') # employees: List[Employee]
print(employees)
Вы можете использовать свой любимый инструмент редактирования кода или Diffchecker, чтобы легко проверить изменения на различных этапах реализации.
Давайте протестируем этот код, чтобы увидеть, что происходит.
Если все прошло хорошо, вы должны увидеть такой вывод:
Если это так, поздравляю 😎, вы реализовали ORM с Python, который может выполнять базовые запросы SELECT к вашей базе данных. Теперь давайте добавим функции INSERT, UPDATE и DELETE.
Функции insert, update, delete
Чтобы реализовать эти функции, мы можем обновить код следующим образом:
import psycopg2
# ------------ Manager (Model objects handler) ------------ #
class BaseManager:
connection = None
@classmethod
def set_connection(cls, database_settings):
connection = psycopg2.connect(**database_settings)
connection.autocommit = True # https://www.psycopg.org/docs/connection.html#connection.commit
cls.connection = connection
@classmethod
def _get_cursor(cls):
return cls.connection.cursor()
@classmethod
def _execute_query(cls, query, params=None):
cursor = cls._get_cursor()
cursor.execute(query, params)
def __init__(self, model_class):
self.model_class = model_class
def select(self, *field_names, chunk_size=2000):
# Build SELECT query
fields_format = ', '.join(field_names)
query = f"SELECT {fields_format} FROM {self.model_class.table_name}"
# Execute query
cursor = self._get_cursor()
cursor.execute(query)
# Fetch data obtained with the previous query execution
# and transform it into `model_class` objects.
# The fetching is done by batches of `chunk_size` to
# avoid to run out of memory.
model_objects = list()
is_fetching_completed = False
while not is_fetching_completed:
result = cursor.fetchmany(size=chunk_size)
for row_values in result:
keys, values = field_names, row_values
row_data = dict(zip(keys, values))
model_objects.append(self.model_class(**row_data))
is_fetching_completed = len(result) < chunk_size
return model_objects
def bulk_insert(self, rows: list):
# Build INSERT query and params:
field_names = rows[0].keys()
assert all(row.keys() == field_names for row in rows[1:]) # confirm that all rows have the same fields
fields_format = ", ".join(field_names)
values_placeholder_format = ", ".join([f'({", ".join(["%s"] * len(field_names))})'] * len(rows)) # https://www.psycopg.org/docs/usage.html#passing-parameters-to-sql-queries
query = f"INSERT INTO {self.model_class.table_name} ({fields_format}) " \
f"VALUES {values_placeholder_format}"
params = list()
for row in rows:
row_values = [row[field_name] for field_name in field_names]
params += row_values
# Execute query
self._execute_query(query, params)
def update(self, new_data: dict):
# Build UPDATE query and params
field_names = new_data.keys()
placeholder_format = ', '.join([f'{field_name} = %s' for field_name in field_names])
query = f"UPDATE {self.model_class.table_name} SET {placeholder_format}"
params = list(new_data.values())
# Execute query
self._execute_query(query, params)
def delete(self):
# Build DELETE query
query = f"DELETE FROM {self.model_class.table_name} "
# Execute query
self._execute_query(query)
# ----------------------- Model ----------------------- #
class MetaModel(type):
manager_class = BaseManager
def _get_manager(cls):
return cls.manager_class(model_class=cls)
@property
def objects(cls):
return cls._get_manager()
class BaseModel(metaclass=MetaModel):
table_name = ""
def __init__(self, **row_data):
for field_name, value in row_data.items():
setattr(self, field_name, value)
def __repr__(self):
attrs_format = ", ".join([f'{field}={value}' for field, value in self.__dict__.items()])
return f"<{self.__class__.__name__}: ({attrs_format})>\n"
# ----------------------- Setup ----------------------- #
DB_SETTINGS = {
'host': '127.0.0.1',
'port': '5432',
'database': 'ormify',
'user': 'yank',
'password': 'yank'
}
BaseManager.set_connection(database_settings=DB_SETTINGS)
# ----------------------- Usage ----------------------- #
class Employee(BaseModel):
manager_class = BaseManager
table_name = "employees"
# SQL: SELECT first_name, last_name, salary, grade FROM employees;
employees = Employee.objects.select('first_name', 'last_name', 'salary', 'grade') # employees: List[Employee]
print(f"First select result:\n {employees} \n")
# SQL: INSERT INTO employees (first_name, last_name, salary)
# VALUES ('Yan', 'KIKI', 10000), ('Yoweri', 'ALOH', 15000);
employees_data = [
{"first_name": "Yan", "last_name": "KIKI", "salary": 10000},
{"first_name": "Yoweri", "last_name": "ALOH", "salary": 15000}
]
Employee.objects.bulk_insert(rows=employees_data)
employees = Employee.objects.select('first_name', 'last_name', 'salary', 'grade')
print(f"Select result after bulk insert:\n {employees} \n")
# SQL: UPDATE employees SET salary = 17000, grade = 'L2';
Employee.objects.update(
new_data={'salary': 17000, 'grade': 'L2'}
)
employees = Employee.objects.select('first_name', 'last_name', 'salary', 'grade')
print(f"Select result after update:\n {employees} \n")
# SQL: DELETE FROM employees;
Employee.objects.delete()
employees = Employee.objects.select('first_name', 'last_name', 'salary', 'grade')
print(f"Select result after delete:\n {employees} \n")
После запуска этого скрипта у вас должен быть такой вывод:
Yaaaay!! Мы можем гордиться 😎 😎, теперь у нас есть функциональная ORM, которая поддерживает все основные запросы.