用 ClearFlow 打造不踩坑的大模型工作流:从安装到上线的完整笔记

“为什么我的 AI 流水线总是莫名其妙地卡住?”
“调试时变量被谁偷偷改了?”
“上线前一天,团队还在为依赖冲突焦头烂额。”

如果你也遇到过以上灵魂三问,今天这篇长文或许能给你一条新思路。主角是一个只有 166 行代码、零依赖的 Python 库——ClearFlow。它把“状态不可变、路由显式、流程可测”这三件事做到了极致,让复杂的大模型工作流像搭积木一样直观。


目录

  1. 什么是 ClearFlow?
  2. 为什么它比手撸 if-else 更香?
  3. 5 分钟安装与第一个“Hello LLM”
  4. 核心概念:Node、NodeResult、Flow
  5. 真实场景:多步数据管道实战
  6. 测试、调试与踩坑经验
  7. 与 PocketFlow 的理性对比
  8. 常见疑问解答(FAQ)
  9. 小结与下一步阅读

1. 什么是 ClearFlow?

一句话版本:
ClearFlow 是一个“用纯 Python 写异步工作流”的微型框架,专为语言模型设计。它只给你三件事——节点、结果、流程——却足够覆盖 90% 的 LLM 流水线需求。

特点速览

  • 显式路由:每一步的下一个目的地必须写清楚,拒绝“黑魔法”。
  • 不可变状态:节点之间传递的是冻结后的状态副本,调试时永远不用担心“谁改了我的变量”。
  • 零运行时依赖:不捆绑任何 LLM SDK,你高兴用 OpenAI、Anthropic 还是本地模型都行。
  • 单文件源码clearflow.py 只有 166 行,读完源码再去上厕所都来得及。

2. 为什么它比手撸 if-else 更香?

手撸 if-else ClearFlow
状态在函数间传来传去,随时可能被某行代码改掉 状态一旦生成即为只读
流程图藏在代码深处,新人需要脑补 路由规则集中声明,一眼看懂
单元测试要写大量 mock 节点是纯函数,测起来像测数学函数
上线前发现缺依赖,紧急 pip install 零运行时依赖,部署包体积极小

3. 5 分钟安装与第一个“Hello LLM”

3.1 安装

# 如果这是你第一次用 uv
pip install --user uv      # 或者 pipx install uv
# 一行命令搞定
pip install clearflow

3.2 60 秒跑通最小示例

下面这段代码模拟了一个最简单的聊天机器人:用户说“Hi”,机器人回“Hello!”。别看简单,它已经展示了 ClearFlow 的所有要素。

import asyncio
from typing import TypedDict
from clearflow import Flow, Node, NodeResult

# 1. 先定义一个“状态”长什么样
class ChatState(TypedDict):
    messages: list[dict[str, str]]

# 2. 写一个节点:负责把用户消息送进 LLM,再把回答塞回状态
class ChatNode(Node[ChatState]):
    async def exec(self, state: ChatState) -> NodeResult[ChatState]:
        # 这里可以换成真正的 LLM 调用
        # reply = await openai_client.chat.completions.create(...)
        reply = {"role": "assistant", "content": "Hello!"}
        new_state: ChatState = {"messages": [*state["messages"], reply]}
        return NodeResult(new_state, outcome="success")

# 3. 用 Flow 把节点串起来
chat = ChatNode()
flow = (
    Flow[ChatState]("ChatBot")
    .start_with(chat)
    .route(chat, "success", None)  # 成功就结束
    .build()
)

# 4. 运行
async def main():
    result = await flow({"messages": [{"role": "user", "content": "Hi"}]})
    print(result.state["messages"][-1]["content"])  # Hello!

asyncio.run(main())

运行结果:

Hello!

4. 核心概念:Node、NodeResult、Flow

4.1 Node:最小的“工作单元”

每个节点都是继承自 Node[T] 的类,必须实现 exec 方法。你可以把预处理放在 prep,后处理放在 post,但它们都是可选的。

class MyNode(Node[int]):
    async def prep(self, state: int) -> int:
        # 校验或补充数据
        return state

    async def exec(self, state: int) -> NodeResult[int]:
        new_state = state + 1
        return NodeResult(new_state, outcome="ok")

    async def post(self, result: NodeResult[int]) -> NodeResult[int]:
        # 可以在这里写日志
        return result

4.2 NodeResult:一次性打包“新状态 + 路由标签”

