Python SDK: Погружение в рабочие процессы
В нашем предыдущем посте мы объявили, что у нас есть Python SDK в GA, и поделились небольшой историей о том, почему мы создали то, что мы сделали, и некоторыми предварительными условиями, которые вам нужны для начала работы. Теперь давайте погрузимся немного глубже, чтобы помочь вам в работе. Для целей этого поста мы собираемся разбить один файл, показанный в нашем примере hello_activity.py, и разделить его на несколько файлов, чтобы мы более точно соответствовали тому, как может выглядеть ваше приложение!
Код рабочего процесса: workflow.py
Мы хотим сначала создать рабочий процесс, потому что worker будет вызывать его. Если мы не сделаем этого первым, ваш код не будет работать, а это неинтересно.
В верхней части нашего файла, как и в любом хорошем файле Python, есть импорт, который нам нужен, чтобы все заработало. Мы также определяем класс данных Python, который будет объектом, который мы будем использовать для передачи данных в workflow. Передавая класс данных Python вместо нескольких параметров в рабочий процесс, мы можем добавлять или удалять поля из класса данных вместо того, чтобы изменять способ вызова рабочего процесса. Это обеспечивает некоторую гибкость для внесения изменений в будущем без необходимости создания версий рабочего процесса.
В этом случае ComposeGreetingInput
позволяет нам передать строку greeting
и строку name
в рабочий процесс.
from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow
# Temporal strongly encourages using a single dataclass so
# that you can add fields in a backwards-compatible way.
@dataclass
class ComposeGreetingInput:
greeting: str
name: str
Чуть ниже ComposeGreetingInput
мы добавим два новых фрагмента кода, первый — это активность.
Temporal Python SDK поставляется с декораторами, которые выполняют некоторую работу за вас. В случае действия он регистрирует этот метод как допустимое действие, которое может выполняться Worker (см. ниже). Действие принимает некоторые данные, обрабатывает их и может возвращать значения. (по сути, это точно так же, как и любой другой метод). Единственное, что мы должны отметить, это то, что если вы собираетесь делать что-то недетерминированное, например генерировать UUID, вы хотите сделать это в своих действиях или в методах, к которым обращаются ваши действия.
compose_greeting
принимает объект класса данных ComposeGreetingInput
, который мы определили выше, записывает некоторую информацию в терминал, а затем возвращает объединенные строки из переданного объекта. Мы могли бы так же легко отправить данные в базу данных, выполнить некоторые вычисления или даже создать вызов стороннего API в этом методе.
# Basic activity that logs and does string concatenation
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
activity.logger.info("Running activity with parameter %s" % input)
return f"{input.greeting}, {input.name}!"
Теперь, когда у нас есть действие, мы добавим его в класс Workflow. Этот код является основой для вашего сценария Temporal. Все, что Temporal запустит для вас в этом примере, находится внутри класса Workflow.
Мы украшаем класс workflow.defn
, чтобы правильно зарегистрировать его, чтобы Temporal Worker (см. ниже) знал, что это допустимый класс Workflow.
Первый метод в этом примере класса run
, но есть пара вещей, которые нужно вызвать:
Метод run
украшен @workflow.run
, который сообщает Worker, что это метод для запуска при запуске workflow. Его можно использовать для настройки переменных рабочего процесса, вызова одного или нескольких действий и многого другого.
Сам метод является асинхронным, потому что библиотека Python Temporal использует asyncio под капотом, а вызываемые методы должны иметь возможность запускаться асинхронно.
Метод run
рабочего процесса ожидает, что name
строки будет передано, когда он вызывается рабочим процессом.
Внутри метода мы вызываем метод workflow.execute_activity
Temporals, который принимает:
- Ссылка на метод activity, в данном случае
compose_greeting
- Аргументы, которые в данном случае мы используем для передачи приветствия «Hello» и имени
name
классу данныхComposeGreetingInput
, который мы создали ранее. - Тайм-аут. В этом случае мы предоставляем
start_to_close_timeout
, который указывает Temporal Server на 10-секундный тайм-аут этой активности с момента ее запуска.
# Basic workflow that logs and invokes an activity
@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
workflow.logger.info("Running workflow with parameter %s" % name)
return await workflow.execute_activity(
compose_greeting,
ComposeGreetingInput("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)
Когда вы все закончите с этим файлом, он будет выглядеть следующим образом:
workflow.py
from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow
@dataclass
class ComposeGreetingInput:
greeting: str
name: str
# Basic activity that logs and does string concatenation
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
activity.logger.info("Running activity with parameter %s" % input)
return f"{input.greeting}, {input.name}!"
# Basic workflow that logs and invokes an activity
@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
workflow.logger.info("Running workflow with parameter %s" % name)
return await workflow.execute_activity(
compose_greeting,
ComposeGreetingInput("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)
Теперь, когда у нас есть код Workflow, мы свяжем все это вместе с Worker, который представляет собой код, который Temporal использует для асинхронного выполнения вашего кода рабочего процесса по мере появления задач в очереди. В данном случае наш пример очень прост, поэтому скрипт добавит задачу в очередь сразу после создания worker. В нашем репозитории примеров вы найдете еще много подробных примеров того, как создавать приложения, в которых разные скрипты, кроме этих двух, могут заставить рабочий процесс выполнять другие задачи асинхронно. Вы даже можете спросить рабочий процесс о нем самом или о данных, которые он хранит для вас, используя запрос.
Код выполнения: worker.py
Давайте настроим верхнюю часть нашего скрипта, чтобы у нас был весь необходимый импорт и среда. Это будет основой нашего приложения:
worker.py
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import GreetingWorkflow, compose_greeting
interrupt_event = asyncio.Event()
async def main():
print("Hello, world!")
if __name__ == "__main__":
asyncio.run(main())
Этот пример запустится в указанном выше состоянии, но все, что он сделает, это напечатает «Hello, world!» на ваш терминал в этот момент.
Чтобы воспользоваться преимуществами Temporal, нам нужен наш код для связи с Temporal Server. Подключаемся к серверу через Client. Наш код будет отправлять действия, сигналы и запросы в очередь задач Temporal внутри Temporal Server через это соединение. Когда эти задачи в очереди выполняются, они фиксируются в истории workflow, и это позволяет Temporal точно знать, какой код был запущен, какой код осталось выполнить и каково состояние вашего приложения в любой момент времени.
Создание Client
Замените функцию main()
выше кодом подключения client, используя тот же путь URL-адреса локального хоста по умолчанию и порт для Temporal Server:
async def main():
# Uncomment the line below to see logging
# logging.basicConfig(level=logging.INFO)
# Start client
client = await Client.connect("localhost:7233")
Примечание: мы используем неhttp://localhost:7233
, аlocalhost:7233
!
Затем мы хотим добавить то, что мы называем worker, то есть фрагмент кода, который фактически вызывает код нашего рабочего процесса для действий в очереди.
Создание worker
В строке чуть ниже client
, внутри main()
, добавьте следующее:
# Run a worker for the workflow
async with Worker(
client,
task_queue="hello-activity-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
):
print("Still saying ‘hello’ to you, world!")
Запуск этого кода сообщает Temporal Server, что есть worker
, готовый обрабатывать задачи, передавая следующую информацию:
client
- Позволяет рабочему протянуть руку и сказать: «Я здесь, Temporal Server, дай мне работу!»task_queue
- Сообщает Temporal Server: «Я настроен только на обработку задач из этой очереди».workflows
- Список ссылок на классы Python, называемыеWorkflows
(см. ниже), написанные специально для обработкиactivities
действий, обрабатывающих задачи в запрошенномtask_queue
.activities
- Массив ссылок на функции Python, которые могут обрабатывать задачи в очереди задач.
На этом этапе ваш код будет выглядеть так. Все, что делает этот код, — это подключается к Temporal Server и позволяет коду Worker запускать ваш оператор print
. Мы еще не осознаем весь потенциал Temporal. Этот код напечатает «Still saying ‘hello’ to you, world
» к вашему терминалу. После запуска этого кода, если вы перейдете к URL-адресу веб-интерфейса (по умолчанию 127.0.0.1:8233
), вы пока ничего не увидите в своем списке рабочих процессов.
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import GreetingWorkflow, compose_greeting
interrupt_event = asyncio.Event()
async def main():
# Uncomment the line below to see logging
# logging.basicConfig(level=logging.INFO)
# Start client
client = await Client.connect("localhost:7233")
# Run a worker for the workflow
async with Worker(
client,
task_queue="hello-activity-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
):
print("Still saying ‘hello’ to you, world!")
if __name__ == "__main__":
asyncio.run(main())
Чтобы запустить наш GreetingWorkflow
, мы просим Client выполнить workflow. Затем, когда мы запустим наш скрипт, Temporal Server будет отслеживать, что код сделал до сих пор, и вы сможете увидеть всю работу, которую Temporal сделал для вас, в Web UI.
В приведенном выше коде замените оператор печати следующим:
# While the worker is running, use the client to run the workflow and
# print out its result.
result = await client.execute_workflow(
GreetingWorkflow.run,
"World",
id="hello-activity-workflow-id",
task_queue="hello-activity-task-queue",
)
print(f"Result: {result}")
Этот код позволит работнику сообщить клиенту:
- Асинхронно вызывать
execute_workflow
(т. е. запускать методrun
рабочего процесса) - Передайте ввод «World» в
GreetingWorkflow.run
- Дайте workflow идентификатор
hello-activity-workflow-id
- Поместите задачи, созданные рабочим процессом, в очередь
hello-activity-task-queue
(которую вы можете узнать из таких отдаленных мест, как пара строк выше, когда мы создавали Worker)
Запуск
Запустите это в своем терминале: python worker.py
Как только Worker выполняет код, он устанавливает строку, возвращенную из compose_greeting
, в result
, а затем распечатывает ее на терминале. Вы должны в конечном итоге увидеть Hello, World!
в вашем терминале.
В веб-интерфейсе вы увидите что-то вроде этого с одной строкой рабочего процесса для каждого выполнения вашего кода:
Щелкнув workflow ID для одной из строк, вы увидите все, что произошло с вашим кодом и Temporal server, когда ваш worker выполнил код workflow:
Реальный разговор: Temporal создан для приложений, которые делают больше, чем просто печатают текст на вашем терминале, но мы хотели дать чрезвычайно простой способ показать вам разные части. В следующем сообщении блога мы покажем вам фактическое приложение, которое использует Temporal для сохранения состояния в workflow.
Если вы хотите сделать что-то также простое, но это дает вам немного больше функциональности, вы можете получить ввод из консоли с помощью этой версии worker.py
:
import asyncio
from temporalio.client import Client
from temporalio.worker import Worker
from workflow import GreetingWorkflow, compose_greeting
interrupt_event = asyncio.Event()
async def main():
# Uncomment the line below to see logging
# logging.basicConfig(level=logging.INFO)
# Start client
client = await Client.connect("localhost:7233")
# Run a worker for the workflow
async with Worker(
client,
task_queue="hello-activity-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
):
print("What's your name?")
name = input()
# While the worker is running, use the client to run the workflow and
# print out its result.
result = await client.execute_workflow(
GreetingWorkflow.run,
name,
id="hello-activity-workflow-id",
task_queue="hello-activity-task-queue",
)
print(f"Result: {result}")
if __name__ == "__main__":
asyncio.run(main())
В моем случае вывод выглядел так:
Этот код также приводит к немного другому событию WorkflowExecutionStarted
в истории событий рабочего процесса в веб-интерфейсе. Мы видим, что ввод «Matt» был передан в activity, а не «World».
Нажмите «Workflow Execution Started» в рабочем процессе, чтобы увидеть разницу!
Вывод
Вот и все, workflows и workers разбиты на несколько областей, над которыми вы можете работать в своем приложении. Надеемся, это поможет вам начать работу с вашим приложением Python и лучше понять, как вы можете работать с Temporal.
Это вторая статья из серии Python SDK. Вы можете найти первую статью или же продолжить знакомиться с данной темой далее пройдя по ссылкам: