为什么Python对工作流如此重要?

想象一下训练大型语言模型(LLM)的场景:你需要标记数据集、输入数据、等待模型运行、评估损失、调整模型,然后重复这个过程。如果没有自动化,每个步骤都需要手动启动、监控完成后再开始下一步。使用工作流,你可以轻松编排整个训练过程,每个步骤在前一步完成后自动触发。
对于数据管道——Python的主要应用场景之一——同样如此。通过自动化定义的幂等步骤序列,开发者可以部署一个处理整个数据管道的工作流。再比如构建AI代理(如管理杂货的智能助手),每周输入食谱清单后,代理需要:

  1. 整理所需食材清单
  2. 检查上周剩余食材
  3. 订购差缺部分并安排取货
    使用工作流可以简化代理架构,通过单独步骤重试和状态持久化提高完成率。Python工作流的支持使这类开发变得前所未有的简单。

Python工作流如何运作?

Cloudflare工作流基于我们为持久执行创建的基础设施,同时为Python用户提供符合语言习惯的编写方式。Python和JavaScript SDK实现了完全功能对等,这得益于Cloudflare Workers在运行时中直接支持Python。

创建Python工作流

工作流完全构建在Workers和Durable Objects之上。每个元素都负责存储工作流元数据和实例级信息。工作流控制平面的底层是用户Worker,即WorkflowEntrypoint。当工作流实例准备运行时,引擎通过RPC调用用户Worker的run方法(这里是Python Worker)。
工作流声明的示例框架如下:

from workers import WorkflowEntrypoint
class MyWorkflow(WorkflowEntrypoint):
    async def run(self, event, step):
        # 步骤定义在这里

run方法提供WorkflowStep参数,实现持久执行API,这是用户实现最多一次执行的基础。这些API在JavaScript中实现,需要在Python Worker上下文中访问。

克服语言障碍

Python Workers依赖Pyodide——CPython到WebAssembly的移植版。Pyodide提供到JavaScript的外部函数接口(FFI),允许从Python调用JavaScript方法。这正是其他绑定和Python包在Workers平台上工作的机制。
我们使用这个FFI层不仅允许直接使用工作流绑定,还在Python中提供WorkflowStep方法。通过将WorkflowEntrypoint视为运行时的特殊类,run方法被手动包装,使WorkflowStep作为JsProxy暴露,而不是像其他JavaScript对象那样进行类型转换。

打造符合Python习惯的SDK

将工作流移植到Python的重要部分是提供Python用户熟悉的接口。让我们看看TypeScript工作流定义的片段:

import {
  WorkflowEntrypoint,
  WorkflowStep,
  WorkflowEvent,
} from "cloudflare:workers";
export class MyWorkflow extends WorkflowEntrypoint {
  async run(event: WorkflowEvent<YourEventType>, step: WorkflowStep) {
    let state = step.do("my first step", async () => {
      let userEmail = event.payload.userEmail;
      let createdTimestamp = event.payload.createdTimestamp;
      return { userEmail: userEmail, createdTimestamp: createdTimestamp };
    });
    step.sleep("my first sleep", "30 minutes");
    await step.waitForEvent<EventType>("receive example event", {
      type: "simple-event",
      timeout: "1 hour",
    });
    const developerWeek = Date.parse("22 Sept 2025 13:00:00 UTC");
    await step.sleepUntil("sleep until X times out", developerWeek);
  }
}

Python实现需要修改do方法。与其他语言不同,Python不轻易支持匿名回调。这种行为通常通过装饰器实现,装饰器允许我们拦截方法并以符合语言习惯的方式暴露它。所有参数保持原始顺序,装饰方法作为回调。
waitForEventsleepsleepUntil方法可以保留原始签名,只要名称转换为蛇形命名法(snake_case)。对应的Python版本如下:

from workers import WorkflowEntrypoint
from datetime import datetime, timezone
class MyWorkflow(WorkflowEntrypoint):
    async def run(self, event, step):
        @step.do("my first step")
        async def my_first_step():
            user_email = event["payload"]["userEmail"]
            created_timestamp = event["payload"]["createdTimestamp"]
            return {
                "userEmail": user_email,
                "createdTimestamp": created_timestamp,
            }
        await my_first_step()
        step.sleep("my first sleep", "30 minutes")
        await step.wait_for_event(
            "receive example event",
            "simple-event",
            timeout="1 hour",
        )
        developer_week = datetime(2024, 10, 24, 13, 0, 0, tzinfo=timezone.utc)
        await step.sleep_until("sleep until X times out", developer_week)

