DevGang
Авторизоваться

Простые CQRS в NodeJS с помощью Typescript

CQRS расшифровывается как разделение ответственности за командный запрос и представляет собой концепцию, которая разделяет операции чтения и записи данных. Это означает, что вы можете использовать другую модель для обновления информации, чем модель, которую вы используете для чтения информации. Таким образом, вы можете масштабировать эти модели независимо. В большинстве приложений операций чтения больше, чем записи. С помощью CQRS мы потенциально можем использовать другое хранилище данных для моделей чтения и выполнить дополнительную оптимизацию. В этом блоге мы будем использовать PostgreSQL в качестве хранилища данных для чтения и записи.

Паттерн CQRS с точки зрения архитектуры
Паттерн CQRS с точки зрения архитектуры

Модель запроса

Наиболее важной частью модели запросов является то, что мы не должны вводить никаких побочных эффектов (кроме ведения журнала и аудита) при использовании модели чтения.

Запрос к базе данных (PostgreSQL) не должен быть новаторским. Лично мне нравится иметь полную безопасность типов, чтобы мы могли легко обнаруживать ошибки во время разработки, не вводя никаких тестов, которые просто проверяют тип данных из нашего хранилища данных на соответствие типу данных, ожидаемому нашим API. Мне нравится сначала использовать схему базы данных, что означает, что мы генерируем типы из схемы базы данных и работаем с ними. Любое изменение схемы базы данных производится с помощью миграции SQL, и после этого типы typescript восстанавливаются. Другой подход заключается в использовании инструмента, ориентированного на код, такого как Typeform или Prisma. Однако, по моему опыту, такие инструменты часто создают неэффективные SQL-запросы и их менее легко расширить. В своих проектах мы используем библиотеку kysely (https://github.com/koskimas/kysely) с kysely-codegen (https://github.com/RobinBlomberg/kysely-codegen), чтобы иметь полный типобезопасный SQL builder.

Простой обработчик запросов Fastify может выглядеть примерно так:

fastify.get('/get-accounts', (_, reply) => {
    const accounts = await db
        .selectFrom('accounts')
        .select(['id', 'first_name', 'last_name'])
        .execute();
    // additional logic can be used to transform the database response
    return accounts;
});

Поскольку мы используем типобезопасный sql builder, возвращаемый тип является типобезопасным. Кроме того, если мы используем typescript во внешнем интерфейсе, мы можем усилить безопасность типов и использовать такие инструменты, как tRPC, для обеспечения полной безопасности типов, от базы данных до нашего уровня представления.

Другое решение, которое мы часто используем, — это использовать такой сервис, как Hasura (https://hasura.io), который будет генерировать конечную точку GraphQL из нашей базы данных. Эту услугу можно масштабировать самостоятельно.

Модель записи

По сравнению со стороной чтения, эту часть гораздо сложнее освоить. При изменении данных мы должны учитывать несколько факторов, которые, вероятно, будут определять, как модель будет реализована и использоваться. Некоторые из этих вещей являются:

  • Параллелизм: как мы должны относиться к одновременным записям в одну и ту же модель
  • Согласованность данных: должна ли модель всегда соответствовать инвариантам
  • Пропускная способность записи
  • Атомарность
  • Вызывает ли изменение модели какие-либо побочные эффекты (отправка электронной почты и т.д.)

Для многих из этих вещей важен выбор хранилища данных. Лично мы всегда будем выбирать базу данных SQL, совместимую с ACID, такую как PostgreSQL. Важно отметить, что некоторые приложения, которые требуют высокой пропускной способности записи, но не требуют высокой согласованности данных, лучше использовать с другими типами хранилищ данных.

Правило, которого мы всегда стараемся придерживаться, заключается в том, что каждый обработчик команд должен создавать только один побочный эффект/транзакцию.

Это гарантирует, что наша система не перейдет в несогласованное состояние.

Плохим примером обработчика команд было бы что-то вроде:

// Function that can cause an inconsistent state
async function createAccount(account: Account, group_id: string) {
  // insert user account into database
  await db.insertAccount(account);
  // add user to a group
  await db.addToGroup(account_id, group_id);
  // send email
  await emailService.sendWelcomeEmail(account.email, account.first_name);
}

Давайте улучшим функцию createAccount таким образом, чтобы у нее был только один побочный эффект:

// Function that can be slow
async function createAccount(account: Account, group_id: string) {
  await db.createTransaction(async (db) => {
    // insert user account into database
    await db.insertAccount(account);
    // add user to a group
    await db.addToGroup(account_id, group_id);
    // send email
    await emailService.sendWelcomeEmail(account.email, account.first_name);
  });
}

Это действительно будет работать и быть последовательным, однако не является масштабируемым и не расширяемым и имеет более низкую вероятность успешной фиксации. Предположим, что обработка sendWelcomeEmail занимает 10 секунд (из-за задержки в сети или медленной внешней службы), это означает, что соединение остается открытым не менее 10 секунд и не может быть использовано другими процессами. Кроме того, что, если мы хотим уведомить администраторов о создании новой учетной записи? Нам нужно было бы изменить текст транзакции и, возможно, снизить вероятность успешной фиксации.

Лучшим подходом было бы вставить учетную запись, добавить в группу и запланировать отправку электронной почты в атомарной операции и выполнить побочные эффекты асинхронно в фоновом режиме. Дополнительно мы можем записать событие в вашу базу данных, чтобы легко расширить (некритические) побочные эффекты. Оптимальной функцией было бы что-то вроде:

// Function we want
async function createAccount(account: Account, group_id: string) {
  // the magic happens here
  await execute(
    insertAccount(account),
    addToGroup(account_id, group_id),
    sendWelcomeEmail(account.email, account.first_name),
    // this is an integration event
    accountCreated(account_id)
  );
}

Вы можете задаться вопросом, какой тип возвращаемого значения у insertAccount, addToGroup, sendWelcomeEmail и accountCreated. Тип ввода функции выполнения следующий:

export interface ICommand {
  readonly sql: string;
  readonly parameters: ReadonlyArray<any>;
}

export async function execute(...commands: (ICommand | ICommand[])[]): Promise<void> {
  const queries = commands.flat();
  await db.createTransaction(async (db) => {
     for(let i = 0; i<queries.length; ++i) {
       await db.query(queries[i].sql, queries[i].parameters);
     }
  });

  // we can further improve the performance with help of
  // https://vitaly-t.github.io/pg-promise/helpers.html concat function
  // const sql = pgpInstance.helpers.concat(commands.flat().map((q) => ({ query: q.sql, values: q.parameters })));
  // await db.query(sql);
}

Примером реализации insertAccount может быть:

// client is kysely db builder
function insertAccount(account_id: string): ICommand[] {
  return [
    client
      .insertInto('accounts')
      .values({
        id: account_id,
      })
      .compile(),
  ];
}

Поскольку возвращается массив ICommand, мы можем легко добавить дополнительные изменения в базу данных для некоторых действий.

Ваш обработчик команд теперь является атомарным и гораздо более производительным, чем раньше, с более высокой вероятностью успеха (зависит только от базы данных).

Отсутствует только одна деталь: согласованность данных. Многие модели записи имеют некоторые инварианты, которые всегда должны выполняться, даже если в системе выполняется одновременная запись. В парадигме DDD они реализованы внутри совокупного корня, он же граница согласованности. Простым, но не тривиальным примером может быть выполнение заказа. Бизнес-требование заключается в том, что заказ может быть оплачен только один раз. Как только он будет оплачен, заказ будет отправлен. Давайте напишем обработчик команды fulfillOrder с текущим решением.

async function fulfillOrder(order_id: string) {
  const order = await db.selectFrom("orders")
                        .select(["state", "id"])
                        .where("id", "=", order_id)
                        .executeTakeFirst();

  // this order state is now stale data
  // if we had concurrent requets there is possibility 
  // that the state is different in the database
  if (order.state === "paid") {
    throw new Error("order already paid");
  }

  await execute(
    pay(order_id: string),
    shipOrder(order_id: string),
  );
}

function pay(order_id: string): ICommand[] {
  return [
    client
      .updateTable('orders')
      .set({ state: "paid" })
      .where("id", "=", order_id)
      .compile(),
  ];
}

На первый взгляд это выглядит нормально, однако существует небольшая вероятность того, что мы можем отправить заказ дважды. Это связано с тем, что, когда мы получаем заказ, заказ уже может быть выполнен другим запросом. Как правило, эти ошибки трудно обнаружить и их трудно протестировать. К счастью, у нас есть несколько решений для такого рода проблем. Первым решением было бы обернуть весь обработчик внутри транзакции и использовать строгий уровень изоляции транзакции (Serializable Isolation Level). Это выполнит работу, но снизит производительность, особенно когда логика между извлечением и записью данных является сложной.

Другой подход, который мы также используем, - это оптимистичный параллелизм. Оптимистичный параллелизм предполагает, что несколько транзакций могут часто завершаться, не мешая друг другу, что повышает производительность. Однако для реализации оптимистичного параллелизма нам нужно будет внести некоторые изменения. Сначала нам нужно ввести столбец версии в нашу таблицу заказов, затем нам нужно убедиться, что мутация выполняется успешно только в отношении версии, для которой проверяется инвариант, и соответствующим образом обновить версию. Наконец, мы должны убедиться, что вся транзакция не может мешать аналогичным транзакциям:

async function fulfillOrder(order_id: string) {
  const order = await db.selectFrom("orders")
                        .select(["state", "id", "version"])
                        .where("id", "=", order_id)
                        .executeTakeFirst();

  // this order state is now stale data
  // if we had concurrent requets there is possibility 
  // that the state is different in the database
  if (order.state === "paid") {
    throw new Error("order already paid");
  }

  await execute(
    pay(order_id: string, order.version),
    shipOrder(order_id: string),
  );
}

function pay(order_id: string, expected_version: number): ICommand[] {
  return [
    // since every statement in PostgreSQL is executed in order we should lock this resource
    { sql: "select pg_advisory_lock($1);", parameters: [stringToInt(order_id)] },
    // this will reject if we have some newer version
    client
        .updateTable('orders')
        .set({ id: null })
        .where("id", "=", order_id)
        .where("version", ">", expected_version)
        .compile(),
    client
      .updateTable('orders')
      .set({ state: "paid", version: expected_version + 1 })
      .where("id", "=", order_id)
      .compile(),
  ];
}

Трюк, который мы используем, заключается в обновлении строки до недопустимого ограничения БД (идентификатор является первичным ключом, поэтому не может быть нулевым), когда существует новая версия. С помощью консультативных блокировок PostgreSQL мы обеспечиваем согласованность между операторами обновления.

Резюме

У нас есть простое и не требующее фреймворка решение для достижения высокой пропускной способности записи и обеспечения высокой согласованности на основе обработчика команд. Дополнительным преимуществом использования Command является то, что тестирование также становится менее сложным. Нам нужно только смоделировать функцию execute и проверить, все ли команды заданы в качестве входных данных.

#NodeJS #TypeScript
Комментарии
Чтобы оставить комментарий, необходимо авторизоваться

Присоединяйся в тусовку

В этом месте могла бы быть ваша реклама

Разместить рекламу