第10课:团队协议——状态机驱动的协商
队友能干活能通信,但缺结构化协调。用request-response模式+共享FSM,让关机有握手、计划有审批,一个状态机套两种协议。
系列导读
这是**《12课拆解Claude Code架构》系列的第 10 课,也是阶段四(多 Agent 协作)的核心机制课**。
前九课我们造了一个多 Agent 团队:第 1 课建了 Agent Loop,第 2 课加了工具注册,第 3 课引入规划,第 4 课用 Subagent 隔离上下文,第 5 课加载技能,第 6 课压缩上下文,第 7 课实现任务持久化,第 8 课后台执行,第 9 课组建了团队——队友能干活、能通信。
但有一个关键问题:队友之间没有结构化协调。
第 10 课的格言:
"队友之间要有统一的沟通规矩"
这一课,我们用 request-response 模式 + 共享状态机,给团队装上两个协议——关机握手和计划审批。
无协议的混乱
场景一:关机——直接杀线程
领导 Agent 判断任务完成,要关闭队友。s09 的做法:
领导: 任务完成 → 直接终止 coder 线程
↓
coder 正在写 auth_service.py(写到一半)
↓
文件损坏。配置更新到一半。锁没释放。线程被杀的瞬间,没有任何清理机会。 写了一半的文件、过期的配置、未释放的资源——全部留在磁盘上。
场景二:计划审批——领导说什么就干什么
领导: "重构认证模块"
↓
coder 收到指令 → 立刻开始重构
↓
改了 12 个文件,删了旧接口,数据库迁移跑了一半
↓
领导: "等等,我还没看过你打算怎么改……"高风险变更——重构核心模块、数据库迁移、删除 API——应该先过审再执行。但 s09 没有审批机制,队友收到指令就开干。
根本问题
不是能力不够,是缺沟通规矩。两个人一起搬桌子,不喊"一二三"就抬,一定有人闪腰。

