DevGang
Авторизоваться

Создание надежных Webhook-сервисов на Golang: Исчерпывающее руководство

В программной инженерии некоторые концепции, такие как веб-хуки, требуют принятия тщательных архитектурных решений перед реализацией. Веб-хуки, независимо от того, нужны они вам или нет, добавляют еще один уровень сложности в вашу систему, и вы должны обращаться с ними осторожно, чтобы обеспечить их надежность и ценность. Именно поэтому многие компании предпочитают использовать внешние сервисы для доставки веб-хуков своим клиентам.

Если вы занимаетесь разработкой программного обеспечения, то понимание того, как работают веб-хуки, и создание некоторых из них может оказаться полезным занятием. В этой статье мы рассмотрим, как с помощью Golang создать службу веб-хуков для воображаемого платежного шлюза с API в качестве поддержки.

Когда кто-то делает запрос на платежном шлюзе, чтобы запросить платеж, мы отправляем ответ, но мы также обращаемся к сервису webhook, написанному на языке Golang, чтобы отправить запрос webhook. Для связи между сервисом Golang и API, построенным на Flask, мы будем использовать канал Redis. По этому каналу будут отправляться данные и даваться указания сервису Golang отправить запрос (веб-хук) по URL, переданному в поле ввода.

В этом комплексном руководстве мы рассмотрим такие технические понятия, как очередь, веб-хук, горутины и экспоненциальный бэк-офф, приведем примеры и разберемся в построении надежного веб-хук-сервера Golang.

Как построить службу веб-хуков?

Прежде чем приступить к кодированию, необходимо обсудить наш подход к построению службы веб-хуков. Для начала давайте разберемся, что такое веб-хуки и как они работают.

Что такое веб-хуки?

Веб-хуки - это автоматические сообщения, передаваемые от одной системы к другой по определенным каналам. Они используются для уведомления других систем в реальном времени, когда что-то происходит, без необходимости постоянного сбора запросов. В ситуации с платежным шлюзом при запросе на оплату можно запросить у конечной точки идентификатор или ссылку транзакции, чтобы получить обновленный статус. Этот метод имеет свои недостатки, поскольку количество запросов может быть ограничено (дросселирование), и, например, может возникнуть таймаут сервера.

Приведенная ниже схема иллюстрирует действие запроса и возможные сценарии, такие как дросселирование и тайм-аут сервера:

Вот описание приведенной выше схемы:

  1. Клиент запрашивает статус платежа: Клиент неоднократно запрашивает статус платежа у сервера платежного шлюза.
  2. Сервер отвечает статусом платежа: Сервер отвечает с указанием текущего статуса платежа.
  3. Достигнуто ограничение дросселирования: После определенного количества запросов сервер может ограничить дальнейшие запросы клиента.
  4. Таймаут сервера: Если сервер не может ответить в течение определенного времени, это может привести к возникновению ошибки таймаута. В связи с проблемами дросселирования и возникновением таймаутов сервера, предприятия для повышения надежности и скорости работы позволяют своим клиентам не только запрашивать, но и получать веб-хуки. Если через определенное время веб-хук не получен, клиент может запросить статус. Это дает клиенту множество способов получения обновленной информации о платеже, а также помогает предприятию оставаться более надежным, прочным и удобным для разработчиков.

Ниже приведена схема, иллюстрирующая процесс использования сначала веб-хуков, а затем возврата к запросу, если веб-хуки не были доставлены:

Вот описание приведенной выше схемы:

  1. Клиент запрашивает платеж: Клиент инициирует запрос платежа на сервер.
  2. Сервер инициирует платеж: Сервер подтверждает инициацию платежа и начинает его обработку.
  3. Сервер отправляет веб-хук: Сервер пытается отправить клиенту веб-хук со статусом платежа.
  4. Веб-хук не получен: Если веб-хук не доставлен клиенту по истечении определенного времени, об этом делается соответствующая запись.
  5. Клиент производит запрос: Клиент, заметив отсутствие веб-хука, начинает запрашивать у сервера статус платежа.
  6. Сервер отвечает статусом платежа: Сервер отвечает на запрос статусом платежа.
  7. Возврат к запросу: Весь процесс демонстрирует механизм возврата, при котором клиент может прибегнуть к запросу, если веб-хук не получен.

Такой подход обеспечивает клиенту несколько способов получения обновленной информации о платеже, что делает систему более надежной, устойчивой и удобной для разработчиков.

Чаще всего веб-крючки представляют собой HTTP POST-запросы, выполняемые по URL-адресу обратного вызова, предоставленному клиентом. Когда в системе A происходит определенное событие, оно вызывает HTTP POST-запрос на определенный URL в системе B. Запрос обычно содержит информацию о событии. Затем сервер системы B обрабатывает запрос и предпринимает соответствующие действия.

Теперь, когда мы поняли, что такое веб-хуки, давайте обсудим проблемы, возникающие при проектировании службы веб-хуков.

Трудности проектирования сервиса веб-хуков

Проектирование службы веб-хуков - сложная задача, требующая тщательного учета различных факторов для обеспечения масштабируемости, надежности и безопасности. Здесь подробно рассматриваются некоторые из ключевых проблем:

  • Масштабируемость: Обработка большого количества запросов является общей проблемой для сервисов веб-хуков. Система должна быть рассчитана на горизонтальное масштабирование в зависимости от нагрузки и обеспечивать одновременную обработку большого количества веб-хуков без снижения производительности. Реализация одновременности очень важна для выполнения многочисленных веб-хуков единовременно. Для этого требуется надежная модель одновременности, способная эффективно обрабатывать синхронные запросы.
  • Надежность: Управление очередью сообщений веб-хуков очень важно для того, чтобы не потерять данные и обеспечить своевременную обработку каждого сообщения. Реализация надежной системы очередей помогает поддерживать порядок и целостность веб-хуков. Однако иногда веб-хуки не могут быть доставлены. Такое случается. Для обработки неудачной доставки веб-хуков должен быть предусмотрен надежный механизм повторных запросов. Часто это связано с применением стратегий экспоненциального отката, чтобы не перегружать принимающую систему повторными попытками в короткие сроки.
  • Безопасность: Валидация данных гарантирует, что эти данные соответствуют ожидаемому формату и не содержат вредоносного контента. Реализация строгих правил проверки позволяет предотвратить потенциальные риски безопасности. Шифрование данных, передаваемых через веб-хуки, обеспечивает дополнительный уровень безопасности, гарантируя защиту конфиденциальной информации при передаче.
  • Удобство обслуживания: Всестороннее протоколирование очень важно для отслеживания поведения службы веб-хуков, помощи в отладке и получения информации о производительности системы. Реализация подробных стратегий протоколирования помогает в мониторинге системы и может оказать неоценимую помощь в выявлении и устранении проблем. Для простоты статьи мы сосредоточимся на масштабируемости и надежности приложения. Вопросы безопасности очень обширны и могут быть рассмотрены в другой статье. Однако вы можете ознакомиться с этой статьей о том, как добавить безопасность в веб-хуки.

Вернёмся к разрабатываемому приложению, мы будем использовать Golang. Но почему? Давайте обсудим плюсы и минусы в нашем случае и почему именно Golang является надежным выбором технологии.

Почему стоит использовать Golang?

В контексте создания службы веб-хуков сильные стороны Golang - параллелизм, масштабируемость, производительность и сильная типизация - делают его надежным выбором. Наиболее интересные возможности Golang, которые помогут нам в этом, - горутины и постановка в очередь. Давайте подробнее рассмотрим эти возможности.

Горутины: Параллелизм Golang на стероидах

Горутины можно сравнить со сверхмощными потоками, но гораздо более легкими. Они позволяют функциям выполняться одновременно с другими, обеспечивая эффективную многозадачность.

Чтобы лучше понять Golang, приведем простую аналогию.

Представьте себе большой ресторан в пик посещаемости. В традиционной потоковой модели у вас может быть несколько официантов (потоков), отвечающих за прием заказов, подачу блюд и обработку платежей. Если ресторан переполнен, эти официанты могут оказаться перегруженными, что приведет к медленному обслуживанию и недовольным клиентам. Теперь рассмотрим модель горутин в сценарии ресторана.

Вместо нескольких официантов у вас есть большая команда проворных помощников (горутин), которые могут быстро и одновременно выполнять несколько задач. Каждый помощник - это как мини-официант, который может принять заказ, подать блюдо или обработать платеж. Они могут работать одновременно, эффективно используя свободное пространство и ресурсы ресторана.

Эти помощники не только многочисленны, но и легки, что позволяет им быстро переключаться с одной задачи на другую без особых затрат. Если один помощник на мгновение задерживается или ждет шеф-повара, его место может занять другой, обеспечивая бесперебойное обслуживание.

Менеджер ресторана (планировщик времени выполнения Golang) следит за этими помощниками, обеспечивая их эффективное распределение по свободным столам (ядрам процессора) и взаимодействие, не мешая друг другу. В этой аналогии ресторан представляет собой ресурсы компьютера, обеденные задачи - функции, которые должны быть выполнены, традиционные официанты - потоки, а проворные помощники - горутины. Возможность одновременной работы множества таких "помощников" обеспечивает высокую эффективность и масштабируемость обслуживания, что делает горутины мощным элементом модели параллелизма Golang.

Вот почему горутины имеют преимущества:

  • Легкость: Горутины намного легче традиционных потоков и занимают всего несколько килобайт в стеке. Это позволяет порождать тысячи и даже миллионы таких потоков одновременно, не истощая системные ресурсы.
  • Простой синтаксис: Запустить горутину можно просто добавив ключевое слово go перед вызовом функции. Например, go myFunction() запустит myFunction как горутину.
  • Эффективное планирование: Система исполнения Golang заботится о планировании работы горутин на доступных ядрах процессора, обеспечивая их оптимальное использование. Благодаря эффективному алгоритму планирования на машине с 4 ядрами могут одновременно выполняться тысячи горутин. Вот простой пример использования горутин в Golang.
func main() {
    for i := 0; i < 10000; i++ {
        go printNumber(i)
    }
}

func printNumber(number int) {
    fmt.Println(number)
}

Этот код породит 10 000 горутин для одновременной печати чисел.

Построение очередей в Golang

Постановка в очередь очень важна для управления потоком данных, особенно в сервисе веб-хуков, который должен обрабатывать большой объем запросов. Golang предлагает два основных способа постановки в очередь:

  1. Использование фрагментов: Можно реализовать простую очередь с использованием фрагментов, но такой подход не обеспечивает контроля параллелизма и может привести к возникновению ситуаций "гонки".
  2. Использование каналов: Каналы обеспечивают обмен данными между горутинами и могут выступать в роли очередей. Они обеспечивают встроенную синхронизацию, гарантируя безопасную передачу данных между горутинами. Чтобы лучше понять каналы, можно привести еще одну аналогию с рестораном.

