Temporal Python 1.0.0 – Надежный распределенный цикл событий Asyncio
Мы в восторге от общедоступного выпуска Temporal Python SDK. Python теперь является полностью поддерживаемым языком workflow в Temporal, а использование нами нативных конструкций asyncio делает его идеальным выбором для разработчиков Python, которые хотят создавать надежные рабочие процессы.
Вот несколько ссылок, чтобы узнать больше о Temporal Python:
Также дополнительно предоставляется:
Как и весь Temporal, Temporal Python имеет лицензию Массачусетского технологического института, и мы очень приветствуем участие в проектах с открытым исходным кодом.
Большая часть подробностей о том, как использовать SDK, находится в вышеупомянутых ссылках, поэтому в этом посте будет только краткое освежение информации о том, что такое Temporal и Temporal Python. Затем мы углубимся в некоторые детали того, как Temporal Python использует asyncio и другие функции Temporal Python.
Введение в Temporal
Temporal — это система рабочие процессы, позволяющая разработчикам записывать workflows в коде. Workflows выполняются на любом количестве workers. Код в рабочие процессы транслируется в события во Temporal, что означает, что код workflows может быть воспроизведен с событиями на разных рабочих процессах по мере необходимости в подходе Event Sourcing. Поэтому код workflows должен быть детерминированным для безопасного повторного запуска.
Workflows могут вызывать действия, которые также выполняются на workers и являются функциями общего назначения, которые могут делать что угодно.
Client могут сигнализировать, запрашивать, отменять и/или завершать workflows.
Поскольку workflows обычно ожидают таймеров, асинхронных задач, управляемых сопрограмм, отмен и т. д., workflows моделируются так же, как циклы событий Python asyncio.
Введение в Temporal Python
Чтобы дать краткое представление о Temporal Python, мы реализуем упрощенную форму покупки в один клик, при которой покупка начинается, а затем, если не отменена, будет выполнена через 10 секунд.
Реализация Activity
Во-первых, давайте создадим простую активность, которая выполняет HTTP POST покупки через aiohttp:
import aiohttp
from temporalio import activity
from temporalio.exceptions import ApplicationError
@dataclass
class Purchase:
item_id: str
user_id: str
@activity.defn
async def do_purchase(purchase: Purchase) -> None:
async with aiohttp.ClientSession() as sess:
async with sess.post("https://api.example.com/purchase", json=asdict(purchase)) as resp:
# We don't want to retry client failure
if resp.status >= 400 and resp.status < 500:
raise ApplicationError(f"Status: {resp.status}", resp.json(), non_retryable=True)
# Otherwise, fail on bad status which will be inherently retried
resp.raise_for_status()
Реализация Workflow
Теперь мы хотим выполнить это действие из workflow через 10 секунд, если только мы не получим отмену:
import asyncio
from datetime import timedelta
from enum import IntEnum
from temporalio import workflow
# Import our activity, but pass it through the sandbox
with workflow.unsafe.imports_passed_through():
from .my_activities import Purchase, do_purchase
class PurchaseStatus(IntEnum):
PENDING = 1
CONFIRMED = 2
CANCELLED = 3
COMPLETED = 4
@workflow.defn
class OneClickBuyWorkflow:
def __init__(self) -> None:
self.status = PurchaseStatus.PENDING
self.purchase: Optional[Purchase] = None
@workflow.run
async def run(self, purchase: Purchase) -> PurchaseStatus:
self.purchase = self.purchase or purchase
# Give user 10 seconds to cancel or update before we send it through
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
self.status = PurchaseStatus.CANCELLED
return self.status
# Update status, purchase, and update status again
self.status = PurchaseStatus.CONFIRMED
await workflow.execute_activity(
Purchaser.purchase,
self.purchase,
start_to_close_timeout=timedelta(minutes=1),
cancellation_type=workflow.ActivityCancellationType.WAIT_CANCELLATION_COMPLETED,
)
self.status = PurchaseStatus.COMPLETED
return self.status
@workflow.signal
def update_purchase(self, purchase: Purchase) -> None:
self.purchase = purchase
@workflow.query
def current_status(self) -> PurchaseStatus:
return self.status
Видите там asyncio.sleep
? Это не нормальный сон локального процесса; это надежный таймер, поддерживаемый Temporal. См. раздел «Temporal Workflows — это циклы событий Asyncio» ниже.
Запуск Worker
Workflows и activities выполняются в workers следующим образом:
from temporalio.client import Client
from temporalio.worker import Worker
from .my_workflows import OneClickBuyWorkflow
from .my_activities import do_purchase
# Create and run a worker on a task queue for the workflow and activity
worker = Worker(
await Client.connect("my.temporal.host:7233"),
task_queue="my-task-queue",
workflows=[OneClickBuyWorkflow],
activities=[do_purchase],
)
await worker.run()
Использование Client
Теперь мы можем запустить workflow, отправить ему сигнал, проверить его статус и т.д.:
from temporalio.client import Client
from .my_activities import Purchase
from .my_workflows import OneClickBuyWorkflow, PurchaseStatus
# Create the client
client = await Client.connect("my.temporal.host:7233")
# Start a workflow
handle = await client.start_workflow(
OneClickBuyWorkflow.run,
Purchase(item_id="item1", user_id="user1"),
id="user1-purchase1",
task_queue="my-task-queue",
)
# We can cancel it if we want
await handle.cancel()
# We can query its status, even if the workflow is complete
status = await handle.query(OneClickBuyWorkflow.current_status)
assert status == PurchaseStatus.CANCELLED
# We can do many other things with the client like sending a signal to update
# the purchase, wait for workflow completion, terminate the workflow, etc.
Это лишь малая часть того, что можно сделать с помощью Temporal Python. Дополнительные сведения см. в проекте Python SDK.
Теперь, когда мы дали краткий обзор Temporal Python, давайте обсудим, как он использует asyncio.
Temporal Workflows — это циклы событий Asyncio
Здесь мы опишем asyncio и то, как Temporal его использует.
Asyncio за кулисами
Что происходит при выполнении следующего в функции Python async def
?
await asyncio.sleep(10)
Спит 10 секунд. Но asyncio.sleep не похож на time.sleep — он не блокирует поток. Вместо этого он больше похож на JS setTimeout (или Go time.Sleep
, или Rust Tokio sleep
и т. д.) в том смысле, что он дает указание базовому планировщику или циклу событий перейти к другой работе и возобновить работу только по истечении этого времени.
В Python на момент написания этой статьи, вот как выглядит sleep(delay, result=None)
:
loop = events.get_running_loop()
future = loop.create_future()
h = loop.call_later(delay,
futures._set_result_unless_cancelled,
future, result)
try:
return await future
finally:
h.cancel()
Таким образом, он получает текущий цикл обработки событий, создает для него будущее, а затем через call_later сообщает циклу обработки событий вызывать установщик результатов будущего через определенное время. Как будет реализован call_later
, зависит от цикла обработки событий. Цикл событий по умолчанию просто помещает таймер в кучу запланированных вещей, которые будут проверяться по «time» цикла событий на каждой отдельной итерации цикла событий.
Циклы событий Asyncio — это просто классы, реализующие asyncio.AbstractEventLoop. В большинстве случаев разработчики будут использовать asyncio.run
(my_async_func())
или аналогичный, который создаст цикл событий по умолчанию для этой системы и run_until_complete. run_until_complete
по умолчанию реализован как run_forever с вызовом stop, когда задача выполнена.
Так как же реализован run_forever
? Точно так же, как и следовало ожидать от цикла событий — с циклом:
while True:
self._run_once()
if self._stopping:
break
Это просто повторно вызывает _run_once
, который просто обрабатывает все, что готово к обработке (например, фьючерсы, которые стали готовы, таймеры, которые прошли свое время и т. д.), ожидающие пробуждения базовой системой для повторного запуска итерации.
Таким образом, цикл событий — это просто запущенный «loop», который выполняет все готовые задачи до тех пор, пока все они не будут выполнены, а затем ожидает повторного запуска.
Цикл событий Asyncio от Temporal
Во всех языках Temporal SDK рабочие процессы работают одинаково. При срабатывании они постоянно запускают все сопрограммы по отдельности и совместно, пока все они не будут переведены в режим ожидания обновлений с сервера Temporal. Затем мы отправляем на сервер собранные команды последней итерации. Как только сервер отправляет нам события (например, завершенный таймер, завершенное действие, полученный сигнал и т. д.), мы заполняем полученные значения и запускаем workflow, чтобы снова выполнить ту же обработку.
Таким образом, Temporal workflow — это просто запущенный «цикл», который выполняет все готовые задачи до тех пор, пока все не будут выполнены, а затем ожидает повторного запуска. Так же, как цикл событий. Поэтому вполне естественно разрабатывать временные сопрограммы workflow как сопрограммы asyncio.
Мы создали собственный экземпляр asyncio.AbstractEventLoop
, который установлен в качестве текущего цикла для временных рабочих процессов. Нам даже не нужно реализовывать run_until_complete
или run_forever
, потому что мы всегда хотим запустить только одну итерацию цикла событий.
Долговечность
Когда мы спим или запускаем задачу в обычном asyncio коде Python и процесс падает, наше состояние и способность продолжить с того места, где мы остановились, теряются. Тем не менее, временные рабочие процессы построены так, чтобы быть надежными и возобновляемыми. Поскольку код рабочего процесса является детерминированным, а все эти конструкции долговечны, сбой рабочего процесса или процесса не является проблемой. Temporal создан для воспроизведения существующих событий/кода, чтобы добраться до того места, где он остановился. Другой работник может легко возобновить рабочий процесс без потери данных или функциональности. Как только рабочий процесс принимается Temporal, он гарантированно выполняется до конца.
Таймеры
Таймеры Asyncio построены как временные таймеры. Например, внутри рабочего процесса, если мы запустим:
await asyncio.sleep(10)
На самом деле мы запускаем временной устойчивый таймер. Таймер будет виден в пользовательском интерфейсе. То же самое для всего, что заканчивается вызовом call_later. Итак, если у нас есть:
await asyncio.wait_for(workflow.execute_child_workflow(MyChildWorkflow.run), timeout=10)
Мы запустили дочерний рабочий процесс и 10-секундный таймер, который ведет себя так же, как Python. Это означает, что если 10-секундный тайм-аут произойдет до завершения дочернего рабочего процесса, это отменит дочерний рабочий процесс и вызовет ошибку TimeoutError
.
Мы даже можем получить текущее время цикла с помощью:
asyncio.get_running_loop().time()
Это фактически то же самое, что и workflow.now()
, но показывает, что цикл событий находится в детерминированном моменте времени. Мы даже можем использовать среду тестового рабочего процесса с пропуском времени и контролировать время вручную.
Задания
Создание задачи в цикле событий — правильный способ запустить временную сопрограмму в рабочем процессе. Таким образом, мы можем легко получить что-то вроде:
async with asyncio.TaskGroup() as tg:
for activity_param in range(20):
tg.create_task(
workflow.execute_activity(
my_activity,
activity_param,
start_to_close_timeout=timedelta(minutes=5),
),
)
При этом используется новая функциональность TaskGroup в Python 3.11 для запуска 20 действий и ожидания их завершения.
На самом деле Temporal Python был разработан до того, как стали доступны группы задач, но, поскольку мы используем собственные конструкции asyncio, новые функции работают автоматически.
Действия и дескрипторы дочерних рабочих процессов фактически реализованы как расширения асинхронных задач. Таким образом, запуск действия или запущенный дочерний рабочий процесс можно использовать в качестве задач. Это приемлемо:
handle = workflow.start_activity(my_activity, start_to_close_timeout=timedelta(minutes=5))
handle.set_name("my-task-name-for-stack-traces")
workflow.logger.info("Result: %s", await handle)
Отмена
К счастью для нас, отмена Python и временная отмена работают почти одинаково. Отмена asyncio и временная отмена — это просто запросы, вызывающие ошибки в базовом коде, но их можно перехватить и проигнорировать. Это означает, что если мы отменяем workflow от client, Temporal передаст это рабочему процессу как отмену задачи, что приведет к возникновению ошибки asyncio.CancelledError
.
Мы даже можем использовать экранирование, чтобы гарантировать, что что-то вроде действительно важного действия не может быть отменено, например:
await asyncio.shield(workflow.execute_activity(
do_not_cancel_me,
start_to_close_timeout=timedelta(minutes=5),
))
Обратите внимание, что это не помешает внешней части все еще вызывать отмененную ошибку, которая, если ее не поймать, отменит рабочий процесс, тем самым в любом случае отменив действие. См. документацию по защите Python, чтобы узнать, как полностью игнорировать отмену.
Это означает, что отмена задачи также работает. Скажем, мы хотим иметь возможность отменить групповые действия через 3 секунды:
multiple_activities = asyncio.create_task(asyncio.gather(
workflow.execute_activity(my_activity1, start_to_close_timeout=timedelta(minutes=5)),
workflow.execute_activity(my_activity2, start_to_close_timeout=timedelta(minutes=5)),
))
await asyncio.sleep(3)
multiple_activities.cancel()
await multiple_activities
Это на самом деле выдает постоянные запросы на отмену действий, где бы они ни выполнялись, если они были запущены.
Примитивы синхронизации
Мы также можем использовать самые детерминированные примитивы asyncio синхронизации. Например, очень часто используют asyncio.Queue или asyncio.Event.. Часто рабочие процессы пишутся так:
@workflow.defn
class MyWorkflow:
def __init__(self) -> None:
self.should_proceed = asyncio.Event()
@workflow.run
async def run(self) -> None:
workflow.logger.info("Waiting...")
await self.should_proceed.wait()
workflow.logger.info("Completing!")
@workflow.signal
def proceed(self) -> None:
self.should_proceed.set()
Затем client может сигнализировать о продолжении рабочего процесса.
Условие ожидания
Поскольку мы контролируем каждую итерацию цикла событий, мы можем делать более сложные вещи, чем обычный код Python. Например, мы предлагаем wait_condition
, которое не возвращается до тех пор, пока обратный вызов не станет истинным. Мы вызываем этот обратный вызов на каждой итерации. Таким образом, тот же приведенный выше пример сигнала может быть записан как:
@workflow.defn
class MyWorkflow:
def __init__(self) -> None:
self.should_proceed = False
@workflow.run
async def run(self) -> None:
workflow.logger.info("Waiting...")
await workflow.wait_condition(lambda: self.should_proceed)
workflow.logger.info("Completing!")
@workflow.signal
def proceed(self) -> None:
self.should_proceed = True
Этот тип функции обычно недоступен в асинхронных циклах Python, потому что нет простого способа запускать обратный вызов на каждой итерации цикла обработки событий.
Ограничения
Реализованы только детерминированные асинхронные конструкции. Итак, все, что связано с подпроцессами, дисковым/сетевым вводом-выводом и т.д. завершится неудачей, если будет вызван.
В настоящее время в цикле событий Temporal реализованы только функции, основанные на относительном времени. Поэтому использование call_at не удастся. К сожалению, более новый высокоуровневый asyncio.timeout реализован через call_at
, хотя это занимает относительное время, поэтому его также нельзя использовать. Возможно, однажды мы уберем это ограничение и разрешим абсолютное время относительно времени рабочего процесса, но это может привести к путанице кода для пользователя.
Другие функции Temporal Python
Хотя этот пост посвящен тому, как рабочие процессы Temporal являются надежными асинхронными циклами событий, есть и другие интересные аспекты Temporal Python, на которые стоит обратить внимание.
Полностью типизированный
Вся библиотека полностью типизирована с использованием новейших возможностей подсказок типов Python. Библиотека была разработана с учетом типизации, и многие вызовы были разработаны как универсальные, чтобы помочь разработчикам находить ошибки. Например, для рабочего процесса:
@workflow.defn
class MyWorkflow
@workflow.run
async def run(self, param: int) -> None:
...
MyPy (или другие средства проверки типов) сообщит об ошибке, если мы это сделаем:
await my_client.execute_workflow(MyWorkflow.run, "some param", id="id", task_queue="tq")
Это не удается, потому что рабочий процесс не принимает строковый аргумент, он принимает целое число. Эта безопасность типов распространена во всей библиотеке. См. документацию API для всех типов и перегрузок.
Несколько стилей Activity
Мы показали только действия с async def
, которые являются наиболее распространенным и рекомендуемым способом разработки действий. Но для многих применений Python требуются неасинхронные вызовы, а вызов в асинхронном контексте блокирует цикл обработки событий.
Разработчики могут использовать run_in_executor
из своей асинхронной активности, если хотят (и это очень распространенный подход), но мы также поддерживаем неасинхронные активности. Для многопоточных действий worker может быть предоставлен ThreadPoolExecutor для запуска действий. Мы даже поддерживаем многопроцессорные действия с помощью ProcessPoolExecutor. Были предприняты дополнительные усилия для поддержки пульсации активности и отмены активности в потоках и даже процессах. См. документацию репозитория для получения дополнительной информации.
Workflow Sandbox
По умолчанию все workflows выполняются в sandbox. При каждом запуске рабочего процесса sandbox повторно импортирует файл, в котором находится workflow, чтобы не допустить загрязнения глобального состояния. Кроме того, он проксирует известные недетерминированные вызовы стандартной библиотеки, чтобы предотвратить случайные вещи, такие как доступ к диску или случайные действия внутри рабочего процесса. Большинство импортируемых нестандартных библиотек должны быть помечены как сквозные при импорте, например:
with workflow.unsafe.imports_passed_through():
import pydantic
Это предотвращает их повторный импорт, что экономит память и производительность. Sandbox не является надежной или безопасной. Есть некоторые известные предостережения, и sandbox может быть отключена для каждого workflow или для всего worker. См. документацию репозитория для более подробной информации.
Ядро Rust
И этот Python SDK, и TypeScript SDK (а также предстоящие Ruby и .NET SDK) поддерживаются одним и тем же ядром Rust. В отличие от многих «SDK», Temporal SDK — это не просто интеллектуальные клиенты. Скорее, это целые сложные машины состояний. В Temporal пользователи запускают worker, и весь код workflow и activity происходит на worker, поэтому для обеспечения их правильной работы необходимы сложные махинации.
В Python мы используем PyO3 и PyO3 Asyncio с некоторым пользовательским кодом моста Rust, чтобы это работало с ядром Rust. Это означает, что код не только работает быстро, но и автоматически включает исправления и улучшения конечного автомата по мере улучшения ядра.
Воспроизведение
Одной из самых мощных, но часто упускаемых из виду функций Temporal является возможность воспроизведения кода workflow с использованием исторических запусков. Мы можем получать истории рабочих процессов с сервера Temporal и запускать их в нашем локальном коде workflow с помощью реплеера.
Воспроизведение прошлых рабочих процессов в более новом коде workflow может помочь выявить несовместимые/недетерминированные изменения и другие неожиданные ошибки перед развертыванием.
Кроме того, воспроизведение прошлой истории рабочего процесса в локальном коде workflow позволяет нам отлаживать этот код. Просто отключите детектор взаимоблокировок в реплеере с помощью debug_mode=True
, и мы сможем поставить точки останова в коде нашего workflow и выполнить рабочий процесс в точности так, как он изначально выполнялся.
Тестовые среды
Temporal Python предлагает две реализации тестового сервера для упрощения тестирования.
Полный темпоральный сервер, работающий локально на SQLite, можно запустить с помощью awaittemporalio.testing.WorkflowEnvironment.start_local()
. Это загружает исполняемый файл во временное расположение, если он еще не существует, и запускает его довольно быстро. У него есть все функции стандартного Temporal сервера, поскольку он является полным однодвоичным временным сервером. Пользовательский интерфейс можно даже включить.
Мы также предоставляем реализацию сервера с пропуском времени. awaittemporalio.testing.WorkflowEnvironment.start_time_skipping()
запускает сервер, который может пропускать время до следующего события рабочего процесса. Используя это, время также можно пропустить вручную. Это идеальное решение для тестирования workflow, которые долго не работают, или в случаях, когда мы хотим проверить, как тайм-аут может повлиять на систему. Как и полный Temporal сервер, этот также лениво загружает двоичный файл при первом вызове, а затем выполняет. На самом деле он написан на Java как часть нашего Java SDK и изначально скомпилирован с помощью GraalVM.
Заключение
Итак, это базовое введение в Temporal Python, то, как он тесно интегрируется с asyncio, и некоторые другие его функции. Есть еще много функций, не описанных в этом посте. Загляните в репозиторий и попробуйте!
Это Заключительная статья из серии Python SDK. Вы можете найти предыдущие статьи или же продолжить знакомиться с данной темой пройдя по ссылкам: