
为什么Python对工作流如此重要?
想象一下训练大型语言模型(LLM)的场景:你需要标记数据集、输入数据、等待模型运行、评估损失、调整模型,然后重复这个过程。如果没有自动化,每个步骤都需要手动启动、监控完成后再开始下一步。使用工作流,你可以轻松编排整个训练过程,每个步骤在前一步完成后自动触发。
对于数据管道——Python的主要应用场景之一——同样如此。通过自动化定义的幂等步骤序列,开发者可以部署一个处理整个数据管道的工作流。再比如构建AI代理(如管理杂货的智能助手),每周输入食谱清单后,代理需要:
-
整理所需食材清单 -
检查上周剩余食材 -
订购差缺部分并安排取货
使用工作流可以简化代理架构,通过单独步骤重试和状态持久化提高完成率。Python工作流的支持使这类开发变得前所未有的简单。
Python工作流如何运作?
Cloudflare工作流基于我们为持久执行创建的基础设施,同时为Python用户提供符合语言习惯的编写方式。Python和JavaScript SDK实现了完全功能对等,这得益于Cloudflare Workers在运行时中直接支持Python。
创建Python工作流
工作流完全构建在Workers和Durable Objects之上。每个元素都负责存储工作流元数据和实例级信息。工作流控制平面的底层是用户Worker,即WorkflowEntrypoint。当工作流实例准备运行时,引擎通过RPC调用用户Worker的run方法(这里是Python Worker)。
工作流声明的示例框架如下:
from workers import WorkflowEntrypoint
class MyWorkflow(WorkflowEntrypoint):
async def run(self, event, step):
# 步骤定义在这里
run方法提供WorkflowStep参数,实现持久执行API,这是用户实现最多一次执行的基础。这些API在JavaScript中实现,需要在Python Worker上下文中访问。
克服语言障碍
Python Workers依赖Pyodide——CPython到WebAssembly的移植版。Pyodide提供到JavaScript的外部函数接口(FFI),允许从Python调用JavaScript方法。这正是其他绑定和Python包在Workers平台上工作的机制。
我们使用这个FFI层不仅允许直接使用工作流绑定,还在Python中提供WorkflowStep方法。通过将WorkflowEntrypoint视为运行时的特殊类,run方法被手动包装,使WorkflowStep作为JsProxy暴露,而不是像其他JavaScript对象那样进行类型转换。
打造符合Python习惯的SDK
将工作流移植到Python的重要部分是提供Python用户熟悉的接口。让我们看看TypeScript工作流定义的片段:
import {
WorkflowEntrypoint,
WorkflowStep,
WorkflowEvent,
} from "cloudflare:workers";
export class MyWorkflow extends WorkflowEntrypoint {
async run(event: WorkflowEvent<YourEventType>, step: WorkflowStep) {
let state = step.do("my first step", async () => {
let userEmail = event.payload.userEmail;
let createdTimestamp = event.payload.createdTimestamp;
return { userEmail: userEmail, createdTimestamp: createdTimestamp };
});
step.sleep("my first sleep", "30 minutes");
await step.waitForEvent<EventType>("receive example event", {
type: "simple-event",
timeout: "1 hour",
});
const developerWeek = Date.parse("22 Sept 2025 13:00:00 UTC");
await step.sleepUntil("sleep until X times out", developerWeek);
}
}
Python实现需要修改do方法。与其他语言不同,Python不轻易支持匿名回调。这种行为通常通过装饰器实现,装饰器允许我们拦截方法并以符合语言习惯的方式暴露它。所有参数保持原始顺序,装饰方法作为回调。
waitForEvent、sleep和sleepUntil方法可以保留原始签名,只要名称转换为蛇形命名法(snake_case)。对应的Python版本如下:
from workers import WorkflowEntrypoint
from datetime import datetime, timezone
class MyWorkflow(WorkflowEntrypoint):
async def run(self, event, step):
@step.do("my first step")
async def my_first_step():
user_email = event["payload"]["userEmail"]
created_timestamp = event["payload"]["createdTimestamp"]
return {
"userEmail": user_email,
"createdTimestamp": created_timestamp,
}
await my_first_step()
step.sleep("my first sleep", "30 minutes")
await step.wait_for_event(
"receive example event",
"simple-event",
timeout="1 hour",
)
developer_week = datetime(2024, 10, 24, 13, 0, 0, tzinfo=timezone.utc)
await step.sleep_until("sleep until X times out", developer_week)
DAG工作流:更优雅的并发处理
设计工作流时,我们经常管理步骤间的依赖关系,即使某些任务可以并发处理。许多工作流具有有向无环图(DAG)执行流程。Python工作流初始版本就支持并发,因为Pyodide捕获JavaScript thenables并将其代理为Python awaitables。
因此,asyncio.gather作为Promise.all的对应物工作良好。虽然这完全可用,我们也支持声明式方法。装饰do方法的优势在于我们可以在原始API上提供进一步抽象。以下展示使用DAG功能的Python API示例:
from workers import WorkflowEntrypoint
class PythonWorkflowDAG(WorkflowEntrypoint):
async def run(self, event, step):
@step.do('dependency 1')
async def dep_1():
# 执行操作
print('executing dep1')
@step.do('dependency 2')
async def dep_2():
# 执行操作
print('executing dep2')
@step.do('demo do', depends=[dep_1, dep_2], concurrent=True)
async def final_step(res1=None, res2=None):
# 执行操作
print('something')
await final_step()
这种方法使工作流声明更清晰,状态管理交给工作流引擎数据平面和Python Workers工作流包装器。即使多个步骤可以使用相同名称,引擎也会稍微修改每个步骤的名称以确保唯一性。在Python工作流中,依赖在涉及它的初始步骤成功完成后被视为已解决。
快速上手指南
创建第一个Python工作流
-
设置环境
确保已安装最新版Wrangler CLI工具 -
初始化项目 wrangler init my-workflow cd my-workflow -
编写工作流代码
创建src/index.py文件:from workers import WorkflowEntrypoint class MyWorkflow(WorkflowEntrypoint): async def run(self, event, step): @step.do("process data") async def process(): return {"status": "completed"} result = await process() return result -
配置wrangler.toml name = "my-workflow" main = "src/index.py" compatibility_date = "2024-01-01" [[workflows]] name = "my-workflow" binding = "WORKFLOW" class_name = "MyWorkflow" -
部署工作流 wrangler deploy
常见工作流模式
| 模式类型 | 适用场景 | 关键方法 |
|---|---|---|
| 顺序执行 | 简单线性流程 | step.do() + await |
| 事件等待 | 需要外部输入 | step.wait_for_event() |
| 延迟执行 | 定时任务 | step.sleep() / step.sleep_until() |
| 并行处理 | 独立任务组 | asyncio.gather() 或 depends参数 |
常见问题解答
Python工作流与TypeScript版本功能完全相同吗?
是的,我们实现了完全功能对等。所有JavaScript SDK的功能在Python中都有对应实现,包括错误处理、重试机制和状态持久化。
如何处理长时间运行的任务?
工作流引擎自动处理任务持久化。即使Worker重启,工作流状态也会保留,从中断点继续执行。只需确保步骤是幂等的(重复执行结果相同)。
可以在工作流中使用外部Python库吗?
支持所有Pyodide兼容的包,如matplotlib、pandas等。只需在代码中直接导入,无需额外配置。
工作流如何处理错误?
每个步骤内置错误处理和自动重试。你也可以在步骤内自定义错误处理逻辑,失败时工作流会根据配置策略重试。
DAG工作流中的依赖如何工作?
通过depends参数声明依赖关系。引擎会确保所有依赖步骤完成后才执行当前步骤。设置concurrent=True允许无依赖步骤并行运行。
如何监控工作流执行?
通过Cloudflare仪表板查看工作流实例状态、执行历史和错误日志。每个步骤的执行时间和结果都会被记录。
工作流有执行时间限制吗?
单个步骤最长可运行30分钟,整个工作流可运行数天甚至更久,因为状态会持久化存储。
如何调试Python工作流?
使用print()语句输出调试信息,这些会出现在工作流日志中。也可以在开发环境使用wrangler dev进行本地测试。