Представьте себе кухню ресторана, где готовятся заказы. В традиционной системе без очереди официанты могут передавать заказы непосредственно поварам, что приводит к хаосу в напряженное время. Если заказов поступает слишком много, повара могут быть перегружены, что приводит к задержкам и ошибкам.

Теперь представим систему очередей с использованием каналов, аналогичную подходу Golang. В этой модели ресторан имеет хорошо организованную очередь заказов, представленную конвейерной лентой (каналом). Когда официанты принимают заказы от клиентов, они кладут их на конвейерную ленту, которая планомерно и упорядоченно перемещает заказы к поварам.

Конвейерная лента обеспечивает обработку заказов в порядке их поступления (FIFO - First In, First Out)(Первый поступивший - первый выданный). Повара могут брать с конвейера по одному заказу, готовить блюдо, а затем принимать следующий заказ. Если кухня занята, то новые заказы просто выстраиваются на конвейерной ленте, ожидая своей очереди. Такая система очередей позволяет ресторану обрабатывать большое количество заказов, не перегружая поваров.

Это также обеспечивает гибкость. Если ресторан чрезвычайно загружен, можно добавить больше поваров (горутин) для работы с заказами, поступающими с конвейерной ленты. Если же вечер не задался, то поваров может быть меньше, а конвейерная лента все равно будет обеспечивать упорядоченное выполнение заказов.

В Golang каналы действуют как конвейерная лента, обеспечивая безопасную и организованную передачу и прием значений между горутинами. Они позволяют создать конвейер, в котором данные (заказы) могут обрабатываться параллельно (несколькими поварами) без конфликтов и путаницы.

Вот схема, представляющая ресторанную аналогию для очереди с каналами в Golang:

В этой статье мы будем использовать каналы, так как для взаимодействия нам нужны горутины. Теперь, когда мы лучше понимаем, что такое веб-хуки и какова архитектура решения, разработанного в этой статье, давайте построим наш сервис веб-хуков.

Создание проекта

Чтобы упростить и облегчить работу, мы будем использовать Docker для этого проекта. У нас будет три сервиса:

  • Redis: запускает образ Redis, используемый для обмена данными между сервисом Flask API и сервисом Webhook Golang.
  • api: Это имя сервиса, написанного на Flask. Мы просто выставим конечную точку GET, которая будет генерировать случайную загрузку, отправлять ее в сервис Webhook через Redis, а затем возвращать сгенерированную загрузку. Реализовывать сложную конечную точку не нужно, поскольку основное внимание в этой статье уделяется сервису Go.
  • webhook: имя сервиса, написанного на языке Golang. Мы создадим приложение, которое будет принимать данные, поступающие по каналу Redis, а затем обрабатывать полученную загрузку и отправлять ее по URL, указанному в загрузке. Поскольку мы имеем дело с HTTP-запросами и понимаем, что другие сервисы могут быть недоступны, мы реализуем механизм повторных попыток (экспоненциальный бэк-офф) и сделаем его надежным с помощью очередей Golang. Настройка займет некоторое время, поэтому я советую просто клонировать базу проекта с помощью этой команды:
git clone --branch base https://github.com/koladev32/golang-wehook.git

Это приведет к клонированию base ветки проекта, которая уже содержит рабочий проект Flask и файл docker-compose.yaml.

Давайте быстро изучим код сервиса Flask, написанного на языке Python:

from datetime import datetime  
import json  
import os  
import random  
import uuid  
from flask import Flask  
import redis  


def get_payment():  
    return {  
        'url': os.getenv("WEBHOOK_ADDRESS", ""),  
        'webhookId': uuid.uuid4().hex,  
        'data': {  
            'id': uuid.uuid4().hex,  
            'payment': f"PY-{''.join((random.choice('abcdxyzpqr').capitalize() for i in range(5)))}",  
            'event': random.choice(["accepted", "completed", "canceled"]),  
            'created': datetime.now().strftime("%d/%m/%Y, %H:%M:%S"),  
        }  
    }  


redis_address = os.getenv("REDIS_ADDRESS", "")  
host, port = redis_address.split(":")  
port = int(port)  
# Create a connection to the Redis server  
redis_connection = redis.StrictRedis(host=host, port=port)  

app = Flask(__name__)  


@app.route('/payment')  
def payment():  
    webhook_payload_json = json.dumps(get_payment())  

    # Publish the JSON string to the "payments" channel in Redis  
    redis_connection.publish('payments', webhook_payload_json)  

    return webhook_payload_json  


if __name__ == '__main__':  
    app.run(host='0.0.0.0', port=8000)

Во-первых, мы создаем функцию get_payment, которая будет генерировать случайную загрузку. Эта функция будет возвращать случайный платеж следующей структурой:

{
    "url": "http://example.com/webhook",
    "webhookId": "52d2fc2c7f25454c8d6f471a22bdfea9",
    "data": {
        "id": "97caab9b6f924f13a94b23a960b2fff2",
        "payment": "PY-QZPCQ",
        "event": "accepted",
        "date": "13/08/2023, 00:03:46"
    }
}

После этого мы инициализируем соединение с Redis с помощью переменной окружения REDIS_ADDRESS.

