DevGang
Авторизоваться

Умный способ создания масштабируемых WebSocket в NestJS

Давным-давно я оказался в ситуации, когда мне нужно было создать масштабируемую систему, способную обрабатывать сотни одновременных подключений при не очень больших затратах и ​​с разумным, но не мгновенным временем отклика.

Мои первые мысли? Давайте переместим все действия по созданию/редактированию/удалению в очередь и уведомим пользователей об успешности их действий или нет через WebSocket.

Но тогда у меня не было большого опыта работы с WebSockets в производстве, поэтому моим первым шагом было изучить, как это работает, с помощью учебных пособий, переполнения стека и других источников.

Итак, через некоторое время я понял суть того, как это должно работать, и начал готовить код и некоторое время возиться с инструментом нагрузочных тестов для имитации высокого трафика.

Первая проблема

В некоторых вопросах и ответах предлагалось вызвать метод подписки в экземпляре Redis подключенного клиента WebSocket.

io.sockets.on('connection', function (sockets) {
    sockets.emit('message',{Hello: 'World!'});
    sub.subscribe('attack-map-production');

    sockets.on('disconnect', function() {
            sub.unsubscribe('attack-map-production');
    });
});

Но таким образом мы создаем новое соединение с Redis, поэтому использование памяти в нашем приложении и пуле соединений Redis увеличивается. (Redis допускает только 10 тыс. подключений к одному экземпляру)

Для меня это было большим «нет», потому что мне пришлось снизить использование памяти до минимума.

К счастью, сейчас во многих статьях упоминается, что не следует создавать новое соединение Redis на каждом клиенте WebSocket.

Вторая проблема

После создания большого куска бизнес-кода, когда я начал часть с веб-сокетами, у меня в голове возник вопрос — как их создавать правильно и безопасно?

Некоторые события у меня уже были в системе, и некоторые из них были готовы к дополнительной публикации через WebSockets, остальные должны были оставаться внутри системы.

Моим золотым обещанием было то, что мне не придется радикально менять код и я по-прежнему смогу отправлять только выбранные события клиентам веб-сокетов.

Поэтому сначала я создал модуль Redis pub-sub, так как считал, что мои события, чтобы быть видимыми в других экземплярах, должны передаваться через шаблон Redis pub-sub.

Не расстраивайтесь, глядя на модуль ниже, поскольку я объясню детали позже в сценарии использования.

export const REDIS_PUB_CLIENT = 'REDIS_PUB_CLIENT';
export const REDIS_SUB_CLIENT = 'REDIS_SUB_CLIENT';
export const REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS =
  'REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS';

@Module({
  providers: [
    {
      provide: REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS,
      useFactory: (options: RedisEventPubSubModuleOptions) => options,
      inject: [MODULE_OPTIONS_TOKEN],
    },
    {
      provide: REDIS_PUB_CLIENT,
      useFactory: async (options: RedisEventPubSubModuleOptions) => {
        const client = createClient({
          url: `redis://${options.host}:${options.port}`,
        });
        client.on('error', (err) => console.error('Redis Client Error', err));
        await client.connect();
        return client;
      },
      inject: [MODULE_OPTIONS_TOKEN],
    },
    {
      provide: EVENT_EMITTER_TOKEN,
      useFactory: (
        redisPubClient: RedisClientType,
        eventEmitter: EventEmitter2,
      ) => {
        return new RedisEventEmitter(redisPubClient, eventEmitter);
      },
      inject: [REDIS_PUB_CLIENT, EventEmitter2],
    },
    {
      provide: EVENT_SUBSCRIBER_TOKEN,
      useFactory: (eventEmitterSub: EventEmitter2) => {
        return new EventEmitter2EventSubscriber(eventEmitterSub);
      },
      inject: [EventEmitter2],
    },
  ],
  exports: [
    REDIS_PUB_CLIENT,
    EVENT_EMITTER_TOKEN,
    EVENT_SUBSCRIBER_TOKEN,
    REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS,
  ],
})
export class RedisEventPubSubModule extends ConfigurableModuleClass {
  static registerEvents(eventsPublishableNames: string[]): DynamicModule {
    return {
      module: class {},
      providers: [
        {
          provide: REDIS_SUB_CLIENT,
          useFactory: async (
            options: RedisEventPubSubModuleOptions,
            eventEmitter: EventEmitter2,
          ) => {
            const client = createClient({
              url: `redis://${options.host}:${options.port}`,
            });
            client.on('error', (err) =>
              console.error('Redis Client Error', err),
            );
            await client.connect();
            for (const eventPublishableName of eventsPublishableNames) {
              await client.subscribe(eventPublishableName, (message) => {
                const normalizedMessage = JSON.parse(
                  message,
                ) as PublishableEventInterface;
                delete (
                  normalizedMessage as Writeable<PublishableEventInterface>
                ).publishableEventName;
                eventEmitter.emit(eventPublishableName, normalizedMessage);
              });
            }
            return client;
          },
          inject: [REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, EventEmitter2],
        },
      ],
    };
  }
}