双协议架构
解决方案出奇地对称:两个协议共享同一个状态机。
关机协议 计划审批协议
┌──────────────────┐ ┌──────────────────┐
│ 领导 → 队友 │ │ 队友 → 领导 │
│ shutdown_request │ │ plan_request │
└────────┬─────────┘ └────────┬─────────┘
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ pending │ │ pending │
│ (共享 FSM) │ │ (共享 FSM) │
└───────┬──┬──────┘ └───────┬──┬──────┘
/ \ / \
▼ ▼ ▼ ▼
approved rejected approved rejected
(安全退出) (继续运行) (开始执行) (修改计划)方向不同,结构相同:
- 关机协议:领导发请求 → 队友决定 approve/reject
- 计划审批:队友发请求 → 领导决定 approve/reject
底层是同一个 FSM:pending → approved | rejected。
核心拆解
一、共享状态机
所有请求-响应协议的核心就是三个状态:
from enum import Enum
from dataclasses import dataclass, field
import uuid
import time
class RequestStatus(Enum):
PENDING = "pending"
APPROVED = "approved"
REJECTED = "rejected"
@dataclass
class ProtocolRequest:
req_id: str
req_type: str # "shutdown" | "plan"
from_agent: str # 发起方
to_agent: str # 接收方
payload: dict # 请求内容
status: RequestStatus = RequestStatus.PENDING
response_payload: dict = field(default_factory=dict)
created_at: float = field(default_factory=time.time)一个 req_id 关联整个请求-响应生命周期。不管是关机还是计划审批,数据结构完全一样。
二、请求追踪器
class ProtocolTracker:
def __init__(self):
self._requests: dict[str, ProtocolRequest] = {}
def create_request(self, req_type, from_agent, to_agent, payload) -> str:
req_id = f"{req_type}_{uuid.uuid4().hex[:8]}"
self._requests[req_id] = ProtocolRequest(
req_id=req_id, req_type=req_type,
from_agent=from_agent, to_agent=to_agent, payload=payload,
)
return req_id
def respond(self, req_id, status, response_payload=None):
req = self._requests[req_id]
if req.status != RequestStatus.PENDING:
raise ValueError(f"Request {req_id} already resolved") # 状态守卫
req.status = status
req.response_payload = response_payload or {}
def get_pending(self, agent_id: str) -> list[ProtocolRequest]:
return [r for r in self._requests.values()
if r.to_agent == agent_id and r.status == RequestStatus.PENDING]关键设计:respond() 里的 if req.status != PENDING 是状态守卫——已决议的请求不能再改,防止并发双重响应。
三、关机协议
领导想关闭队友,不再直接杀线程,而是发请求等回复:
class TeamCoordinator:
def __init__(self):
self.tracker = ProtocolTracker()
self.agents: dict[str, AgentThread] = {}
def request_shutdown(self, target_agent: str) -> str:
req_id = self.tracker.create_request(
req_type="shutdown", from_agent="leader",
to_agent=target_agent, payload={"reason": "task_complete"},
)
self.agents[target_agent].notify(f"Shutdown requested: {req_id}")
return req_id
def handle_shutdown_request(coordinator, agent_id, req_id) -> str:
agent = coordinator.agents[agent_id]
if agent.has_uncommitted_writes():
coordinator.tracker.respond(
req_id, RequestStatus.REJECTED,
{"reason": "uncommitted_writes", "files": agent.pending_files()},
)
return f"Rejected: {len(agent.pending_files())} files pending"
agent.flush_buffers()
coordinator.tracker.respond(req_id, RequestStatus.APPROVED)
return "Shutdown approved, buffers flushed"从"直接杀"变成了"请求 → 检查 → 清理 → 回复"。 队友有机会把文件写完、缓冲区刷盘,然后才安全退出。
四、计划审批协议
方向反过来——队友提交计划,领导审批:
def submit_plan_for_review(coordinator, agent_id, plan) -> str:
return coordinator.tracker.create_request(
req_type="plan", from_agent=agent_id, to_agent="leader",
payload={"plan_summary": plan["summary"], "affected_files": plan["files"],
"risk_level": plan["risk"], "estimated_changes": plan["change_count"]},
)
def handle_plan_review(coordinator, req_id) -> str:
plan = coordinator.tracker.get_request(req_id).payload
if plan["risk_level"] == "high":
coordinator.tracker.respond(
req_id, RequestStatus.REJECTED,
{"reason": "high_risk", "suggestion": "Break into smaller changes"},
)
return "Plan rejected: high risk, suggest incremental approach"
coordinator.tracker.respond(req_id, RequestStatus.APPROVED)
return "Plan approved, proceed with execution"队友拿到 APPROVED 才开始执行;拿到 REJECTED 就调整计划重新提交。高风险变更不再"收到就干"。
集成到循环
协议不改循环结构,只在工具调度层增加两个工具——submit_plan 和 respond_to_request:
def dispatch_tool(name: str, input_data: dict) -> str:
if name == "submit_plan":
return submit_plan_for_review(coordinator, current_agent_id, input_data)
if name == "respond_to_request":
status = (RequestStatus.APPROVED if input_data["decision"] == "approve"
else RequestStatus.REJECTED)
coordinator.tracker.respond(
input_data["req_id"], status,
{"reason": input_data.get("reason", "")},
)
return f"Request {input_data['req_id']}: {status.value}"
return TOOL_HANDLERS[name](**input_data)每轮循环开始前,检查待处理请求并以 <pending-requests> 标签注入,让模型明确区分这是协议事件而非用户输入:
def inject_pending_requests(messages: list, agent_id: str):
pending = coordinator.tracker.get_pending(agent_id)
if not pending:
return
parts = ["<pending-requests>"]
for req in pending:
parts.append(f"[{req.req_id}] type={req.req_type} from={req.from_agent}")
parts.append("</pending-requests>")
messages.append({"role": "user", "content": "\n".join(parts)})
三个设计洞见
洞见一:一个 FSM 两种用途
关机协议和计划审批,方向相反,但底层是同一个状态机:pending → approved | rejected。ProtocolTracker 不关心 req_type 是什么——它只管状态流转。
这意味着你可以零成本添加新协议:资源申请、权限升级、配置变更审批——都是往 create_request 传一个不同的 req_type,其他代码一行不改。
通用协议引擎 > 专用协议硬编码。
洞见二:req_id 的关联性
每个请求都有唯一的 req_id,格式 {type}_{hash}(如 shutdown_a3f8b2c1)。这个 ID 贯穿整个生命周期:
create_request → req_id → 通知队友 → 队友 respond(req_id) → 发起方查询 get_request(req_id)没有 req_id,关机请求和计划请求就混在一起,响应不知道对应哪个请求。在分布式协作中,关联 ID 是最基础的协调原语。
洞见三:状态守卫防止双重响应
respond() 检查 req.status != PENDING——已决议的请求不能再改。这不是防御式编程的洁癖,是并发场景的必需品:
队友 A: respond(req_1, APPROVED) ← 先到
队友 B: respond(req_1, REJECTED) ← 后到 → ValueError如果没有守卫,最后一个响应覆盖前一个,结果不可预测。状态机的价值就在于:让非法状态转换变成编译期/运行期错误,而不是运行时的沉默 bug。
五分钟跑起来
# 进入项目目录
cd learn-claude-code
# 启动第十课
python agents/s10_team_protocols.py任务一:关机协议——请求-响应握手
创建队友后请求关机,观察协议握手过程:
s10 >> Spawn alice as a coder. Then request her shutdown.实际执行记录:
> spawn_teammate: Spawned 'alice' (role: coder)
> shutdown_request:
Shutdown request 3bc64af4 sent to 'alice' (status: pending)
[alice] read_inbox: [{
"type": "shutdown_request",
"from": "lead",
"content": "Please shut down gracefully."
}]
[alice] send_message: Sent message to bob ← 关机前把未完成的事交代给 bob
[alice] shutdown_response: Shutdown approved ← alice 确认可以关机关键观察: 不是直接杀线程——领导发 shutdown_request,alice 收到后先交代工作(给 bob 发消息),然后才回复 shutdown_approved。这是协议的核心价值:给 Agent 时间清理和交接。
验证关机后的状态:
s10 >> List teammates to see alice's status after shutdown approval
> list_teammates:
Team: default
alice (coder): shutdown ← 状态从 idle 变成 shutdown
bob (tester): idle任务二:计划审批协议——提交、审查、批准
让 charlie 提交工作计划,领导审批:
s10 >> Spawn charlie, have him submit a plan, then approve it.实际执行记录:
> spawn_teammate: Spawned 'charlie' (role: developer)
[charlie] 开始自主探索项目...
[charlie] read_file: utils.py, test_utils.py, hello.py ...
[charlie] bash: pytest → test session starts ...
[charlie] bash: stringUtils.jscharlie 花了大量时间探索代码库——读源码、跑测试、分析项目结构。领导在这段时间持续轮询收件箱:
> read_inbox: [] ← 还没提交
> read_inbox: [] ← 还在探索
> read_inbox: [] ← 继续等...
> send_message: Sent message to charlie ← 催一下最终 charlie 提交了计划:
[charlie] plan_approval:
Plan submitted (request_id=37eea607). Waiting for lead approval.
> read_inbox: [{
"type": "plan_approval_response",
"from": "charlie",
"content": "## Plan: Add Unit Tests for Utility Functions\n\n
### Summary\n
- NEW src/utils/stringUtils.test.js (~15 JS test cases)\n
- NEW tests/test_greet.py (~5 Python test cases)\n
- ENHANCE mypackage/tests/test_utils.py (~6 additional edge cases)"
}]
> plan_approval: Plan approved for 'charlie' ← 领导批准关键观察: 整个流程是异步的——charlie 在后台独立工作(探索代码、分析结构),领导持续轮询直到计划到达。计划通过 request_id=37eea607 全程追踪,批准后 charlie 才开始执行。没有这个协议,charlie 可能直接动手改代码,改完才发现方向错了。
总结:你刚造了什么
| 组件 | 之前(s09) | 之后(s10) |
|---|---|---|
| 关机方式 | 直接终止线程 | 请求-响应握手 |
| 计划执行 | 收到就干 | 提交/审查审批 |
| 请求关联 | 无 | 每个请求一个 req_id |
| 状态管理 | 无 | 共享 FSM(pending/approved/rejected) |
| 新增代码 | — | ~100 行(ProtocolTracker + 工具) |
核心原则:一个 FSM,两种用途,N 种扩展。 不需要为每种协议写专门的状态管理——通用的 request-response 引擎处理一切。循环依然一行不改。