...

redis_address = os.getenv("REDIS_ADDRESS", "")  
host, port = redis_address.split(":")  
port = int(port)  
# Create a connection to the Redis server  
redis_connection = redis.StrictRedis(host=host, port=port)  

...

Адрес redis_address разделен, поскольку REDIS_ADDRESS обычно выглядит следующим образом localhost:6379 или redis:6379 (если мы используем контейнеры Redis). После этого у нас есть функция-обработчик маршрута payment, которая отправляет случайную загрузку webhook_payload_json, отформатированную с помощью метода json.dumps, через канал Redis под названием payments, а затем возвращает случайную загрузку.

Это простая реализация шлюза Payment API или, проще говоря, Mock. Теперь, когда мы поняли основу проекта, давайте быстро обсудим архитектуру решения и реализацию некоторых концепций для придания ему надежности. Их недостатки мы обсудим в конце статьи.

Архитектура решения

Архитектура решения достаточно проста:

  • У нас есть API, выполняющий функции платежного шлюза. Запрос к конечной точке этого API возвращает данные, но эти данные также передаются через канал Redis под названием payments. Таким образом, все сервисы, прослушивающие этот канал, получат отправленные данные.
  • Затем у нас есть сервис веб-хуков, написанный на языке Golang. Этот сервис прослушивает payments канал Redis. Если данные получены, то загружаемая информация форматируется для отправки по указанному в ней URL. Если запрос не выполняется из-за таймаута или каких-либо других ошибок, то для повторного запроса используется механизм повторных попыток с использованием очереди канала Golang и экспоненциального обратного хода.

Давайте напишем сервис на языке Golang.

Написание сервиса на языке Golang

Логику работы сервиса веб-хуков мы напишем на языке Golang в каталоге webhook. Вот структура, которую мы получим в конце этого раздела.

webhook
├── Dockerfile          # Defines the Docker container for the project
├── go.mod              # Module dependencies file for the Go project
├── go.sum              # Contains the expected cryptographic checksums of the content of specific module versions
├── main.go             # Main entry point for the application
├── queue
│   └── worker.go       # Contains the logic for queuing and processing tasks
├── redis
│   └── redis.go        # Handles the connection and interaction with Redis
├── sender
    └── webhook.go      # Responsible for sending the webhook requests

Начнем с создания проекта Go.

go mod init .

Для создания проекта go можно использовать команду go mod init name-of-the-project. В нашем случае добавление точки . в конце команды указывает Go на использование имени каталога в качестве имени модуля.

После создания модуля установим необходимые зависимости, такие как redis.

 go get github.com/go-redis/redis/v8

Отлично! Теперь мы можем приступить к кодированию.

Добавление логики отправки веб-хуков

Начнем немного нестандартно - сначала напишем логику отправки веб-хуков. Поскольку мы используем очередь Golang и хотим упростить процесс разработки, мы начнем с добавления первой зависимости системы - функции отправки веб-хуков.

Создайте каталог sender с помощью команды mkdir sender. Затем внутри этого каталога создайте новый файл webhook.go.

mkdir sender && cd sender
touch webhook.go

Внутри вновь созданного файла добавим необходимые наименования, импорты и структуры.

package sender  

import (  
   "bytes"  
   "encoding/json"  
   "errors"  
   "io"  
   "log"  
   "net/http"  
)  

// Payload represents the structure of the data expected to be sent as a webhook  
type Payload struct {  
   Event   string  
   Date    string  
   Id      string  
   Payment string  
}

Далее создадим функцию SendWebhook, которая будет отправлять JSON POST-запрос на URL.

// SendWebhook sends a JSON POST request to the specified URL and updates the event status in the database  
func SendWebhook(data interface{}, url string, webhookId string) error {  
   // Marshal the data into JSON  
   jsonBytes, err := json.Marshal(data)  
   if err != nil {  
      return err  
   }  

   // Prepare the webhook request  
   req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBytes))  
   if err != nil {  
      return err  
   }  
   req.Header.Set("Content-Type", "application/json")  

   // Send the webhook request  
   client := &http.Client{}  
   resp, err := client.Do(req)  
   if err != nil {  
      return err  
   }  
   defer func(Body io.ReadCloser) {  
      if err := Body.Close(); err != nil {  
         log.Println("Error closing response body:", err)  
      }  
   }(resp.Body)  

   // Determine the status based on the response code  
   status := "failed"  
   if resp.StatusCode == http.StatusOK {  
      status = "delivered"  
   }  

   log.Println(status)  

   if status == "failed" {  
      return errors.New(status)  
   }  

   return nil  
}

Давайте поясним, что здесь происходит.

  • Маршалинг данных в JSON: Переданные в функцию данные маршализируются в байтовый массив JSON. Если во время этого процесса произошла ошибка, то функция возвращает ошибку.
  jsonBytes, err := json.Marshal(data)
   if err != nil {
       return err
   }
  • Подготовка запроса Webhook: Создается новый HTTP POST-запрос, телом которого являются JSON-данные. Для заголовка "Content-Type" устанавливается значение application/json.
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBytes))
    if err != nil {
    return err
    }
    req.Header.Set("Content-Type", "application/json")
  • Отправка запроса Webhook: HTTP-клиент отправляет подготовленный запрос. Если при отправке запроса возникла ошибка, он возвращает ее. Тело ответа также откладывается для закрытия после обработки.
 client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        return err
    }
    defer func(Body io.ReadCloser) {
        if err := Body.Close(); err != nil {
            log.Println("Error closing response body:", err)
        }
    }(resp.Body)
  • Определение статуса по коду ответа: Статус веб-хука определяется на основе кода ответа HTTP. Если код состояния равен 200 (OK), то статус устанавливается как " доставлено"; в противном случае устанавливается значение " не выполнено", тогда возвращается ошибка с кодом состояния.