Этот модуль заботится о создании/предоставлении доступа к клиенту Pub Redis и предоставлении дополнительного метода — RegisterEvents, который отвечает за прослушивание данных событий в Redis pub-sub и их повторную отправку через генератор событий.

Возможно, сейчас немного туманно. Зачем повторять события? Зачем нам нужна регистрация на эти мероприятия? Что такое EVENT_EMITTER_TOKEN и EVENT_SUBSCRIBER_TOKEN и почему нам необходимо их экспортировать?

При реальном использовании будет более понятно, поэтому давайте создадим вариант использования — сообщения в чате. Мы хотим иметь возможность отправлять сообщения через HTTP POST и получать их через WebSocket на внешнем интерфейсе.

Давай начнем!

Публикация событий

Вот модуль для этого

@Module({
  imports: [],
  controllers: [],
  providers: [],
})
export class UserChatModule {}

И событие, которое этот модуль будет генерировать после получения запроса POST

export class NewMessageEvent {
  constructor(public readonly message: string) {}
}

В контроллере мы должны разрешить отправку событий как для нашей системы, так и для очереди публикации Redis. Для этого мы будем использовать завернутый EventEmitter2.

export const EVENT_EMITTER_TOKEN = 'EVENT_EMITTER_TOKEN';

export class RedisEventEmitter implements EventEmitterInterface {
  constructor(
    private redisPubClient: RedisClientType,
    private eventEmitter: EventEmitter2,
  ) {}

  async emit(eventName: string, payload: Record<any, any>): Promise<void> {
    this.eventEmitter.emit(eventName, payload);

    if (this.isPublishableEvent(payload)) {
      await this.redisPubClient.publish(
        payload.publishableEventName,
        JSON.stringify(payload),
      );
    }
  }

  private isPublishableEvent(event: any): event is PublishableEventInterface {
    return event.publishableEventName !== undefined;
  }
}

И затем мы можем использовать его в нашем контроллере.

@Controller('messages')
export class SendMessageAction {
  constructor(
    // Previously eventEmitter2
    @Inject(EVENT_EMITTER_TOKEN)
    private readonly eventEmitter: EventEmitterInterface,
  ) {}

  @Post()
  async handle(@Body() request: SendMessageHttpRequest) {
    await this.eventEmitter.emit(
      NewMessageEvent.name,
      new NewMessageEvent(request.content),
    );
  }
}

Но перед этим нам нужно улучшить наше событие с помощью PublishableEventInterface, чтобы позволить RedisEventEmitter перехватывать наше событие и отправлять его в очередь публикации Redis.

export class NewMessageEvent implements PublishableEventInterface {
  static publishableEventName = 'events:new-message';

  publishableEventName = NewMessageEvent.publishableEventName;

  constructor(public readonly message: string) {}
}

Отлично, теперь мы отправляем наши события, как раньше, но теперь, если они помечены как доступные для публикации, они попадут в очередь паба Redis.

Но теперь нам нужно сделать возможным получение этих событий через WebSocket, верно?

Получение событий

Итак, давайте взглянем на наш модуль пользовательского чата.

@Module({
  imports: [
    RedisEventPubSubModule.registerEvents([
      NewMessageEvent.publishableEventName,
    ]),
  ],
  controllers: [SendMessageAction],
  providers: [],
})
export class UserChatModule {}

Как видите, мы использовали упомянутый ранее метод — RegisterEvents.

Благодаря этому методу мы сообщили RedisEventPubSubModule, что он должен прослушивать наше событие NewMessageEvent в очереди публикации Redis в атрибуте PublishableEventName.

Таким образом, если произойдет какое-либо событие NewMessageEvent, оно будет повторно отправлено как обычное событие NewMessageEvent, но с атрибутом PublishableEventName.

Стоит отметить, что он будет работать как с 1 экземпляром, так и с 1000 экземплярами. Таким образом, даже если мы масштабируемся до большого количества экземпляров, каждый из них получит это событие и повторно отправит это событие внутри системы.

Итак, теперь у нас есть возможности генерировать события и прослушивать их. Теперь нам нужно доставить их нашим клиентам через веб-сокеты.

Websocket Gateway

