DevGang
Авторизоваться

Планирование задач с NodeJS и Redis 

Пример push-уведомления из приложения uSTADIUM

В uSTADIUM мы используем систему планирования задач для отправки тысяч push-уведомлений. На первый взгляд необходимость в очереди задач и планировщике была неочевидна. Наш сервер будет обрабатывать уведомления по мере необходимости во время запроса. Со временем эта нагрузка начала перегружать систему. Я не был уверен, как это исправить, так что это было интересное путешествие, решающее эту проблему. В этой статье я расскажу об этом методе, о том, как мы его создали с использованием Redis, и о том, как мы масштабируем систему.

Проблема

Создание API не так уж сложно, если вы понимаете основы. Мы отправляем HTTP-запросы на сервер, он выполняет некоторую работу, а затем возвращает запрошенные данные. Просто. Но что происходит, когда этот запрос требует работы, выходящей за пределы его объема? Например, когда я упоминаю пользователя, система должна отправить push-уведомление затронутым пользователям. Обработка уведомления в течение жизненного цикла этого запроса приведет к задержке окончательного ответа. Поскольку наша система уведомлений стала более сложной, стало ясно, что нам нужно больше думать об этом.

Для обработки уведомления и последующей отправки push-уведомления требуются вызовы в базу данных и внешние API. Процесс разбивается следующим образом:

  1. Происходит действие, которое требует создания уведомления.
  2. Уведомление составляется и вставляется в базу данных.
  3. Это уведомление сопоставляется с набором пользователей, которые его получат.
  4. Мы получаем список всех устройств для пользователей, которых мы должны уведомить.
  5. Мы отправляем push-уведомление на каждое зарегистрированное у нас устройство.
  6. Мы обновляем статус отправки этого уведомления и удаляем недопустимые токены устройств.

С каждым из этих 6 шагов связан как минимум один запрос к базе данных. Этот процесс может завершиться очень быстро, когда необходимо отправить одно уведомление на устройство одного пользователя, но если это займет больше времени, запросы рискуют прерваться. Мы должны отделить эту логику, чтобы она могла быть обработана вне жизненного цикла запроса / ответа.

Очередь задач

Очереди задач управляет списком работ, которые должны быть выполнены в отдельном процессе. Одна система добавляет работу в конец очереди, в то время как другая добавляет рабочие элементы сверху. Нам нужно создать объект Task, который представляет описанную выше работу, а затем добавить его в очередь задач. Прежде чем мы могли начать, мне нужно было задать несколько фундаментальных вопросов.

1. Где будет жить очередь задач?

Мы уже использовали Redis в качестве системы кэширования, поэтому, когда я начал искать способы построения очереди, Redis был очевидным выбором. Мало того, что он был хорошо построен, чтобы справиться с этим шаблоном, но есть много онлайн ресурсов, обсуждающих, как он построен. Есть много других вариантов для этого, и если вы используете Google App Engine, вы должны заглянуть в очередь задач Google Cloud, которая предлагает больше встроенных функций.

2. Как мы узнаем, когда элемент был добавлен в очередь?

Я потратил немного времени, пытаясь выяснить. Я не хотел опрашивать Redis каждые n миллисекунд на предмет появления новых задач. Два метода обнаружились на моем радаре. Первая - это система Redis Pub / Sub. Для этого метода у меня была бы функция, которая подписывается на канал и получает сообщения по нему. Эти сообщения предупредят меня о новой задаче, которая готова к запуску. Вторым является использование простого списка Redis в качестве очереди и примитива всплывающего списка блокировки BLPOP, чтобы дождаться готовности элемента к удалению из очереди.

Во время нашей первой итерации этого дизайна мы использовали шаблон Pub / Sub, но он добавил слой сложности, который не был необходим. Кроме того, когда наша система масштабировалась, нам пришлось проделать дополнительную работу, чтобы убедиться, что сообщение не было обработано на нескольких машинах. Поэтому мы переключились на метод List и BLPOP.

3. Что мы отправляем в очередь задач?