...
status := "failed"
if resp.StatusCode == http.StatusOK {
    status = "delivered"
}
log.Println(status)
if status == "failed" {
    return errors.New(status)
}
...
  • Возврат Success: Если все прошло успешно, функция возвращает nil, указывая на то, что веб-хук был отправлен успешно.

Это логика отправки запроса. Пока ничего сложного, но мы подходим к самым интересным моментам. Добавим пакет для прослушивания канала payments Redis.

Прослушивание канала Redis

Другой важный аспект сервиса веб-хуков заключается в том, что он должен активно прослушивать канал payments Redis. Это концепция функции pub/sub в Redis. Сервис Flask публикует данные в канал, затем все сервисы, подписанные на этот канал, получают эти данные.

В корне проекта в каталоге веб-хуков создайте новый каталог с именем redis. Внутри этого каталога создайте новый файл redis.go. Этот файл будет содержать логику, которая будет подписываться и прослушивать входящие данные в канале payments, а также форматировать содержимое и отправлять его в канал очередей Golang.

package redis  

import (  
   "context"  
   "encoding/json"  
   "log"  

   "github.com/go-redis/redis/v8"  
)  

// WebhookPayload defines the structure of the data expected  
// to be received from Redis, including URL, Webhook ID, and relevant data.  
type WebhookPayload struct {  
   Url       string `json:"url"`  
   WebhookId string `json:"webhookId"`  
   Data      struct {  
      Id      string `json:"id"`  
      Payment string `json:"payment"`  
      Event   string `json:"event"`  
      Date    string `json:"created"`  
   } `json:"data"`  
}

Давайте напишем функцию Subscribe.

func Subscribe(ctx context.Context, client *redis.Client, webhookQueue chan WebhookPayload) error {  
   // Subscribe to the "webhooks" channel in Redis  
   pubSub := client.Subscribe(ctx, "payments")  

   // Ensure that the PubSub connection is closed when the function exits  
   defer func(pubSub *redis.PubSub) {  
      if err := pubSub.Close(); err != nil {  
         log.Println("Error closing PubSub:", err)  
      }  
   }(pubSub)  

   var payload WebhookPayload  

   // Infinite loop to continuously receive messages from the "webhooks" channel  
   for {  
      // Receive a message from the channel  
      msg, err := pubSub.ReceiveMessage(ctx)  
      if err != nil {  
         return err // Return the error if there's an issue receiving the message  
      }  

      // Unmarshal the JSON payload into the WebhookPayload structure  
      err = json.Unmarshal([]byte(msg.Payload), &payload)  
      if err != nil {  
         log.Println("Error unmarshalling payload:", err)  
         continue // Continue with the next message if there's an error unmarshalling  
      }  

      webhookQueue <- payload // Sending the payload to the channel  
   }  
}

В этом коде определена функция Subscribe, которая подписывается на определенный канал Redis ("payments") и постоянно прослушивает сообщения на этом канале. Когда сообщение получено, она обрабатывает его и отправляет в канал Go для дальнейшей обработки.

  • Код подписывается на канал "payments" в Redis с помощью предоставленного клиента Redis. Любые сообщения, отправленные в этот канал, будут получены данной функцией.
pubSub := client.Subscribe(ctx, "payments")
  • Затем мы обеспечиваем закрытие соединения PubSub (publish-subscribe)(публикация-подписка) с Redis при завершении функции, независимо от того, завершается ли она нормально или из-за ошибки. Это важно для очистки ресурсов.
defer func(pubSub *redis.PubSub) {
    if err := pubSub.Close(); err != nil {
        log.Println("Error closing PubSub:", err)
    }
 }(pubSub)
  • Цикл for здесь работает неограниченно, что позволяет функции продолжать прослушивать сообщения до тех пор, пока выполняется программа.
for {
    // ...
 }

Внутри цикла код ожидает получения сообщения от канала Redis. Если при получении сообщения возникает ошибка, в основном при десериализации содержимого, то функция фиксирует ошибку и мы продолжаем выполнение функции.

err = json.Unmarshal([]byte(msg.Payload), &payload)
if err != nil {
    log.Println("Error unmarshalling payload:", err)
    continue // Continue with the next message if there's an error unmarshalling
}

После получения сообщения код пытается преобразовать содержимое сообщения из JSON в структуру Go (WebhookPayload). Если в этом процессе возникает ошибка, то она записывается в журнал и продолжается переход к следующему сообщению.

Наконец, код отправляет обработанное сообщение в канал Go (webhookQueue). Этот канал будет использоваться в пакете queue для обработки загрузок.

webhookQueue <- payload // Sending the payload to the channel

Проще говоря, эта функция работает как радиоприемник, настроенный на определенную станцию (канал payments в Redis). Она постоянно прослушивает сообщения (как песни по радио) и обрабатывает их (например, регулирует качество звука), а затем передает их другой части программы (например, динамикам, воспроизводящим музыку). (Ладно! Я стараюсь изо всех сил, используя эти странные аналогии!)

Теперь, когда у нас есть метод Subscribe, давайте добавим логику постановки в очередь. Здесь мы реализуем логику повторных попыток.

Добавление логики формирования очереди

Очередь - это структура данных, работающая по принципу "Первый поступивший - первый выданный" (FIFO). Представьте себе очередь из людей, ожидающих в банке; первый человек в очереди обслуживается первым, а новые люди присоединяются к очереди в конце.

В Golang работа с очередями может осуществляться двумя основными способами:

  • Фрагменты: Фрагменты - это динамически изменяемые массивы в Go. С их помощью можно создавать простые очереди, добавляя элементы в конец и удаляя их из начала.
  • Каналы: Каналы более сложны, но предоставляют большие возможности. Они позволяют двум горутинам (параллельным функциям) взаимодействовать и синхронизировать их выполнение. Канал можно использовать как очередь, когда одна горутина посылает данные в канал (enqueue), а другая получает их из него (dequeue).

В нашем конкретном случае мы будем использовать очередь на основе каналов. Вот почему:

  • Параллельность: Каналы предназначены для обработки параллельных операций, что позволяет использовать их в сценариях, когда несколько функций должны взаимодействовать или синхронизироваться.
  • Управление емкостью: Вы можете установить емкость канала, контролируя, сколько элементов он может вместить одновременно. Это помогает управлять ресурсами и контролировать потоки.
  • Блокирующие и неблокирующие операции: Каналы могут использоваться как в блокирующем, так и в неблокирующем режимах, что позволяет контролировать поведение операций отправки и получения.

Мы будем использовать канал для отправки данных из функции Subscribe и последующей обработки этих данных в функции ProcessWebhooks, которую мы напишем далее. Использование каналов обеспечивает бесперебойную связь между различными частями нашей программы, позволяя эффективно и надежно работать с веб-хуками.

Написание логики построения очереди

В корне проекта веб-хука создайте каталог с именем queue. Внутри этого каталога добавьте файл worker.go. Этот файл будет содержать логику обработки данных, поступающих в очередь.

mkdir queue && cd queue
touch worker.go

Как обычно, начнем с импорта.

package queue  

import (  
   "context"  
   "log"  
   "time"  
   "webhook/sender"  

   redisClient "webhook/redis"  
)

А затем добавьте функцию для обработки данных веб-хуков, ProcessWebhooks.

func ProcessWebhooks(ctx context.Context, webhookQueue chan redisClient.WebhookPayload) {  
   for payload := range webhookQueue {  
      go func(p redisClient.WebhookPayload) {  
         backoffTime := time.Second  // starting backoff time  
         maxBackoffTime := time.Hour // maximum backoff time  
         retries := 0  
         maxRetries := 5  

         for {  
            err := sender.SendWebhook(p.Data, p.Url, p.WebhookId)  
            if err == nil {  
               break  
            }  
            log.Println("Error sending webhook:", err)  

            retries++  
            if retries >= maxRetries {  
               log.Println("Max retries reached. Giving up on webhook:", p.WebhookId)  
               break  
            }  

            time.Sleep(backoffTime)  

            // Double the backoff time for the next iteration, capped at the max  
            backoffTime *= 2  
            log.Println(backoffTime)  
            if backoffTime > maxBackoffTime {  
               backoffTime = maxBackoffTime  
            }  
         }  
      }(payload)  
   }  
}

Давайте разберемся в приведенном выше коде. Функция ProcessWebhooks принимает канал Go, содержащий сообщения веб-хуков, и обрабатывает их. Если отправка веб-хука не удалась, она повторяет попытку, используя экспоненциальную стратегию отката.

  • Сначала мы в цикле просматриваем список элементов в канале webhookQueue. Пока в списке будут присутствовать элементы, мы будем продолжать обрабатывать данные.
for payload := range webhookQueue {
    // processing code
}
  • Для каждой загрузки запускается новая горутина. Это позволяет одновременно обрабатывать несколько веб-хуков.
