DevGang
Авторизоваться

Python SDK: Погружение в рабочие процессы

В нашем предыдущем посте мы объявили, что у нас есть Python SDK в GA, и поделились небольшой историей о том, почему мы создали то, что мы сделали, и некоторыми предварительными условиями, которые вам нужны для начала работы. Теперь давайте погрузимся немного глубже, чтобы помочь вам в работе. Для целей этого поста мы собираемся разбить один файл, показанный в нашем примере hello_activity.py, и разделить его на несколько файлов, чтобы мы более точно соответствовали тому, как может выглядеть ваше приложение!

Код рабочего процесса: workflow.py

Мы хотим сначала создать рабочий процесс, потому что worker будет вызывать его. Если мы не сделаем этого первым, ваш код не будет работать, а это неинтересно.

В верхней части нашего файла, как и в любом хорошем файле Python, есть импорт, который нам нужен, чтобы все заработало. Мы также определяем класс данных Python, который будет объектом, который мы будем использовать для передачи данных в workflow. Передавая класс данных Python вместо нескольких параметров в рабочий процесс, мы можем добавлять или удалять поля из класса данных вместо того, чтобы изменять способ вызова рабочего процесса. Это обеспечивает некоторую гибкость для внесения изменений в будущем без необходимости создания версий рабочего процесса.

В этом случае ComposeGreetingInput позволяет нам передать строку greeting и строку name в рабочий процесс.

from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow

# Temporal strongly encourages using a single dataclass so 
# that you can add fields in a backwards-compatible way.
@dataclass
class ComposeGreetingInput:
    greeting: str
    name: str

Чуть ниже ComposeGreetingInput мы добавим два новых фрагмента кода, первый — это активность.

Temporal Python SDK поставляется с декораторами, которые выполняют некоторую работу за вас. В случае действия он регистрирует этот метод как допустимое действие, которое может выполняться Worker (см. ниже). Действие принимает некоторые данные, обрабатывает их и может возвращать значения. (по сути, это точно так же, как и любой другой метод). Единственное, что мы должны отметить, это то, что если вы собираетесь делать что-то недетерминированное, например генерировать UUID, вы хотите сделать это в своих действиях или в методах, к которым обращаются ваши действия.

compose_greeting принимает объект класса данных ComposeGreetingInput, который мы определили выше, записывает некоторую информацию в терминал, а затем возвращает объединенные строки из переданного объекта. Мы могли бы так же легко отправить данные в базу данных, выполнить некоторые вычисления или даже создать вызов стороннего API в этом методе.

# Basic activity that logs and does string concatenation
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
    activity.logger.info("Running activity with parameter %s" % input)
    return f"{input.greeting}, {input.name}!"

Теперь, когда у нас есть действие, мы добавим его в класс Workflow. Этот код является основой для вашего сценария Temporal. Все, что Temporal запустит для вас в этом примере, находится внутри класса Workflow.

Мы украшаем класс workflow.defn, чтобы правильно зарегистрировать его, чтобы Temporal Worker (см. ниже) знал, что это допустимый класс Workflow.

Первый метод в этом примере класса run, но есть пара вещей, которые нужно вызвать:

Метод run украшен @workflow.run, который сообщает Worker, что это метод для запуска при запуске workflow. Его можно использовать для настройки переменных рабочего процесса, вызова одного или нескольких действий и многого другого.

Сам метод является асинхронным, потому что библиотека Python Temporal использует asyncio под капотом, а вызываемые методы должны иметь возможность запускаться асинхронно.

Метод run рабочего процесса ожидает, что name строки будет передано, когда он вызывается рабочим процессом.

Внутри метода мы вызываем метод workflow.execute_activity Temporals, который принимает:

  • Ссылка на метод activity, в данном случае compose_greeting
  • Аргументы, которые в данном случае мы используем для передачи приветствия «Hello» и имени name классу данных ComposeGreetingInput, который мы создали ранее.
  • Тайм-аут. В этом случае мы предоставляем start_to_close_timeout, который указывает Temporal Server на 10-секундный тайм-аут этой активности с момента ее запуска.