Давайте посмотрим на Websocket Gateway.

export enum WebsocketEventSubscribeList {
  FETCH_EVENTS_MESSAGES = 'fetch-events-messages',
  EVENTS_MESSAGES_STREAM = 'events-messages-stream',
}

@WebSocketGateway({
  pingInterval: 30000,
  pingTimeout: 5000,
  cors: {
    origin: '*',
  },
})
export class MessagesWebsocketGateway {
  constructor(
    @Inject(EVENT_SUBSCRIBER_TOKEN)
    private eventSubscriber: EventSubscriberInterface,
  ) {}

  @SubscribeMessage(WebsocketEventSubscribeList.FETCH_EVENTS_MESSAGES)
  async streamMessagesData(@ConnectedSocket() client: any) {
    const stream$ = this.createWebsocketStreamFromEventFactory(
      client,
      this.eventSubscriber,
      NewMessageEvent.publishableEventName,
    );

    const event = WebsocketEventSubscribeList.EVENTS_MESSAGES_STREAM;
    return from(stream$).pipe(map((data) => ({ event, data })));
  }

  private createWebsocketStreamFromEventFactory(
    client: any,
    eventSubscriber: EventSubscriberInterface,
    eventName: string,
  ): Observable<any> {
    return new Observable((observer) => {
      const dynamicListener = (message: PublishableEventInterface) => {
        observer.next(message);
      };

      eventSubscriber.on(eventName, dynamicListener);

      client.on('disconnect', () => {
        eventSubscriber.off(eventName, dynamicListener);
      });
    });
  }
}

Так вот есть такая штука, в конструкторе у нас есть EVENT_SUBSCRIBER_TOKEN, тип EventSubscriberInterface. Но что он на самом деле делает? Вот так это выглядит под капотом.

export class EventEmitter2EventSubscriber implements EventSubscriberInterface {
  constructor(private eventEmitter: EventEmitter2) {}

  on(name: string, listener: any): void {
    this.eventEmitter.on(name, listener);
  }

  off(name: string, listener: any): void {
    this.eventEmitter.removeListener(name, listener);
  }
}

Это просто оболочка для EventEmitter2, которую мы используем в методе createWebsocketStreamFromEventFactory.

 private createWebsocketStreamFromEventFactory(
    client: any,
    eventSubscriber: EventSubscriberInterface,
    eventName: string,
  ): Observable<any> {
    return new Observable((observer) => {
      const dynamicListener = (message: PublishableEventInterface) => {
        observer.next(message);
      };

      eventSubscriber.on(eventName, dynamicListener);

      client.on('disconnect', () => {
        eventSubscriber.off(eventName, dynamicListener);
      });
    });
  }
}

Мы используем этот завернутый EventEmitter2 для создания динамических прослушивателей для publicName, когда клиенты веб-сокета подключаются и удаляются при отключении.

Затем мы не делаем ничего, кроме создания потока rxjs, чтобы поддерживать соединение с веб-сокетом и отправлять сообщения от прослушивателя через Observer.next(message); когда появляется новое сообщение.

Как это событие дойдет до наших слушателей?

Если вы вернетесь к первому фрагменту кода, нашему подмодулю Redis pub, вы сможете заметить это в методе RegisterEvents.


 for (const eventPublishableName of eventsPublishableNames) {
              await client.subscribe(eventPublishableName, (message) => {
                const normalizedMessage = JSON.parse(
                  message,
                ) as PublishableEventInterface;
                delete (
                  normalizedMessage as Writeable<PublishableEventInterface>
                ).publishableEventName;
                eventEmitter.emit(eventPublishableName, normalizedMessage);
              });

Который в основном прослушивает события в очереди публикации, а затем повторно отправляет их через генератор событий.

Итак, подведем итоги того, что мы здесь сделали

  1. Мы по-прежнему используем наши события в системе, как раньше, через EventEmitter2, но если мы хотим опубликовать их для наших подключенных клиентов веб-сокетов, тогда все, что нам нужно сделать, это реализовать PublishableInterface.
  2. Мы не создаем новые соединения Redis на каждом подключенном клиенте websocket.
  3. Мы можем масштабировать нашу систему до X экземпляров, и она по-прежнему будет вести себя одинаково — каждый подключенный клиент получит копию события через websocket, независимо от того, к какому экземпляру они будут подключены.

Рабочий код и пример доступны здесь

Источник:

#JavaScript #NodeJS #NestJS
Комментарии
Чтобы оставить комментарий, необходимо авторизоваться

Присоединяйся в тусовку

В этом месте могла бы быть ваша реклама

Разместить рекламу