go func(p redisClient.WebhookPayload) {
  • Далее мы инициализируем переменные для управления логикой повторных попыток. Если отправка веб-хука не удалась, код будет ждать (backoffTime), прежде чем повторить попытку. После каждой неудачи это время ожидания удваивается, вплоть до максимального значения (maxBackoffTime). Процесс будет повторяться до максимального числа повторных попыток (maxRetries).
backoffTime := time.Second  // starting backoff time
maxBackoffTime := time.Hour // maximum backoff time
retries := 0
maxRetries := 5
  • В следующей части мы попытаемся отправить веб-хук с помощью функции SendWebhook. Если это удается (err == nil), то цикл прерывается, и процесс переходит к следующей загрузке.
err := sender.SendWebhook(p.Data, p.Url, p.WebhookId)
if err == nil {
    break
}
log.Println("Error sending webhook:", err)
  • Если отправка веб-хука не удалась, код регистрирует ошибку, увеличивает счетчик повторных попыток и ожидает время отката (BackoffTime), прежде чем повторить попытку. Время ожидания удваивается при каждом сбое, но ограничивается значением maxBackoffTime.
retries++
if retries >= maxRetries {
    log.Println("Max retries reached. Giving up on webhook:", p.WebhookId)
    break
}
time.Sleep(backoffTime)
backoffTime *= 2
if backoffTime > maxBackoffTime {
    backoffTime = maxBackoffTime
}

Функция ProcessWebhooks предназначена для обработки очереди сообщений веб-хуков. Она пытается отправить каждый веб-хук, а в случае неудачи повторяет попытку, используя экспоненциальную стратегию отката. Благодаря использованию гороутин она может обрабатывать несколько веб-хуков одновременно, что делает процесс более эффективным.

Проще говоря, эта функция похожа на работника почтового отделения, пытающегося доставить посылки (веб-хуки). Если доставка не удается, работник каждый раз ждет немного дольше, прежде чем повторить попытку, вплоть до определенного количества попыток. Если все попытки оказываются неудачными, работник переходит к следующей посылке.

Мы написали наиболее важные части сервиса. Давайте соберем их все вместе.

Собираем всё воедино

Настало время собрать всё написанное воедино. В файле main.go мы добавим логику создания клиента Redis для запуска соединения, создадим канал, который будет выполнять роль очереди, а затем запустим необходимые процессы.

package main  

import (  
   "context"  
   "log"  
   "os"  

   redisClient "webhook/redis"  

   "webhook/queue"  

   "github.com/go-redis/redis/v8" // Make sure to use the correct version  
)  

func main() {  
   // Create a context  
   ctx, cancel := context.WithCancel(context.Background())  
   defer cancel()  

   // Initialize the Redis client  
   client := redis.NewClient(&redis.Options{  
      Addr:     os.Getenv("REDIS_ADDRESS"), // Use an environment variable to set the address  
      Password: "",                         // No password  
      DB:       0,                          // Default DB  
   })  

   // Create a channel to act as the queue  
   webhookQueue := make(chan redisClient.WebhookPayload, 100) // Buffer size 100  

   go queue.ProcessWebhooks(ctx, webhookQueue)  

   // Subscribe to the "transactions" channel  
   err := redisClient.Subscribe(ctx, client, webhookQueue)  

   if err != nil {  
      log.Println("Error:", err)  
   }  

   select {}  

}

Поясним приведенный выше код.

  • Начнем с объявления пакета и импорта
package main

import (
    "context"
    "log"
    "os"
    redisClient "webhook/redis"
    "webhook/queue"
    "github.com/go-redis/redis/v8" // Make sure to use the correct version
)

Затем мы объявляем главную функцию, которая сначала создаст контекст.

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Контекст создается для управления сигналами отмены в различных частях программы. Это полезно для изящного завершения процессов при необходимости.

  • Затем мы инициализируем клиент Redis, создавая соединение с сервером Redis по адресу, указанному в переменной окружения.
client := redis.NewClient(&redis.Options{
    Addr:     os.Getenv("REDIS_ADDRESS"), // Use an environment variable to set the address
    Password: "",                         // No password
    DB:       0,                          // Default DB
})
  • Следующие части очень важны, поскольку сначала мы создаем канал, который будет выступать в качестве очереди для загрузки веб-хуков.
webhookQueue := make(chan redisClient.WebhookPayload, 100) // Buffer size 100
go queue.ProcessWebhooks(ctx, webhookQueue)

Размер буфера равен 100, то есть он может одновременно хранить до 100 элементов. Для обработки веб-хуков из канала webhookQueue мы запускаем горутину. Логика обработки определена в функции ProcessWebhooks.

  • Затем мы подписываемся на канал Redis под названием payments и слушаем сообщения. Когда сообщение получено, оно добавляется в канал webhookQueue для обработки.
err := redisClient.Subscribe(ctx, client, webhookQueue)
if err != nil {
    log.Println("Error:", err)
}

Затем в конце функции мы создаем бесконечный цикл, который продолжает выполнять программу.

select {}

Без этого программа завершалась бы сразу после запуска горутин, и они не успевали бы выполниться.

Проще говоря, этот код устанавливает простую систему обработки вебхуков с использованием Redis. Он инициализирует соединение с Redis, создает канал для работы в качестве очереди, запускает горутину для обработки веб-хуков и подписывается на канал Redis для получения новой загрузки веб-хуков. Затем программа переходит в бесконечный цикл, позволяя горутинам продолжать работу и обрабатывать веб-хуки по мере их поступления.

Теперь у нас есть все файлы, необходимые для работы службы веб-хуков. Мы можем выполнить докеризацию приложения и запустить контейнеры docker.

Запуск проекта

Настало время запустить проект. Добавим Dockerfile и необходимые переменные окружения. В проект webhook go добавьте следующий Dockerfile.

# Start from a Debian-based Golang official image  
FROM golang:1.21-alpine as builder  

# Set the working directory inside the container  
WORKDIR /app  

# Copy the go mod and sum files  
COPY go.mod go.sum ./  

# Download all dependencies  
RUN go mod download  

# Copy the source code from your host to your image filesystem.  
COPY . .  

# Build the Go app  
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o main .  

# Use a minimal alpine image for the final stage  
FROM alpine:latest  

# Set the working directory inside the container  
WORKDIR /root/  

# Copy the binary from the builder stage  
COPY --from=builder /app/main .  

# Run the binary  
CMD ["./main"]

После написания Dockerfile мы можем запустить docker-контейнеры, но для начала необходимо иметь URL-адрес веб-хука, чтобы опробовать этот проект. Вы можете легко получить его бесплатно на сайте https://webhook.site. После этого создайте файл .env в корне проекта, где находится файл docker-compose.yaml. Затем убедитесь, что он имеет аналогичное содержание.

REDIS_ADDRESS=redis:6379  
WEBHOOK_ADDRESS=<WEBHOOK_ADDRESS>

Замените <WEBHOOK_ADDRESS> на URL веб-хука, предоставленный сайтом https://webhook.site.

Затем выполните сборку и запуск контейнера с помощью команды docker compose up -d --build.

После завершения сборки можно использовать команду docker compose logs -f для отслеживания журналов службы веб-хуков.

Нажмите http://127.0.0.1:8000/payment, чтобы начать отправку данных в службу веб-хуков через Redis. В случае возникновения ошибок журналы будут выглядеть следующим образом.

А также журналы в случае успеха.

Примечание: Чтобы настроить поведение веб-хука при получении ошибок, можно изменить статус, отправляемый при получении запроса. На панели инструментов webhook.site нажмите кнопку Edit на панели навигации. Затем измените Default status code на 504, чтобы указать на тайм-аут сервера.

Поздравляем! Мы только что создали службу веб-хуков на языке Golang. Мы изучили канал Redis, очередь с использованием каналов в Golang, а также экспоненциальный бэк-офф и горутины.

Вкратце обсудим некоторые послабления в реализованном нами решении.

Что мы можем улучшить?

Несмотря на то что реализованное нами решение является функциональным и демонстрирует такие ключевые концепции, как параллельная обработка, экспоненциальный бэк-офф и интеграция с Redis, есть несколько областей, в которых оно может быть улучшено. Ниже мы рассмотрим некоторые из них и способы их устранения:

Масштабируемость

В текущей реализации нашего сервиса веб-хуков используются Redis, очереди и горутины. Несмотря на то, что это надежная основа, она может оказаться недостаточной для обработки большого количества одновременных веб-хуков.

Горутины, несмотря на свою легкость, потребляют системные ресурсы. Подавляющее число одновременно работающих веб-хуков может исчерпать эти ресурсы, что приведет к замедлению обработки и даже отказу системы. Кроме того, Redis, несмотря на свою скорость, может стать "узким местом", если количество операций чтения/записи превысит его пропускную способность, что приведет к задержкам.

В нашей системе также существует единая точка отказа. Если сервер Redis выйдет из строя, то вся система может стать неработоспособной, что приведет к потере данных и нарушению обслуживания.

Для решения этих проблем масштабируемости можно рассмотреть следующие усовершенствования:

  1. Реализация системы распределенных очередей: Такие системы, как Kafka или RabbitMQ, могут обрабатывать большее количество сообщений и распределять нагрузку между несколькими серверами.
  2. Использование пулов рабочих процессов: Управление горутинами с помощью пулов рабочих операций позволяет контролировать использование ресурсов и предотвращать перегрузку системы.
  3. Горизонтальное масштабирование: Добавив несколько экземпляров службы веб-хуков и используя балансировщик нагрузки, мы можем равномерно распределить нагрузку и обработать большее количество запросов.

Безопасность

Безопасность - еще один важный аспект, требующий внимания. В настоящее время наш сервер Redis не использует пароль, что делает его уязвимым для несанкционированного доступа. Любой, кто может подключиться к серверу Redis, может читать или записывать данные, что создает риск потенциальной утечки информации.

Более того, если сервер Redis выйдет из строя, система может потерять отправленные данные, что приведет к их утере. Чтобы снизить этот риск, мы можем:

  1. Использовать пароль Redis: Эта простая мера позволяет предотвратить несанкционированный доступ к серверу Redis.
  2. Настроить репликацию: Репликация Redis гарантирует наличие копии данных на другом сервере, обеспечивая резервное копирование в случае выхода из строя одного сервера.
  3. Включить функцию Persistence: Регулярное сохранение данных на диске обеспечивает возможность их восстановления в случае сбоя сервера.

Если сам сервер веб-хуков не работает, то сообщения не будут обработаны, и данные могут быть потеряны. Для решения этой проблемы мы можем:

  1. Реализовать механизм повторных попыток: Надежный механизм повторных попыток с экспоненциальной обратной связью обеспечивает продолжение попыток обработки полезной нагрузки.
  2. Установить мониторинг и оповещение: Мониторинг и оповещение могут уведомлять администраторов о простоях серверов, что позволяет оперативно принимать меры.
  3. Использовать резервные серверы: Наличие резервных или отказоустойчивых серверов обеспечивает непрерывность обслуживания в случае выхода из строя одного из серверов.

Заключение

Созданная вами служба веб-хуков является прочным фундаментом, и эти усовершенствования позволяют вывести ее на новый уровень. Уделяя особое внимание масштабируемости, надежности, безопасности и удобству обслуживания, можно построить систему, которая не только отвечает текущим требованиям, но и готова к будущему росту и изменениям. Всегда учитывайте специфические потребности и ограничения вашего приложения, поскольку не все усовершенствования могут быть необходимы или применимы в вашем конкретном случае.

Если вы считаете, что эту статью можно было бы сделать лучше, не стесняйтесь оставлять комментарии ниже.

Также вы можете ознакомиться с исходным кодом проекта данной статьи на GitHub.

Источник:

#Python #Docker
Комментарии
Чтобы оставить комментарий, необходимо авторизоваться

Присоединяйся в тусовку

В этом месте могла бы быть ваша реклама

Разместить рекламу