NodeResult(new_state, outcome="continue")

outcome 是一个普通字符串,Flow 会根据它决定下一步去哪儿。

4.3 Flow:把节点串成“图”

flow = (
    Flow[int]("MyFlow")
    .start_with(validate)
    .route(validate, "ok", process)
    .route(validate, "bad", error_handler)
    .route(process, "done", None)   # 终止
    .build()
)

注意:ClearFlow 强制你只能有一个终止出口,避免流程“跑飞”。


5. 真实场景:多步数据管道实战

需求:

  1. 用户输入一个整数。
  2. 如果小于 0,直接打印错误并结束。
  3. 如果大于等于 0,乘以 2 再打印结果。

5.1 定义状态

class State(TypedDict):
    value: int

5.2 写三个节点

class Validate(Node[State]):
    async def exec(self, s: State) -> NodeResult[State]:
        if s["value"] >= 0:
            return NodeResult(s, "valid")
        return NodeResult(s, "invalid")

class Process(Node[State]):
    async def exec(self, s: State) -> NodeResult[State]:
        return NodeResult({"value": s["value"] * 2}, "success")

class Output(Node[State]):
    async def exec(self, s: State) -> NodeResult[State]:
        print("Final:", s["value"])
        return NodeResult(s, "done")

5.3 组装流程

flow = (
    Flow[State]("Pipeline")
    .start_with(Validate())
    .route("Validate", "valid", Process())
    .route("Validate", "invalid", Output())
    .route("Process", "success", Output())
    .route("Output", "done", None)
    .build()
)

await flow({"value": 21})   # 输出:Final: 42
await flow({"value": -5})   # 输出:Final: -5

6. 测试、调试与踩坑经验

6.1 节点级单元测试

因为节点是纯函数,测试就是输入输出比对:

import pytest
from clearflow import Node, NodeResult

class Inc(Node[int]):
    async def exec(self, x: int) -> NodeResult[int]:
        return NodeResult(x + 1, "ok")

@pytest.mark.asyncio
async def test_inc():
    res = await Inc()(0)
    assert res.state == 1 and res.outcome == "ok"

6.2 调试技巧

  • preppost 里加 print 或日志,观察状态变化。
  • breakpoint() 单步,因为节点就是普通 async 函数。
  • 如果流程卡死,检查是否忘记给某个 outcome 写 route。

7. 与 PocketFlow 的理性对比

维度 ClearFlow PocketFlow
状态管理 不可变、类型安全 共享可变字典
路由方式 显式 (节点, outcome) 映射 图结构 + 边标签
终止规则 强制单出口 允许多出口
类型提示 Python 3.13+ 泛型 动态
源码行数 166 行 100 行

一句话总结:

  • 想要类型安全、可预测、可维护——选 ClearFlow。
  • 想要极简脚本、快速验证——选 PocketFlow。

8. 常见疑问解答(FAQ)

Q1:ClearFlow 支持并发或并行吗?
A:节点本身是 async,单节点内部可以自由使用 asyncio.gather 实现并发。框架层面暂不提供“并行节点”语法糖,需要你自己在节点里调度。

Q2:能把一个 Flow 当作子流程嵌到另一个 Flow 吗?
A:可以。构建后的 Flow 本质上就是一个 Node,直接像普通节点一样 .route() 即可。

Q3:如何接入真正的 OpenAI 或 Claude?
A:在节点的 exec 里调用你喜欢的 SDK,把返回结果塞进 new_state 就行。ClearFlow 不限制 HTTP 客户端。

Q4:需要 Python 3.13 吗?
A:官方示例使用了 3.13 的泛型语法,实际 3.10+ 也能跑,只需把 list[dict[str, str]] 改成 List[Dict[str, str]]from typing import List, Dict

Q5:生产环境怎么打日志?
A:在节点的 post 方法里统一写日志即可;因为状态不可变,你可以放心地把整份状态 JSON 化后落盘。


9. 小结与下一步阅读

今天我们用不到 100 行代码,演示了如何用 ClearFlow 把一个看似简单的需求拆成可测试、可维护、零依赖的工作流。如果你正准备:

  • 把 LLM 接入现有业务系统
  • 让数据科学家和工程师用同一套语言描述流程
  • 在两周内上线 MVP 并保证后续可迭代

那么 ClearFlow 值得你放进工具箱。

下一步阅读:

祝你编码愉快,少踩坑,多睡觉。