DAG工作流:更优雅的并发处理

设计工作流时,我们经常管理步骤间的依赖关系,即使某些任务可以并发处理。许多工作流具有有向无环图(DAG)执行流程。Python工作流初始版本就支持并发,因为Pyodide捕获JavaScript thenables并将其代理为Python awaitables。
因此,asyncio.gather作为Promise.all的对应物工作良好。虽然这完全可用,我们也支持声明式方法。装饰do方法的优势在于我们可以在原始API上提供进一步抽象。以下展示使用DAG功能的Python API示例:

from workers import WorkflowEntrypoint
class PythonWorkflowDAG(WorkflowEntrypoint):
    async def run(self, event, step):
        @step.do('dependency 1')
        async def dep_1():
            # 执行操作
            print('executing dep1')
        @step.do('dependency 2')
        async def dep_2():
            # 执行操作
            print('executing dep2')
        @step.do('demo do', depends=[dep_1, dep_2], concurrent=True)
        async def final_step(res1=None, res2=None):
            # 执行操作
            print('something')
        await final_step()

这种方法使工作流声明更清晰,状态管理交给工作流引擎数据平面和Python Workers工作流包装器。即使多个步骤可以使用相同名称,引擎也会稍微修改每个步骤的名称以确保唯一性。在Python工作流中,依赖在涉及它的初始步骤成功完成后被视为已解决。

快速上手指南

创建第一个Python工作流

  1. 设置环境
    确保已安装最新版Wrangler CLI工具
  2. 初始化项目

    wrangler init my-workflow
    cd my-workflow
    
  3. 编写工作流代码
    创建src/index.py文件:

    from workers import WorkflowEntrypoint
    class MyWorkflow(WorkflowEntrypoint):
        async def run(self, event, step):
            @step.do("process data")
            async def process():
                return {"status": "completed"}
            
            result = await process()
            return result
    
  4. 配置wrangler.toml

    name = "my-workflow"
    main = "src/index.py"
    compatibility_date = "2024-01-01"
    [[workflows]]
    name = "my-workflow"
    binding = "WORKFLOW"
    class_name = "MyWorkflow"
    
  5. 部署工作流

    wrangler deploy
    

常见工作流模式

模式类型 适用场景 关键方法
顺序执行 简单线性流程 step.do() + await
事件等待 需要外部输入 step.wait_for_event()
延迟执行 定时任务 step.sleep() / step.sleep_until()
并行处理 独立任务组 asyncio.gather()depends参数

常见问题解答

Python工作流与TypeScript版本功能完全相同吗?
是的,我们实现了完全功能对等。所有JavaScript SDK的功能在Python中都有对应实现,包括错误处理、重试机制和状态持久化。
如何处理长时间运行的任务?
工作流引擎自动处理任务持久化。即使Worker重启,工作流状态也会保留,从中断点继续执行。只需确保步骤是幂等的(重复执行结果相同)。
可以在工作流中使用外部Python库吗?
支持所有Pyodide兼容的包,如matplotlib、pandas等。只需在代码中直接导入,无需额外配置。
工作流如何处理错误?
每个步骤内置错误处理和自动重试。你也可以在步骤内自定义错误处理逻辑,失败时工作流会根据配置策略重试。
DAG工作流中的依赖如何工作?
通过depends参数声明依赖关系。引擎会确保所有依赖步骤完成后才执行当前步骤。设置concurrent=True允许无依赖步骤并行运行。
如何监控工作流执行?
通过Cloudflare仪表板查看工作流实例状态、执行历史和错误日志。每个步骤的执行时间和结果都会被记录。
工作流有执行时间限制吗?
单个步骤最长可运行30分钟,整个工作流可运行数天甚至更久,因为状态会持久化存储。
如何调试Python工作流?
使用print()语句输出调试信息,这些会出现在工作流日志中。也可以在开发环境使用wrangler dev进行本地测试。