«Ну, мы передаем задание, да…» - вот что вы могли подумать, но очереди поддерживают только добавление строк, поэтому мы не можем отправить объект. Мы хранить id из базы данных до конца или нет? Я отвечу, что этот вопрос смутил меня больше, чем следовало, главным образом потому, что я не был уверен, какой «лучший» метод был. Был ли ключ основным идентификатором в нашей базе данных или это ссылка на какой-то объект в Redis? Где мы проводим черту в работе, которая должна произойти? Я решил отправить идентификатор первичного ключа событий в очередь и позволить Задаче решить, как ее обработать. Например, если пользователь проголосует за публикацию сообщения, я отправлю идентификатор действия vote в voice_queue , как только оно будет удалено из очереди, служба будет знать, как с ним работать.

Настройка

Хорошо, я установил проблему и ответил на некоторые из моих вопросов (надеюсь, они ответят на некоторые из ваших), теперь давайте посмотрим на общую диаграмму того, как это будет работать.

Пример настройки очереди задач.

Как видно из диаграммы, на сервере работают две системы. TaskScheduler создаст новую задачу, добавит ее в базу данных, а затем отправит ID выполнения задачи до конца очереди задач. TaskManager ожидает задачи, которые будут добавлены в очередь и обрабатывает его соответствующим образом.

Пример кода

TaskScheduler.js файл является основным примером того, как мы можем добавить задачу в базу данных, а затем вставьте его в конец очереди задач. Как только он помещается в очередь, он начинает обработку, когда TaskManager начинает прослушивание.

/// TaskScheduler.js - пример того, как можно планировать задачи в очереди задач.

var redis = require('redis');
var redisClient = redis.createClient();
const TaskScheduler = async function(work){
 // Если вы используете MySQL, мы добавили бы «Задачу» в базу данных.
 let task = await Database.query("INSERT INTO Task ...");  
 let taskID = task.insertId;
 
 await redisClient.rpush("task_queue", taskID);
}

Суть TaskQueue.js - это базовый пример того, как реализовать его в NodeJS с помощью async / await.

/// TaskQueue.js будет размещен на вашем сервере и при запуске
/// чтобы начать слушать задачи. Или его можно извлечь в отдельный сервис.
var redis = require("redis");

/// TaskManager для прослушивания очереди и выполнения работы.
const TaskManager = async function(redisClient){
 while(true){
   let task;
   
   try{
     task = await redisClient.blpopAsync("task_queue", 0);
   } catch(error) {
     // Redis connect мог закрыться. Обработайте эти случаи здесь.      
     process.exit(1);
   }    
   
   try {
     await HandleTask(task);
   } catch (error) {
     // Обработка задачи не удалась. Попробуйте перезапустить его или добавить в очередь «Сбой».    
   }
 }
}

/// Функция, которая обрабатывает всю работу для этой задачи.
const HandleTask = async function(task){
  // Делай работу!
}

// Запускаем функцию TaskManager
(async function() {
 // Инициализация redis
 let redisClient = redis.createClient();
 await TaskManager(redisClient)
})()

Улучшения

Есть много областей для улучшения, поскольку приведенный мною код является лишь базовым примером. Один вопрос, который вам, возможно, придется задать, - куда вы поместите свой TaskManager. Если вы добавите его непосредственно на свой сервер, это может привести к перегрузке системы при большой нагрузке, но это зависит от того, какой тип работы выполняются вашими задачами. В нашей системе мы извлекли все это в новый микросервис с простым API для проверки его состояния.

Кроме того, в примере кода мы запускаем одну задачу за раз. Это не идеально, потому что долгосрочная задача может создать резервную копию всей очереди. Вместо этого у нас должен быть пул запущенных задач, которые добавляются и удаляются по мере необходимости. Как только пул заполнится, цикл while будет ждать нового пространства.

В заключение

Описанный метод не слишком сложен, но он отделяет бизнес-логику от логики приложения. С этим небольшим изменением мы можем начать итерацию производительности системы и создавать более надежные очереди и службы. Мы также можем воспроизвести этот метод для обработки различных длительных процессов, таких как системы рекомендаций, обработка текста и т.д.

Перевод статьи: Task Scheduling with NodeJS and Redis

#JavaScript #NodeJS #Redis
Комментарии
Чтобы оставить комментарий, необходимо авторизоваться

Присоединяйся в тусовку

В этом месте могла бы быть ваша реклама

Разместить рекламу