【PocketFlow】先上核心代码importasyncio,warnings,copy,timeclassBaseNode:def__init__(self):self.params,self.successors{},{}defset_params(self,params):self.paramsparamsdefnext(self,node,actiondefault):ifactioninself.successors:warnings.warn(fOverwriting successor for action {action})self.successors[action]node;returnnodedefprep(self,shared):passdefexec(self,prep_res):passdefpost(self,shared,prep_res,exec_res):passdef_exec(self,prep_res):returnself.exec(prep_res)def_run(self,shared):pself.prep(shared);eself._exec(p);returnself.post(shared,p,e)defrun(self,shared):ifself.successors:warnings.warn(Node wont run successors. Use Flow.)returnself._run(shared)def__rshift__(self,other):returnself.next(other)def__sub__(self,action):ifisinstance(action,str):return_ConditionalTransition(self,action)raiseTypeError(Action must be a string)class_ConditionalTransition:def__init__(self,src,action):self.src,self.actionsrc,actiondef__rshift__(self,tgt):returnself.src.next(tgt,self.action)classNode(BaseNode):def__init__(self,max_retries1,wait0):super().__init__();self.max_retries,self.waitmax_retries,waitdefexec_fallback(self,prep_res,exc):raiseexcdef_exec(self,prep_res):forself.cur_retryinrange(self.max_retries):try:returnself.exec(prep_res)exceptExceptionase:ifself.cur_retryself.max_retries-1:returnself.exec_fallback(prep_res,e)ifself.wait0:time.sleep(self.wait)classBatchNode(Node):def_exec(self,items):return[super(BatchNode,self)._exec(i)foriin(itemsor[])]classFlow(BaseNode):def__init__(self,startNone):super().__init__();self.start_nodestartdefstart(self,start):self.start_nodestart;returnstartdefget_next_node(self,curr,action):nxtcurr.successors.get(actionordefault)ifnotnxtandcurr.successors:warnings.warn(fFlow ends: {action} not found in{list(curr.successors)})returnnxtdef_orch(self,shared,paramsNone):curr,p,last_actioncopy.copy(self.start_node),(paramsor{**self.params}),Nonewhilecurr:curr.set_params(p);last_actioncurr._run(shared);currcopy.copy(self.get_next_node(curr,last_action))returnlast_actiondef_run(self,shared):pself.prep(shared);oself._orch(shared);returnself.post(shared,p,o)defpost(self,shared,prep_res,exec_res):returnexec_resclassBatchFlow(Flow):def_run(self,shared):prself.prep(shared)or[]forbpinpr:self._orch(shared,{**self.params,**bp})returnself.post(shared,pr,None)classAsyncNode(Node):asyncdefprep_async(self,shared):passasyncdefexec_async(self,prep_res):passasyncdefexec_fallback_async(self,prep_res,exc):raiseexcasyncdefpost_async(self,shared,prep_res,exec_res):passasyncdef_exec(self,prep_res):forself.cur_retryinrange(self.max_retries):try:returnawaitself.exec_async(prep_res)exceptExceptionase:ifself.cur_retryself.max_retries-1:returnawaitself.exec_fallback_async(prep_res,e)ifself.wait0:awaitasyncio.sleep(self.wait)asyncdefrun_async(self,shared):ifself.successors:warnings.warn(Node wont run successors. Use AsyncFlow.)returnawaitself._run_async(shared)asyncdef_run_async(self,shared):pawaitself.prep_async(shared);eawaitself._exec(p);returnawaitself.post_async(shared,p,e)def_run(self,shared):raiseRuntimeError(Use run_async.)classAsyncBatchNode(AsyncNode,BatchNode):asyncdef_exec(self,items):return[awaitsuper(AsyncBatchNode,self)._exec(i)foriinitems]classAsyncParallelBatchNode(AsyncNode,BatchNode):asyncdef_exec(self,items):returnawaitasyncio.gather(*(super(AsyncParallelBatchNode,self)._exec(i)foriinitems))classAsyncFlow(Flow,AsyncNode):asyncdef_orch_async(self,shared,paramsNone):curr,p,last_actioncopy.copy(self.start_node),(paramsor{**self.params}),Nonewhilecurr:curr.set_params(p);last_actionawaitcurr._run_async(shared)ifisinstance(curr,AsyncNode)elsecurr._run(shared);currcopy.copy(self.get_next_node(curr,last_action))returnlast_actionasyncdef_run_async(self,shared):pawaitself.prep_async(shared);oawaitself._orch_async(shared);returnawaitself.post_async(shared,p,o)asyncdefpost_async(self,shared,prep_res,exec_res):returnexec_resclassAsyncBatchFlow(AsyncFlow,BatchFlow):asyncdef_run_async(self,shared):prawaitself.prep_async(shared)or[]forbpinpr:awaitself._orch_async(shared,{**self.params,**bp})returnawaitself.post_async(shared,pr,None)classAsyncParallelBatchFlow(AsyncFlow,BatchFlow):asyncdef_run_async(self,shared):prawaitself.prep_async(shared)or[]awaitasyncio.gather(*(self._orch_async(shared,{**self.params,**bp})forbpinpr))returnawaitself.post_async(shared,pr,None)核心代码一句话总概括这段代码 一个造工作流的轮子你可以把它理解成流水线搭建工具你写一个个小任务节点用把它们串起来框架自动帮你按顺序执行、重试、批量处理、异步并行它就是一个流程自动化引擎。看懂 3 个核心概念看懂就全懂了1. 节点 Node 一个任务比如查数据库发请求算个数读文件一个节点 干一件事。2. 流程 Flow 一串节点节点1执行完 → 自动执行节点2 → 再执行节点33. 执行生命周期所有节点都一样所有节点都固定分 3 步跑prep准备比如拿参数exec真正干活核心代码post收尾比如存结果逐行超通俗解释1.BaseNode基类所有节点的祖宗classBaseNode:def__init__(self):self.params{}# 节点参数self.successors{}# 下一个要执行谁defnext(self,node):把当前节点 → 指向下一个节点 比如 A.next(B)就是 A 执行完跑 Bdefprep(self):准备defexec(self):干活defpost(self):收尾defrun(self):执行 prep →exec→ post作用定义所有节点必须有的结构。2.Node普通同步节点最常用classNode(BaseNode):def__init__(self,max_retries1,wait0):最大重试次数 重试等待时间自带失败重试失败了会自动重试 N 次。3.BatchNode批量节点给它一个列表它会把列表里每一条都执行一遍。4.Flow流程最关键classFlow(BaseNode):它的作用自动跑一串节点 从起点开始 → 跑完一个自动跑下一个 → 直到结束比如A B CFlow 会自动跑A → B → C5. 条件跳转超级好用A-successB A-failC意思A 返回 “success” → 跑 BA 返回 “fail” → 跑 C这就是流程分支判断异步部分超简单理解所有带Async的类 异步版本同步一件事做完再做下一件异步多件事同时跑不等待类名规律AsyncNode异步节点AsyncFlow异步流程AsyncBatchNode异步批量AsyncParallelBatchNode异步并行同时跑一堆任务最关键的执行逻辑Flow 怎么跑1. 从 start 节点开始 2. 执行节点 3. 看节点返回什么 4. 跳转到对应的下一个节点 5. 直到没有下一个节点结束完全自动化不用你写循环和判断用一个超级小例子# 定义节点1classA(Node):defexec(self,x):print(我是A)returnnext# 定义节点2classB(Node):defexec(self,x):print(我是B)# 搭建流程flowFlow()aA()bB()a-nextb# A 返回 next 就执行 Bflow.start(a)# 运行flow.run({})输出我是A 我是B框架自动执行这个框架到底能干嘛你可以用它做自动化流程数据处理管道接口自动化批量任务异步并发任务带重试、带分支的工作流最简总结这段代码就是一个轻量级流程编排引擎节点 任务流程 任务连线 执行顺序支持重试、批量、异步、分支判断你只需要写任务框架帮你调度执行。基于核心代码可根据需求拓展多智能体、agent-skill等等多方能力。