Потоковый SQL в Node.js
Иногда вам нужен способ просмотреть каждую строку в таблице базы данных и выполнить с ней какие-то операции.
В целом базы данных SQL отлично справляются с обработкой огромных объемов данных. Я не думаю, что было бы преувеличением сказать, что большинству компаний, которые я видел, которые используют Hadoop, было бы лучше просто добавить индекс или немного оперативной памяти на свой сервер Postgres / MySQL. Вы можете сделать удивительную фильтрацию и агрегирование в SQL. Иногда эти мощные операции не поддерживают то, что вам нужно, иногда вам нужно обрабатывать каждую строку в node.js
Async Iterables
Представьте себе следующий сценарий:
- У нас есть база данных, содержащая большое количество идентификаторов твитов из Twitter. По крайней мере, сотни тысяч.
- Мы хотим запросить количество лайков в Твиттере.
- Мы обрабатываем это как фоновую задачу и не хотим использовать слишком много памяти.
Мы собираемся передать результаты нашего запроса, чтобы получить строки из SQL. Этот пример работает одинаково хорошо, независимо от того, используете ли вы @databases/pg
для Postgres, @databases/mysql
для MySQL или @databases/sqlite
для SQLite:
import connect, {sql} from '@databases/pg'; const db = connect(); export default async function updateTweets() { const tweets = db.queryStream( sql`SELECT id, likes FROM tweets;` ); for await (const tweet of tweets) { const likes = await getLikes(tweet.id); if (likes !== tweet.likes) { await db.query( sql` UPDATE tweets SET likes = ${likes} WHERE id = ${tweet.id}; ` ); } } }
Для этого потребуется последняя версия node или транспилятор типа babel, чтобы поддержать предложение асинхронного итератора. Он будет транслировать твиты из нашей базы данных, позволяя создать буфер из нескольких твитов, прежде чем он начнет указывать базе данных замедляться при отправке данных. Как только мы получили твит и закончили его обработку, мы можем приступить к его обработке.
Node.js Streams
Представьте, что вам нужно экспортировать всю очень большую таблицу базы данных в файл CSV для обработки в какой-либо другой системе, например, в инструмент аналитики, или для импорта в пакет учета. Вы можете подождать, пока не получите все записи, но гораздо эффективнее начать отправку данных клиенту, как только они станут доступны. Это особенно верно, если они загружают его через (относительно) медленное интернет-соединение.
Этот пример работает одинаково хорошо, независимо от того, используете ли вы @databases/pg
для Postgres, @databases/mysql
для MySQL. В настоящее время он не поддерживает SQLite.
// replace this with your db of choice import connect, {sql} from '@databases/pg'; import {Map} from 'barrage'; import stringify from 'csv-stringify'; const db = connect();export default function getTweetsCSV() { const tweets = db.queryNodeStream( sql`SELECT id, likes FROM tweets;` ); const map = new Map(tweet => [tweet.id, tweet.likes]); const stringifier = stringify(); stringifier.write(['Tweet ID', 'Likes']); tweets.pipe(map).pipe(stringifier); tweets.on('error', e => stringifier.emit('error', e); map.on('error', e => stringifier.emit('error', e); return stringifier; }
Здесь мы запрашиваем все твиты как объектный поток node.js. Затем мы направляем этот поток в Map
, который принимает каждую строку, представленную в виде объекта, и возвращает строку в виде массива. Наконец, мы направляем этот сопоставленный поток в поток csv-stringifier, который принимает массив для каждой строки и выводит файл CSV. Мы могли бы далее направить этот поток в createGzip
, если клиент поддерживает gzip.
Вывод
Оба варианта потоковой передачи обеспечивают большую гибкость.
- 99% времени, вы не хотите потоковую передачу. Это усложняет отслеживание вашего кода, и в большинстве случаев вам не нужно тянуть всю таблицу вниз для обработки. Просто отфильтруйте несколько интересующих вас строк и обработайте их за один раз.
- Если вы хотите применить операцию, которая имеет побочный эффект, к каждой строке в большой таблице, используйте асинхронный подход итератора (т.е.
.queryStream
) - Если вы хотите сериализовать каждую строку в большой таблице в строковый / двоичный формат, используйте подход потока узлов (т.е.
.queryNodeStream
).