Возобновить репликацию данных в Postgres и Node.js
Вы знаете как реплицировать данные из базы данных Postgres в приложение Node.js в режиме реального времени с помощью логической репликации. Однако, если приложение Node.js по какой-то причине выйдет из строя или остановится, репликация прекратится, и мы рискуем потерять данные, которые наша система тем временем производит через другой микросервис или приложение.
В этой статье мы расскажем, как возобновить репликацию с последней точки, где остановилось приложение Node.js, используя постоянный слот репликации в базе данных Postgres. Это гарантирует, что наше приложение не потеряет события, созданные другими микросервисами или приложениями во время простоя.
Создание слота репликации
Для возобновления репликации нам нужно создать слот репликации в базе данных Postgres. Слот репликации — это логическая сущность, которая отслеживает изменения, происходящие в базе данных, и отправляет их подписчику. Пакет postgres
автоматически создал для нас слот репликации, но он не постоянный, это TEMPORARY
слот репликации, который удаляется при отключении подписчика.
Поскольку мы хотим возобновить репликацию с последней точки, где приложение Node.js было остановлено, нам нужно создать постоянный слот репликации. Давайте создадим его в новом файле setup-replication.js
:
import pg from 'pg'
const { Client } = pg
const client = new Client({ user: 'postgres', password: 'foopsw' })
await client.connect()
await createReplicationSlotIfNotExists('foo_slot')
await client.end()
async function createReplicationSlotIfNotExists (slotName) {
const slots = await client.query('SELECT * FROM pg_replication_slots WHERE slot_name = $1', [slotName])
if (!slots.rows.length) {
const newSlot = await client.query("SELECT * FROM pg_create_logical_replication_slot($1, 'pgoutput')", [slotName])
console.log('Created replication slot', newSlot.rows[0])
} else {
console.log('Slot already exists', slots.rows[0])
}
}
Мы используем pg_replication_slots
представление, чтобы проверить, существует ли уже слот репликации с указанным именем. Если его нет, то мы создаем новый слот репликации с помощью pg_create_logical_replication_slot
функции.
Обратите внимание, что мы указали pgoutput
плагин в функции для декодирования изменений в слоте репликации. Это плагин по умолчанию для логической репликации, и он поставляется с Postgres.
Имейте в виду, что есть и другие плагины, такие как:
- Плагин
test_decoding
— это самый простой плагин, поставляемый с Postgres, позволяющий начать создавать свой собственный плагин. wal2json
, которые необходимо установить отдельно в базе данных Postgres, что позволит использовать их в функцииpg_create_logical_replication_slot
.
Обратите внимание, что каждый плагин имеет свои преимущества и недостатки, поэтому выбирайте тот, который лучше всего подходит для вашего варианта использования.
Самое большое различие, если вы попытаетесь использовать test_decoding
и pgoutput
, заключается в том, что первый не принимает имя публикации в качестве параметра, а второй принимает . Это означает, что вы можете использовать pgoutput
для фильтрации изменений, которые хотите реплицировать, в то время как test_decoding
будет реплицировать все изменения в базе данных без их фильтрации!
Теперь запустите файл setup-replication.js
, чтобы создать слот репликации!
Возобновление репликации
Мы готовы создать новый consumer-resume.js
файл, который возобновит репликацию с последней точки, где было остановлено приложение Node.js, поэтому давайте приступим к этому:
import { LogicalReplicationService, PgoutputPlugin } from 'pg-logical-replication'
const client = new LogicalReplicationService({
user: 'postgres',
password: 'foopsw'
}, { acknowledge: { auto: false } })
client.on('data', async (lsn, log) => {
if (log.tag === 'insert') {
console.log(`${lsn}) Received insert: ${log.relation.schema}.${log.relation.name} ${log.new.id}`)
} else if (log.relation) {
console.log(`${lsn}) Received log: ${log.relation.schema}.${log.relation.name} ${log.tag}`)
}
await client.acknowledge(lsn)
})
const eventDecoder = new PgoutputPlugin({
// Get a complete list of available options at:
// https://www.postgresql.org/docs/16/protocol-logical-replication.html
protoVersion: 4,
binary: true,
publicationNames: [
'foo_odd',
'foo_update_only'
]
})
console.log('Listening for changes...')
process.on('SIGINT', async () => {
console.log('Stopping client...')
await client.stop()
})
await client.subscribe(eventDecoder, 'foo_slot')
На этот раз мы используем пакет pg-logical-replication
для демонстрации возобновления репликации. Его низкоуровневый API дает нам больше контроля над процессом репликации, иначе мы не смогли бы настроить плагин на получение только тех изменений, которые нас интересуют.
Код можно объяснить следующим образом:
- Мы создаем новый экземпляр
LogicalReplicationService
и передаем ему параметры подключения. Обратите внимание, что мы устанавливаем параметрacknowledge.auto
наfalse
, чтобы вручную подтвердить изменения; в противном случае они были бы автоматически подтверждены. Установив этот параметр наfalse
, мы получаем еще больший контроль над процессом. - Мы прослушиваем событие
data
, чтобы получить изменения из слота репликации. - На этом этапе вам следует обработать
log
и применить свою бизнес-логику. В этом случае мы просто регистрируем изменения в консоли. - После обработки изменений необходимо обеспечить их с помощью метода подтверждения; в противном случае слот не продвинется.
lsn
(порядковый номер журнала) — это уникальный идентификатор каждого изменения в базе данных, который используется для идентификации изменений в слоте репликации. - Мы создаем новый экземпляр
PgoutputPlugin
и передаем его методуsubscribe
для установления соединения со слотом репликации.
Чтобы запустить приложение, запустите node consumer-resume.js
файл, и оно начнет получать изменения из слота репликации. Если мы сделали все правильно, вы можете запустить файл node producer.js
, чтобы произвести изменения в базе данных и увидеть изменения в приложении-потребителе.
Если вы остановите потребительское приложение, нажав Ctrl+C
, репликация остановится, и слот не будет двигаться вперед. Однако, если вы снова запустите consumer-resume.js
приложение, оно возобновит репликацию с последней точки, где она была остановлена!
Более того, мы видим, что вывод показывает только изменения из публикаций foo_odd
и foo_update_only
, которые мы настроили в экземпляре PgoutputPlugin
, поэтому мы увидим обновления и вставки id
только с нечетными номерами:
0/15648E0) Received insert: public.foo 18
0/15649A0) Received log: public.foo update
0/1564AF0) Received log: public.foo update
0/1564B80) Received insert: public.foo 20
0/1564C40) Received log: public.foo update
0/1564D90) Received log: public.foo update
0/1564E20) Received insert: public.foo 22
0/1564EE0) Received log: public.foo update
0/1565030) Received log: public.foo update
0/15650C0) Received insert: public.foo 24
Заключение
В этой статье мы обсудили, как возобновить репликацию с последней точки, где приложение Node.js было остановлено.
Мы создали постоянный слот репликации в базе данных Postgres и использовали пакет pg-logical-replication
для демонстрации возобновления репликации. Это гарантирует, что наше приложение не потеряет данные, созданные другими микросервисами или приложениями во время простоя.
При этом мы не изменили файл producer.js
, а это значит, что производитель может продолжать вносить изменения в базу данных без каких-либо проблем, а предыдущая настройка публикаций по-прежнему действительна: мы просто вручную настроили слот репликации и нового потребителя.
Помните, что слот репликации сохраняет изменения в базе данных до тех пор, пока слот не будет удален или изменения не будут подтверждены. Если не управлять должным образом, это может привести к высокому использованию диска, поскольку Postgres будет хранить изменения в журналах WAL неопределенно долго, вместо того чтобы удалять их.