用 ClearFlow 打造不踩坑的大模型工作流:从安装到上线的完整笔记
“为什么我的 AI 流水线总是莫名其妙地卡住?”
“调试时变量被谁偷偷改了?”
“上线前一天,团队还在为依赖冲突焦头烂额。”
如果你也遇到过以上灵魂三问,今天这篇长文或许能给你一条新思路。主角是一个只有 166 行代码、零依赖的 Python 库——ClearFlow。它把“状态不可变、路由显式、流程可测”这三件事做到了极致,让复杂的大模型工作流像搭积木一样直观。
目录
-
什么是 ClearFlow? -
为什么它比手撸 if-else 更香? -
5 分钟安装与第一个“Hello LLM” -
核心概念:Node、NodeResult、Flow -
真实场景:多步数据管道实战 -
测试、调试与踩坑经验 -
与 PocketFlow 的理性对比 -
常见疑问解答(FAQ) -
小结与下一步阅读
1. 什么是 ClearFlow?
一句话版本:
ClearFlow 是一个“用纯 Python 写异步工作流”的微型框架,专为语言模型设计。它只给你三件事——节点、结果、流程——却足够覆盖 90% 的 LLM 流水线需求。
特点速览
-
显式路由:每一步的下一个目的地必须写清楚,拒绝“黑魔法”。 -
不可变状态:节点之间传递的是冻结后的状态副本,调试时永远不用担心“谁改了我的变量”。 -
零运行时依赖:不捆绑任何 LLM SDK,你高兴用 OpenAI、Anthropic 还是本地模型都行。 -
单文件源码: clearflow.py
只有 166 行,读完源码再去上厕所都来得及。
2. 为什么它比手撸 if-else 更香?
手撸 if-else | ClearFlow |
---|---|
状态在函数间传来传去,随时可能被某行代码改掉 | 状态一旦生成即为只读 |
流程图藏在代码深处,新人需要脑补 | 路由规则集中声明,一眼看懂 |
单元测试要写大量 mock | 节点是纯函数,测起来像测数学函数 |
上线前发现缺依赖,紧急 pip install | 零运行时依赖,部署包体积极小 |
3. 5 分钟安装与第一个“Hello LLM”
3.1 安装
# 如果这是你第一次用 uv
pip install --user uv # 或者 pipx install uv
# 一行命令搞定
pip install clearflow
3.2 60 秒跑通最小示例
下面这段代码模拟了一个最简单的聊天机器人:用户说“Hi”,机器人回“Hello!”。别看简单,它已经展示了 ClearFlow 的所有要素。
import asyncio
from typing import TypedDict
from clearflow import Flow, Node, NodeResult
# 1. 先定义一个“状态”长什么样
class ChatState(TypedDict):
messages: list[dict[str, str]]
# 2. 写一个节点:负责把用户消息送进 LLM,再把回答塞回状态
class ChatNode(Node[ChatState]):
async def exec(self, state: ChatState) -> NodeResult[ChatState]:
# 这里可以换成真正的 LLM 调用
# reply = await openai_client.chat.completions.create(...)
reply = {"role": "assistant", "content": "Hello!"}
new_state: ChatState = {"messages": [*state["messages"], reply]}
return NodeResult(new_state, outcome="success")
# 3. 用 Flow 把节点串起来
chat = ChatNode()
flow = (
Flow[ChatState]("ChatBot")
.start_with(chat)
.route(chat, "success", None) # 成功就结束
.build()
)
# 4. 运行
async def main():
result = await flow({"messages": [{"role": "user", "content": "Hi"}]})
print(result.state["messages"][-1]["content"]) # Hello!
asyncio.run(main())
运行结果:
Hello!
4. 核心概念:Node、NodeResult、Flow
4.1 Node:最小的“工作单元”
每个节点都是继承自 Node[T]
的类,必须实现 exec
方法。你可以把预处理放在 prep
,后处理放在 post
,但它们都是可选的。
class MyNode(Node[int]):
async def prep(self, state: int) -> int:
# 校验或补充数据
return state
async def exec(self, state: int) -> NodeResult[int]:
new_state = state + 1
return NodeResult(new_state, outcome="ok")
async def post(self, result: NodeResult[int]) -> NodeResult[int]:
# 可以在这里写日志
return result
4.2 NodeResult:一次性打包“新状态 + 路由标签”
NodeResult(new_state, outcome="continue")
outcome
是一个普通字符串,Flow 会根据它决定下一步去哪儿。
4.3 Flow:把节点串成“图”
flow = (
Flow[int]("MyFlow")
.start_with(validate)
.route(validate, "ok", process)
.route(validate, "bad", error_handler)
.route(process, "done", None) # 终止
.build()
)
注意:ClearFlow 强制你只能有一个终止出口,避免流程“跑飞”。
5. 真实场景:多步数据管道实战
需求:
-
用户输入一个整数。 -
如果小于 0,直接打印错误并结束。 -
如果大于等于 0,乘以 2 再打印结果。
5.1 定义状态
class State(TypedDict):
value: int
5.2 写三个节点
class Validate(Node[State]):
async def exec(self, s: State) -> NodeResult[State]:
if s["value"] >= 0:
return NodeResult(s, "valid")
return NodeResult(s, "invalid")
class Process(Node[State]):
async def exec(self, s: State) -> NodeResult[State]:
return NodeResult({"value": s["value"] * 2}, "success")
class Output(Node[State]):
async def exec(self, s: State) -> NodeResult[State]:
print("Final:", s["value"])
return NodeResult(s, "done")
5.3 组装流程
flow = (
Flow[State]("Pipeline")
.start_with(Validate())
.route("Validate", "valid", Process())
.route("Validate", "invalid", Output())
.route("Process", "success", Output())
.route("Output", "done", None)
.build()
)
await flow({"value": 21}) # 输出:Final: 42
await flow({"value": -5}) # 输出:Final: -5
6. 测试、调试与踩坑经验
6.1 节点级单元测试
因为节点是纯函数,测试就是输入输出比对:
import pytest
from clearflow import Node, NodeResult
class Inc(Node[int]):
async def exec(self, x: int) -> NodeResult[int]:
return NodeResult(x + 1, "ok")
@pytest.mark.asyncio
async def test_inc():
res = await Inc()(0)
assert res.state == 1 and res.outcome == "ok"
6.2 调试技巧
-
在 prep
或post
里加print
或日志,观察状态变化。 -
用 breakpoint()
单步,因为节点就是普通 async 函数。 -
如果流程卡死,检查是否忘记给某个 outcome 写 route。
7. 与 PocketFlow 的理性对比
维度 | ClearFlow | PocketFlow |
---|---|---|
状态管理 | 不可变、类型安全 | 共享可变字典 |
路由方式 | 显式 (节点, outcome) 映射 |
图结构 + 边标签 |
终止规则 | 强制单出口 | 允许多出口 |
类型提示 | Python 3.13+ 泛型 | 动态 |
源码行数 | 166 行 | 100 行 |
一句话总结:
-
想要类型安全、可预测、可维护——选 ClearFlow。 -
想要极简脚本、快速验证——选 PocketFlow。
8. 常见疑问解答(FAQ)
Q1:ClearFlow 支持并发或并行吗?
A:节点本身是 async,单节点内部可以自由使用 asyncio.gather
实现并发。框架层面暂不提供“并行节点”语法糖,需要你自己在节点里调度。
Q2:能把一个 Flow 当作子流程嵌到另一个 Flow 吗?
A:可以。构建后的 Flow 本质上就是一个 Node,直接像普通节点一样 .route()
即可。
Q3:如何接入真正的 OpenAI 或 Claude?
A:在节点的 exec
里调用你喜欢的 SDK,把返回结果塞进 new_state
就行。ClearFlow 不限制 HTTP 客户端。
Q4:需要 Python 3.13 吗?
A:官方示例使用了 3.13 的泛型语法,实际 3.10+ 也能跑,只需把 list[dict[str, str]]
改成 List[Dict[str, str]]
并 from typing import List, Dict
。
Q5:生产环境怎么打日志?
A:在节点的 post
方法里统一写日志即可;因为状态不可变,你可以放心地把整份状态 JSON 化后落盘。
9. 小结与下一步阅读
今天我们用不到 100 行代码,演示了如何用 ClearFlow 把一个看似简单的需求拆成可测试、可维护、零依赖的工作流。如果你正准备:
-
把 LLM 接入现有业务系统 -
让数据科学家和工程师用同一套语言描述流程 -
在两周内上线 MVP 并保证后续可迭代
那么 ClearFlow 值得你放进工具箱。
下一步阅读:
-
GitHub 源码:github.com/consent-ai/ClearFlow -
官方示例:examples/chat、examples/structured_output 文件夹
祝你编码愉快,少踩坑,多睡觉。