项目05-手搓Agent之任务通信+任务编排的实现

张开发
2026/4/27 4:13:02 15 分钟阅读

分享文章

项目05-手搓Agent之任务通信+任务编排的实现
核心架构通讯层TeamCollaboration→ 邮箱、发消息、日志、任务状态管理调度层DAGTaskOrchestrator→ 控制任务依赖、执行顺序执行层AgentTeam→ 智能体异步执行任务顶层入口defdemo_complete_workflow():完整演示团队通信 DAG编排 智能体执行print( 完整流程演示DAG 团队协作 )clientOpenAI()# 初始化三大层collaborationTeamCollaboration()# 团队通信层teamAgentTeam(client)# 任务执行逻辑层team.start()orchestrationDAGOrchestration(team,collaboration)# 任务编排层# 初始化邮箱collaboration.init_agent_mailbox(coder)collaboration.init_agent_mailbox(tester)# 核心创建带依赖的任务 print(\n[1] 创建依赖任务链...)t1orchestration.add_task(分析登录需求,assigned_tocoder)t2orchestration.add_task(开发登录功能,assigned_tocoder,deps[t1])t3orchestration.add_task(测试登录功能,assigned_totester,deps[t2])# 查看执行顺序print(f✅ 任务执行顺序{orchestration.get_order()})print(f✅ 当前可执行任务{orchestration.get_ready()})# 模拟任务完成自动解锁下一个任务print(\n[2] 模拟执行任务...)orchestration.finish_task(t1)print(f完成 t1 → 可执行任务{orchestration.get_ready()})orchestration.finish_task(t2)print(f完成 t2 → 可执行任务{orchestration.get_ready()})orchestration.finish_task(t3)print(所有任务执行完成)# 统计信息print(f\n 协作统计总消息数 {len(collaboration.collaboration_log)})# 运行完整演示if__name____main__:demo_complete_workflow()第一步初始化通讯层TeamCollaboration、任务分配层DAGOrchestration、Agent 执行层AgentTeam第二步初始化邮箱用于通信第三步创建带依赖的任务第四步Agent 完成任务 完整流程演示DAG 团队协作 [1] 创建依赖任务链... ✅ 任务执行顺序[t1, t2, t3] ✅ 当前可执行任务[t1] [2] 模拟执行任务... 完成 t1 → 可执行任务[t2] 完成 t2 → 可执行任务[t3] 所有任务执行完成 协作统计总消息数 31 任务编排器功能将【团队协作】、【DAG依赖】、【智能体执行】三层完全打通职责通过通信层创建带依赖的协作任务通过任务编排层管理任务执行顺序通过通信层自动发送消息通知智能体通过执行层任务完成后同步更新状态与依赖# DAG编排器调度依赖协作融合classDAGOrchestration:def__init__(self,team:AgentTeam,collaboration:TeamCollaboration):self.teamteam# Agent 执行层self.collabcollaboration# 通信层self.dagDAGScheduler()# 任务调度层self.pending_tasks{}# ✅ 所有任务缓存不直接进队列defadd_task(self,content:str,assigned_to:str,deps:Optional[List[str]]None)-str: 添加一个带依赖、带分配、带消息通知的完整 DAG 任务最核心方法 :param content: 任务具体内容字符串例如开发登录页面 :param assigned_to: 任务分配给哪个智能体例如coder / tester :param deps: 依赖的任务 ID 列表代表必须等这些任务完成才能执行 默认为 None 无依赖 :return: 返回创建好的唯一任务 ID # 1. 团队协作创建任务 记录状态task_idself.collab.create_task(content,assigned_to,deps)# 2. DAG添加依赖关系self.dag.add_task(task_id,deps)# 3. 缓存任务所有任务先放这里不直接执行候选室self.pending_tasks[task_id]{id:task_id,content:content,assigned_to:assigned_to}# 4. 通信主管发消息通知智能体self.collab.send_message(lead,assigned_to,f已安排任务{content})returntask_iddefsubmit_ready_tasks(self): ✅【核心】提交当前【依赖已完成】的任务到执行队列 自动过滤未就绪任务 → 真正实现 DAG 阻塞执行 ready_task_idsself.dag.get_ready_tasks()# 可执行任务的 idsfortask_idinready_task_ids:# 遍历iftask_idinself.pending_tasks:# 从候选室中挑选出对应的 id 执行# 取出任务并提交执行taskself.pending_tasks.pop(task_id)self.team.submit(task)print(f 启动任务{task_id}→{task[content]})deffinish_task(self,task_id:str): ✅ 标记完成 → 自动解锁下一批任务 → 自动提交执行 这才是完整 DAG # 1. 标记状态完成self.dag.mark_complete(task_id)# 2. ✅ 自动提交新解锁的可执行任务核心self.submit_ready_tasks()defget_order(self):returnself.dag.topological_sort()defget_ready(self):returnself.dag.get_ready_tasks()DAG 依赖类classDAGScheduler: DAG 任务调度器核心依赖管理层 功能基于有向无环图DAG管理任务的依赖关系、执行顺序、可执行状态 作用 1. 添加任务与依赖 2. 判断哪些任务可以执行依赖全部完成 3. 标记任务完成 4. 拓扑排序获取全局执行顺序 def__init__(self):初始化 DAG 调度器# 存储所有任务key任务IDvalue{deps: [依赖任务ID列表]}self.tasks{}# 存储已完成的任务ID集合用于快速判断self.completedset()defadd_task(self,task_id:str,dependencies:List[str]None): 添加任务及其依赖 :param task_id: 任务唯一ID :param dependencies: 该任务依赖的任务ID列表默认为无依赖 self.tasks[task_id]{deps:dependenciesor[]}defget_ready_tasks(self)-List[str]: 获取【可执行任务】列表 规则 1. 任务未被标记为完成 2. 该任务的**所有依赖任务都已经完成** :return: 符合条件的任务ID列表 ready[]# 遍历所有任务检查是否满足执行条件fortid,infoinself.tasks.items():# 任务未完成 所有依赖都已完成iftidnotinself.completedandall(depinself.completedfordepininfo[deps]):ready.append(tid)returnreadydefmark_complete(self,task_id:str): ✅ 标记任务为已完成 会自动解锁依赖它的后续任务 :param task_id: 要标记完成的任务ID self.completed.add(task_id)deftopological_sort(self)-List[str]: 拓扑排序DFS 实现 作用根据依赖关系生成**合法的任务执行顺序** 例如t1 → t2 → t3 :return: 按执行顺序排列的任务ID列表 visitedset()# 记录已访问节点res[]# 存储排序结果# 深度优先遍历defdfs(n):ifninvisited:returnvisited.add(n)# 先递归处理所有依赖任务fordepinself.tasks[n][deps]:dfs(dep)# 依赖处理完再加入自身res.append(n)# 对所有任务执行 DFSfortaskinself.tasks:dfs(task)returnres2 通信层任务状态层核心功能智能体之间的消息通信邮箱系统协作日志持久化保存到 JSON 文件任务状态管理待办 / 已完成任务分配与消息通知classTeamCollaboration: 团队协作核心类通信层 任务状态层 核心功能 1. 智能体之间的消息通信邮箱系统 2. 协作日志持久化保存到 JSON 文件 3. 任务状态管理待办 / 已完成 4. 任务分配与消息通知 与 DAG 关系只负责通信和状态不控制执行顺序 def__init__(self): 初始化团队协作系统 包含邮箱系统、任务管理器、日志系统、文件存储 # 邮箱系统key 智能体名称value Mailbox 实例self.mailbox_system{}# 协作日志列表记录所有消息发送记录self.collaboration_log[]# 团队数据存储目录self.team_dir.team# 协作日志保存路径self.log_fileos.path.join(self.team_dir,collaboration_log.json)# 创建目录不存在则创建存在不报错os.makedirs(self.team_dir,exist_okTrue)# 启动时加载历史日志self._load_logs()def_load_logs(self): 私有方法从文件加载历史协作日志 如果日志文件存在则读取不存在则保持空列表 ifos.path.exists(self.log_file):withopen(self.log_file,encodingutf-8)asf:self.collaboration_logjson.load(f)def_save_logs(self): 私有方法保存当前协作日志到 JSON 文件 每次发送消息后自动调用实现持久化 withopen(self.log_file,w,encodingutf-8)asf:json.dump(self.collaboration_log,f,indent2,ensure_asciiFalse)definit_agent_mailbox(self,agent_name:str): ✉️ 初始化指定智能体的邮箱 每个智能体必须拥有邮箱才能接收消息 :param agent_name: 智能体名称如 coder / tester self.mailbox_system[agent_name]Mailbox(agent_name)defsend_message(self,from_agent:str,to_agent:str,content:str,priority5)-str: 智能体之间发送消息核心通信方法 自动创建邮箱、记录日志、持久化到文件 :param from_agent: 发送方智能体名称如 lead :param to_agent: 接收方智能体名称如 coder :param content: 消息内容任务/通知/指令 :param priority: 消息优先级 1-10默认 5 :return: 生成的唯一消息 ID # 如果接收方没有邮箱自动初始化ifto_agentnotinself.mailbox_system:self.init_agent_mailbox(to_agent)# 向目标邮箱推送消息获取消息IDmsg_idself.mailbox_system[to_agent].push(from_agentfrom_agent,contentcontent,prioritypriority)# 构造日志结构时间 发送方 接收方 消息IDlog{time:datetime.now().isoformat(),from:from_agent,to:to_agent,msg_id:msg_id}# 加入日志并保存self.collaboration_log.append(log)self._save_logs()returnmsg_iddefcreate_task(self,description:str,assigned_to:strNone,priority:int5,dependencies:List[str]None)-str:✨ 创建新任务task_idftask_{uuid.uuid4().hex[:8]}task{id:task_id,description:description,status:TaskStatus.PENDING.value,priority:priority,# 1-10assigned_to:assigned_to,# None any agent can pickdependencies:dependenciesor[],created_at:datetime.now().isoformat(),updated_at:datetime.now().isoformat(),result:None,error:None}self._tasks[task_id]task# 保存任务详情self._save_task_file(task_id,task)self._save_index()returntask_id核心这里的TeamCollaboration本质上已经具备了 1邮箱初始化、2任务创建、3发送消息、4团队消息持久化的功能3 执行层作用启动线程、消费队列、真正执行任务classAgentTeam: 智能体团队执行类任务执行层 核心功能 1. 管理多个子智能体编码员、测试员等 2. 基于生产者-消费者模型实现异步任务执行 3. 多线程并发处理任务提高执行效率 4. 接收并执行来自 DAG 编排器的任务 定位只负责执行任务不管理通信、不管理依赖顺序 def__init__(self,client,modelgpt-4o): 初始化智能体团队 :param client: LLM 客户端实例OpenAI 或其他模型客户端 :param model: 使用的大模型名称默认为 gpt-4o # LLM 模型客户端用于智能体调用 AI 能力self.clientclient# 使用的大模型版本/名称self.modelmodel# 主管智能体负责统筹、分配任务不执行具体工作self.leadLeadAgent(lead,model,client)# 子智能体集群真正执行具体任务的角色self.sub_agents{coder:SubAgent(coder,coding,model,client),# 编码智能体tester:SubAgent(tester,testing,model,client),# 测试智能体}# 任务队列生产者-消费者模型核心存储待执行的任务self.task_queuequeue.Queue()# 运行中的智能体线程字典key智能体名称value线程实例self.running_agents{}defstart(self): 启动智能体团队工作线程 为每个子智能体创建一个守护线程开始循环监听并执行任务 主线程退出时工作线程自动退出 # 遍历所有子智能体为每个创建工作线程forname,agentinself.sub_agents.items():# 创建守护线程执行 _worker 循环方法tthreading.Thread(targetself._worker,args(agent,),daemonTrue)# 启动线程t.start()# 记录线程方便管理/查看状态self.running_agents[name]tdef_worker(self,agent:SubAgent): 智能体工作循环消费者核心逻辑 私有方法持续从任务队列获取任务并执行 :param agent: 当前线程绑定的子智能体实例 # 无限循环持续监听任务whileTrue:try:# 从队列获取任务超时 2 秒避免无限阻塞# 超时后会抛出 queue.Empty 异常重新循环taskself.task_queue.get(timeout2)# 调用智能体的方法执行当前任务agent.process_task(task)# 标记任务完成队列内部计数 -1表示任务已处理完毕self.task_queue.task_done()exceptqueue.Empty:# 队列为空无任务可执行继续循环等待新任务continuedefsubmit(self,task:Dict): 提交任务到团队队列生产者方法 将任务放入队列等待空闲智能体获取并执行 :param task: 任务字典必须包含 id 和 content 等信息 self.task_queue.put(task)

更多文章