Умный способ создания масштабируемых WebSocket в NestJS
Давным-давно я оказался в ситуации, когда мне нужно было создать масштабируемую систему, способную обрабатывать сотни одновременных подключений при не очень больших затратах и с разумным, но не мгновенным временем отклика.
Мои первые мысли? Давайте переместим все действия по созданию/редактированию/удалению в очередь и уведомим пользователей об успешности их действий или нет через WebSocket.
Но тогда у меня не было большого опыта работы с WebSockets в производстве, поэтому моим первым шагом было изучить, как это работает, с помощью учебных пособий, переполнения стека и других источников.
Итак, через некоторое время я понял суть того, как это должно работать, и начал готовить код и некоторое время возиться с инструментом нагрузочных тестов для имитации высокого трафика.
Первая проблема
В некоторых вопросах и ответах предлагалось вызвать метод подписки в экземпляре Redis подключенного клиента WebSocket.
io.sockets.on('connection', function (sockets) {
sockets.emit('message',{Hello: 'World!'});
sub.subscribe('attack-map-production');
sockets.on('disconnect', function() {
sub.unsubscribe('attack-map-production');
});
});
Но таким образом мы создаем новое соединение с Redis, поэтому использование памяти в нашем приложении и пуле соединений Redis увеличивается. (Redis допускает только 10 тыс. подключений к одному экземпляру)
Для меня это было большим «нет», потому что мне пришлось снизить использование памяти до минимума.
К счастью, сейчас во многих статьях упоминается, что не следует создавать новое соединение Redis на каждом клиенте WebSocket.
Вторая проблема
После создания большого куска бизнес-кода, когда я начал часть с веб-сокетами, у меня в голове возник вопрос — как их создавать правильно и безопасно?
Некоторые события у меня уже были в системе, и некоторые из них были готовы к дополнительной публикации через WebSockets, остальные должны были оставаться внутри системы.
Моим золотым обещанием было то, что мне не придется радикально менять код и я по-прежнему смогу отправлять только выбранные события клиентам веб-сокетов.
Поэтому сначала я создал модуль Redis pub-sub
, так как считал, что мои события, чтобы быть видимыми в других экземплярах, должны передаваться через шаблон Redis pub-sub
.
Не расстраивайтесь, глядя на модуль ниже, поскольку я объясню детали позже в сценарии использования.
export const REDIS_PUB_CLIENT = 'REDIS_PUB_CLIENT';
export const REDIS_SUB_CLIENT = 'REDIS_SUB_CLIENT';
export const REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS =
'REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS';
@Module({
providers: [
{
provide: REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS,
useFactory: (options: RedisEventPubSubModuleOptions) => options,
inject: [MODULE_OPTIONS_TOKEN],
},
{
provide: REDIS_PUB_CLIENT,
useFactory: async (options: RedisEventPubSubModuleOptions) => {
const client = createClient({
url: `redis://${options.host}:${options.port}`,
});
client.on('error', (err) => console.error('Redis Client Error', err));
await client.connect();
return client;
},
inject: [MODULE_OPTIONS_TOKEN],
},
{
provide: EVENT_EMITTER_TOKEN,
useFactory: (
redisPubClient: RedisClientType,
eventEmitter: EventEmitter2,
) => {
return new RedisEventEmitter(redisPubClient, eventEmitter);
},
inject: [REDIS_PUB_CLIENT, EventEmitter2],
},
{
provide: EVENT_SUBSCRIBER_TOKEN,
useFactory: (eventEmitterSub: EventEmitter2) => {
return new EventEmitter2EventSubscriber(eventEmitterSub);
},
inject: [EventEmitter2],
},
],
exports: [
REDIS_PUB_CLIENT,
EVENT_EMITTER_TOKEN,
EVENT_SUBSCRIBER_TOKEN,
REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS,
],
})
export class RedisEventPubSubModule extends ConfigurableModuleClass {
static registerEvents(eventsPublishableNames: string[]): DynamicModule {
return {
module: class {},
providers: [
{
provide: REDIS_SUB_CLIENT,
useFactory: async (
options: RedisEventPubSubModuleOptions,
eventEmitter: EventEmitter2,
) => {
const client = createClient({
url: `redis://${options.host}:${options.port}`,
});
client.on('error', (err) =>
console.error('Redis Client Error', err),
);
await client.connect();
for (const eventPublishableName of eventsPublishableNames) {
await client.subscribe(eventPublishableName, (message) => {
const normalizedMessage = JSON.parse(
message,
) as PublishableEventInterface;
delete (
normalizedMessage as Writeable<PublishableEventInterface>
).publishableEventName;
eventEmitter.emit(eventPublishableName, normalizedMessage);
});
}
return client;
},
inject: [REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, EventEmitter2],
},
],
};
}
}
Этот модуль заботится о создании/предоставлении доступа к клиенту Pub Redis и предоставлении дополнительного метода — RegisterEvents
, который отвечает за прослушивание данных событий в Redis pub-sub
и их повторную отправку через генератор событий.
Возможно, сейчас немного туманно. Зачем повторять события? Зачем нам нужна регистрация на эти мероприятия? Что такое EVENT_EMITTER_TOKEN
и EVENT_SUBSCRIBER_TOKEN
и почему нам необходимо их экспортировать?
При реальном использовании будет более понятно, поэтому давайте создадим вариант использования — сообщения в чате. Мы хотим иметь возможность отправлять сообщения через HTTP POST
и получать их через WebSocket на внешнем интерфейсе.
Давай начнем!
Публикация событий
Вот модуль для этого
@Module({
imports: [],
controllers: [],
providers: [],
})
export class UserChatModule {}
И событие, которое этот модуль будет генерировать после получения запроса POST
export class NewMessageEvent {
constructor(public readonly message: string) {}
}
В контроллере мы должны разрешить отправку событий как для нашей системы, так и для очереди публикации Redis. Для этого мы будем использовать завернутый EventEmitter2
.
export const EVENT_EMITTER_TOKEN = 'EVENT_EMITTER_TOKEN';
export class RedisEventEmitter implements EventEmitterInterface {
constructor(
private redisPubClient: RedisClientType,
private eventEmitter: EventEmitter2,
) {}
async emit(eventName: string, payload: Record<any, any>): Promise<void> {
this.eventEmitter.emit(eventName, payload);
if (this.isPublishableEvent(payload)) {
await this.redisPubClient.publish(
payload.publishableEventName,
JSON.stringify(payload),
);
}
}
private isPublishableEvent(event: any): event is PublishableEventInterface {
return event.publishableEventName !== undefined;
}
}
И затем мы можем использовать его в нашем контроллере.
@Controller('messages')
export class SendMessageAction {
constructor(
// Previously eventEmitter2
@Inject(EVENT_EMITTER_TOKEN)
private readonly eventEmitter: EventEmitterInterface,
) {}
@Post()
async handle(@Body() request: SendMessageHttpRequest) {
await this.eventEmitter.emit(
NewMessageEvent.name,
new NewMessageEvent(request.content),
);
}
}
Но перед этим нам нужно улучшить наше событие с помощью PublishableEventInterface
, чтобы позволить RedisEventEmitter
перехватывать наше событие и отправлять его в очередь публикации Redis
.
export class NewMessageEvent implements PublishableEventInterface {
static publishableEventName = 'events:new-message';
publishableEventName = NewMessageEvent.publishableEventName;
constructor(public readonly message: string) {}
}
Отлично, теперь мы отправляем наши события, как раньше, но теперь, если они помечены как доступные для публикации, они попадут в очередь паба Redis.
Но теперь нам нужно сделать возможным получение этих событий через WebSocket, верно?
Получение событий
Итак, давайте взглянем на наш модуль пользовательского чата.
@Module({
imports: [
RedisEventPubSubModule.registerEvents([
NewMessageEvent.publishableEventName,
]),
],
controllers: [SendMessageAction],
providers: [],
})
export class UserChatModule {}
Как видите, мы использовали упомянутый ранее метод — RegisterEvents
.
Благодаря этому методу мы сообщили RedisEventPubSubModule
, что он должен прослушивать наше событие NewMessageEvent
в очереди публикации Redis в атрибуте PublishableEventName
.
Таким образом, если произойдет какое-либо событие NewMessageEvent, оно будет повторно отправлено как обычное событие NewMessageEvent
, но с атрибутом PublishableEventName
.
Стоит отметить, что он будет работать как с 1 экземпляром, так и с 1000 экземплярами. Таким образом, даже если мы масштабируемся до большого количества экземпляров, каждый из них получит это событие и повторно отправит это событие внутри системы.
Итак, теперь у нас есть возможности генерировать события и прослушивать их. Теперь нам нужно доставить их нашим клиентам через веб-сокеты.
Websocket Gateway
Давайте посмотрим на Websocket Gateway.
export enum WebsocketEventSubscribeList {
FETCH_EVENTS_MESSAGES = 'fetch-events-messages',
EVENTS_MESSAGES_STREAM = 'events-messages-stream',
}
@WebSocketGateway({
pingInterval: 30000,
pingTimeout: 5000,
cors: {
origin: '*',
},
})
export class MessagesWebsocketGateway {
constructor(
@Inject(EVENT_SUBSCRIBER_TOKEN)
private eventSubscriber: EventSubscriberInterface,
) {}
@SubscribeMessage(WebsocketEventSubscribeList.FETCH_EVENTS_MESSAGES)
async streamMessagesData(@ConnectedSocket() client: any) {
const stream$ = this.createWebsocketStreamFromEventFactory(
client,
this.eventSubscriber,
NewMessageEvent.publishableEventName,
);
const event = WebsocketEventSubscribeList.EVENTS_MESSAGES_STREAM;
return from(stream$).pipe(map((data) => ({ event, data })));
}
private createWebsocketStreamFromEventFactory(
client: any,
eventSubscriber: EventSubscriberInterface,
eventName: string,
): Observable<any> {
return new Observable((observer) => {
const dynamicListener = (message: PublishableEventInterface) => {
observer.next(message);
};
eventSubscriber.on(eventName, dynamicListener);
client.on('disconnect', () => {
eventSubscriber.off(eventName, dynamicListener);
});
});
}
}
Так вот есть такая штука, в конструкторе у нас есть EVENT_SUBSCRIBER_TOKEN
, тип EventSubscriberInterface
. Но что он на самом деле делает? Вот так это выглядит под капотом.
export class EventEmitter2EventSubscriber implements EventSubscriberInterface {
constructor(private eventEmitter: EventEmitter2) {}
on(name: string, listener: any): void {
this.eventEmitter.on(name, listener);
}
off(name: string, listener: any): void {
this.eventEmitter.removeListener(name, listener);
}
}
Это просто оболочка для EventEmitter2
, которую мы используем в методе createWebsocketStreamFromEventFactory
.
private createWebsocketStreamFromEventFactory(
client: any,
eventSubscriber: EventSubscriberInterface,
eventName: string,
): Observable<any> {
return new Observable((observer) => {
const dynamicListener = (message: PublishableEventInterface) => {
observer.next(message);
};
eventSubscriber.on(eventName, dynamicListener);
client.on('disconnect', () => {
eventSubscriber.off(eventName, dynamicListener);
});
});
}
}
Мы используем этот завернутый EventEmitter2
для создания динамических прослушивателей для publicName
, когда клиенты веб-сокета подключаются и удаляются при отключении.
Затем мы не делаем ничего, кроме создания потока rxjs
, чтобы поддерживать соединение с веб-сокетом и отправлять сообщения от прослушивателя через Observer.next(message);
когда появляется новое сообщение.
Как это событие дойдет до наших слушателей?
Если вы вернетесь к первому фрагменту кода, нашему подмодулю Redis pub
, вы сможете заметить это в методе RegisterEvents
.
for (const eventPublishableName of eventsPublishableNames) {
await client.subscribe(eventPublishableName, (message) => {
const normalizedMessage = JSON.parse(
message,
) as PublishableEventInterface;
delete (
normalizedMessage as Writeable<PublishableEventInterface>
).publishableEventName;
eventEmitter.emit(eventPublishableName, normalizedMessage);
});
Который в основном прослушивает события в очереди публикации, а затем повторно отправляет их через генератор событий.
Итак, подведем итоги того, что мы здесь сделали
- Мы по-прежнему используем наши события в системе, как раньше, через
EventEmitter2
, но если мы хотим опубликовать их для наших подключенных клиентов веб-сокетов, тогда все, что нам нужно сделать, это реализоватьPublishableInterface
. - Мы не создаем новые соединения
Redis
на каждом подключенном клиенте websocket. - Мы можем масштабировать нашу систему до X экземпляров, и она по-прежнему будет вести себя одинаково — каждый подключенный клиент получит копию события через websocket, независимо от того, к какому экземпляру они будут подключены.
Рабочий код и пример доступны здесь