下一课预告
第 10 课让队友之间有了协议。但所有 Agent 还在同一个工作目录里操作——一个 Agent 在重构 auth.py,另一个也在改 auth.py,文件冲突随时爆发。
第 11 课:Worktree 隔离 —— 每个 Agent 独立的工作空间。
# 预告:s11 的 Worktree 隔离
worktree = WorktreeManager()
coder_workspace = worktree.create("coder", branch="feat/auth-refactor")
tester_workspace = worktree.create("tester", branch="feat/auth-refactor")
# 两个 Agent 操作同一个分支的不同副本
# 改完后 merge,冲突在 git 层解决Git worktree 给每个 Agent 一个独立的文件系统副本,团队协议在逻辑层协调,worktree 在物理层隔离。
这是《12课拆解Claude Code架构:从零掌握Agent Harness工程》系列的第 10 课。关注Claw开发者,不错过后续更新。
完整代码和交互式学习平台:github.com/shareAI-lab/learn-claude-code
如果这篇文章对你有帮助,欢迎转发给你的技术团队。
系列目录
- 第1课:用20行Python造出你的第一个AI Agent
- 第2课:给Agent加工具 —— dispatch map模式详解
- 第3课:TodoWrite —— 让Agent先想后做:规划系统
- 第4课:Subagent —— 拆解大任务,上下文隔离
- 第5课:按需加载领域知识——Skill机制
- 第6课:无限对话——上下文压缩三层策略
- 第7课:任务持久化——文件级DAG任务图
- 第8课:后台执行——异步任务与通知队列
- 第9课:Agent Teams——多Agent协作:团队与邮箱系统
- 第10课:团队协议——状态机驱动的协商(本文)
- 第11课:自治Agent——自组织任务认领
- 第12课:终极隔离——Worktree并行执行