# Basic workflow that logs and invokes an activity
@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        workflow.logger.info("Running workflow with parameter %s" % name)
        return await workflow.execute_activity(
            compose_greeting,
            ComposeGreetingInput("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )

Когда вы все закончите с этим файлом, он будет выглядеть следующим образом:

workflow.py

from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow

@dataclass
class ComposeGreetingInput:
    greeting: str
    name: str

# Basic activity that logs and does string concatenation
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
    activity.logger.info("Running activity with parameter %s" % input)
    return f"{input.greeting}, {input.name}!"

# Basic workflow that logs and invokes an activity
@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        workflow.logger.info("Running workflow with parameter %s" % name)
        return await workflow.execute_activity(
            compose_greeting,
            ComposeGreetingInput("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )

Теперь, когда у нас есть код Workflow, мы свяжем все это вместе с Worker, который представляет собой код, который Temporal использует для асинхронного выполнения вашего кода рабочего процесса по мере появления задач в очереди. В данном случае наш пример очень прост, поэтому скрипт добавит задачу в очередь сразу после создания worker. В нашем репозитории примеров вы найдете еще много подробных примеров того, как создавать приложения, в которых разные скрипты, кроме этих двух, могут заставить рабочий процесс выполнять другие задачи асинхронно. Вы даже можете спросить рабочий процесс о нем самом или о данных, которые он хранит для вас, используя запрос.

Код выполнения: worker.py

Давайте настроим верхнюю часть нашего скрипта, чтобы у нас был весь необходимый импорт и среда. Это будет основой нашего приложения:

worker.py

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import GreetingWorkflow, compose_greeting

interrupt_event = asyncio.Event()

async def main():
    print("Hello, world!")

if __name__ == "__main__":
   asyncio.run(main())

Этот пример запустится в указанном выше состоянии, но все, что он сделает, это напечатает «Hello, world!» на ваш терминал в этот момент.

Чтобы воспользоваться преимуществами Temporal, нам нужен наш код для связи с Temporal Server. Подключаемся к серверу через Client. Наш код будет отправлять действия, сигналы и запросы в очередь задач Temporal внутри Temporal Server через это соединение. Когда эти задачи в очереди выполняются, они фиксируются в истории workflow, и это позволяет Temporal точно знать, какой код был запущен, какой код осталось выполнить и каково состояние вашего приложения в любой момент времени.

Создание Client

Замените функцию main() выше кодом подключения client, используя тот же путь URL-адреса локального хоста по умолчанию и порт для Temporal Server:

async def main():
   # Uncomment the line below to see logging
   # logging.basicConfig(level=logging.INFO)

   # Start client
   client = await Client.connect("localhost:7233")
Примечание: мы используем не http://localhost:7233, а localhost:7233!

Затем мы хотим добавить то, что мы называем worker, то есть фрагмент кода, который фактически вызывает код нашего рабочего процесса для действий в очереди.

Создание worker

В строке чуть ниже client, внутри main(), добавьте следующее:

   # Run a worker for the workflow
   async with Worker(
       client,
       task_queue="hello-activity-task-queue",
       workflows=[GreetingWorkflow],
       activities=[compose_greeting],
   ):
       print("Still saying ‘hello’ to you, world!")

Запуск этого кода сообщает Temporal Server, что есть worker, готовый обрабатывать задачи, передавая следующую информацию:

  1. client - Позволяет рабочему протянуть руку и сказать: «Я здесь, Temporal Server, дай мне работу!»
  2. task_queue - Сообщает Temporal Server: «Я настроен только на обработку задач из этой очереди».
  3. workflows - Список ссылок на классы Python, называемые Workflows (см. ниже), написанные специально для обработки activities действий, обрабатывающих задачи в запрошенном task_queue.
  4. activities - Массив ссылок на функции Python, которые могут обрабатывать задачи в очереди задач.

На этом этапе ваш код будет выглядеть так. Все, что делает этот код, — это подключается к Temporal Server и позволяет коду Worker запускать ваш оператор print. Мы еще не осознаем весь потенциал Temporal. Этот код напечатает «Still saying ‘hello’ to you, world» к вашему терминалу. После запуска этого кода, если вы перейдете к URL-адресу веб-интерфейса (по умолчанию 127.0.0.1:8233), вы пока ничего не увидите в своем списке рабочих процессов.

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import GreetingWorkflow, compose_greeting

interrupt_event = asyncio.Event()

async def main():
   # Uncomment the line below to see logging
   # logging.basicConfig(level=logging.INFO)

   # Start client
   client = await Client.connect("localhost:7233")

   # Run a worker for the workflow
   async with Worker(
       client,
       task_queue="hello-activity-task-queue",
       workflows=[GreetingWorkflow],
       activities=[compose_greeting],
   ):
       print("Still saying ‘hello’ to you, world!")

if __name__ == "__main__":
   asyncio.run(main())

Чтобы запустить наш GreetingWorkflow, мы просим Client выполнить workflow. Затем, когда мы запустим наш скрипт, Temporal Server будет отслеживать, что код сделал до сих пор, и вы сможете увидеть всю работу, которую Temporal сделал для вас, в Web UI.

В приведенном выше коде замените оператор печати следующим:

       # While the worker is running, use the client to run the workflow and
       # print out its result.
       result = await client.execute_workflow(
           GreetingWorkflow.run,
           "World",
           id="hello-activity-workflow-id",
           task_queue="hello-activity-task-queue",
       )
       print(f"Result: {result}")

Этот код позволит работнику сообщить клиенту:

  • Асинхронно вызывать execute_workflow (т. е. запускать метод run рабочего процесса)
  • Передайте ввод «World» в GreetingWorkflow.run
  • Дайте workflow идентификатор hello-activity-workflow-id
  • Поместите задачи, созданные рабочим процессом, в очередь hello-activity-task-queue (которую вы можете узнать из таких отдаленных мест, как пара строк выше, когда мы создавали Worker)

Запуск

Запустите это в своем терминале: python worker.py

Как только Worker выполняет код, он устанавливает строку, возвращенную из compose_greeting, в result, а затем распечатывает ее на терминале. Вы должны в конечном итоге увидеть Hello, World! в вашем терминале.

В веб-интерфейсе вы увидите что-то вроде этого с одной строкой рабочего процесса для каждого выполнения вашего кода:

Щелкнув workflow ID для одной из строк, вы увидите все, что произошло с вашим кодом и Temporal server, когда ваш worker выполнил код workflow:

Реальный разговор: Temporal создан для приложений, которые делают больше, чем просто печатают текст на вашем терминале, но мы хотели дать чрезвычайно простой способ показать вам разные части. В следующем сообщении блога мы покажем вам фактическое приложение, которое использует Temporal для сохранения состояния в workflow.

Если вы хотите сделать что-то также простое, но это дает вам немного больше функциональности, вы можете получить ввод из консоли с помощью этой версии worker.py:

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import GreetingWorkflow, compose_greeting

interrupt_event = asyncio.Event()

async def main():
   # Uncomment the line below to see logging
   # logging.basicConfig(level=logging.INFO)

   # Start client
   client = await Client.connect("localhost:7233")

   # Run a worker for the workflow
   async with Worker(
       client,
       task_queue="hello-activity-task-queue",
       workflows=[GreetingWorkflow],
       activities=[compose_greeting],
   ):
       print("What's your name?")
       name = input()

       # While the worker is running, use the client to run the workflow and
       # print out its result.
       result = await client.execute_workflow(
           GreetingWorkflow.run,
           name,
           id="hello-activity-workflow-id",
           task_queue="hello-activity-task-queue",
       )
       print(f"Result: {result}")

if __name__ == "__main__":
   asyncio.run(main())

В моем случае вывод выглядел так:

Этот код также приводит к немного другому событию WorkflowExecutionStarted в истории событий рабочего процесса в веб-интерфейсе. Мы видим, что ввод «Matt» был передан в activity, а не «World».

Нажмите «Workflow Execution Started» в рабочем процессе, чтобы увидеть разницу!

Вывод

Вот и все, workflows и workers разбиты на несколько областей, над которыми вы можете работать в своем приложении. Надеемся, это поможет вам начать работу с вашим приложением Python и лучше понять, как вы можете работать с Temporal.

Это вторая статья из серии Python SDK. Вы можете найти первую статью или же продолжить знакомиться с данной темой далее пройдя по ссылкам:

#Python
Комментарии
Чтобы оставить комментарий, необходимо авторизоваться