Harness实战:Agent Teams——从单兵到团队协作
之前的Agent都是单打独斗——一个主Agent加若干一次性子Agent。这篇把Agent变成团队:持久化的命名队友、JSONL收件箱通信、关机握手和计划审批协议。从9个工具扩展到17个,从独狼进化为有编制的团队。
写在前面
上一篇我们给 Agent 加了三层上下文压缩——micro_compact 替换旧结果、auto_compact 摘要归档、compact 工具手动触发。上下文膨胀的问题暂时按住了。
但到目前为止,所有的"多 Agent"模式有一个共同特征——用完即弃。
task 是同步子任务,跑完 sub_messages[] 就丢。parallel_tasks 是一批子任务并行跑完拿结果。background_task 虽然异步,但本质还是一次性的。
这些模式适合"做一件事然后汇报"的场景。但现实中很多任务需要持续协作——一个队友写代码,另一个队友写测试,Lead 协调进度,中间不断沟通。
这就是 Agent Teams 的场景:持久化的命名队友,随时通信,有组织有纪律。
单兵 vs 团队:差在哪?
之前的子 Agent 模式:
Lead: "分析技术栈" → 派出子 Agent → 子 Agent 干完 → 返回结果 → 子 Agent 消失团队模式:
Lead: spawn("alice", "coder", "负责实现功能")
Lead: spawn("bob", "tester", "负责写测试")
Lead → send("alice", "开始写 auth 模块")
Alice → send("lead", "auth 模块写完了")
Lead → send("bob", "alice 写完了,你开始写测试")
Bob → plan_approval("计划: 写3个测试用例...")
Lead → plan_review(approve=True)
Bob → send("lead", "测试全过了")
Lead → shutdown_request("alice")
Lead → shutdown_request("bob")三个本质区别:
| 维度 | 子 Agent | 团队队友 |
|---|---|---|
| 生命周期 | 用完即弃 | 持久化,在线等消息 |
| 通信方式 | 返回值(单向) | 收件箱(双向) |
| 身份 | 匿名 | 有名字、有角色 |
整体架构
改造基于现有的 agentic-demo.py(600 行,9 个工具),目标是 harness-agent-teams.py(904 行,17 个工具)。
新增三个核心模块:
原有 agentic-demo.py (保留全部)
├── Base Tools (5): bash, read_file, write_file, edit_file, compact
├── Subagent Tools (4): task, parallel_tasks, background_task, check_task
├── Context Compact: micro_compact, auto_compact, manual compact
└── Plan Mode
+ 新增
├── MessageBus: JSONL 收件箱通信
├── TeammateManager: 持久化命名队友
└── Protocols: 关机握手 + 计划审批工具总表:
| 分组 | 工具数 | 工具名 |
|---|---|---|
| BASE_TOOLS | 5 | bash, read_file, write_file, edit_file, compact |
| SUBAGENT_TOOLS | 4 | task, parallel_tasks, background_task, check_task |
| TEAM_TOOLS | 5 | spawn_teammate, list_teammates, send_message, read_inbox, broadcast |
| PROTOCOL_TOOLS | 3 | shutdown_request, shutdown_status, plan_review |
下面按改造步骤逐一拆解。
第一步:MessageBus——JSONL 收件箱
Agent 之间需要通信。最简单的方式:每个 Agent 一个文件,追加写入,读完清空。
.team/
inbox/
alice.jsonl ← Alice 的收件箱
bob.jsonl ← Bob 的收件箱
lead.jsonl ← Lead 的收件箱实现:
class MessageBus:
"""每个队友一个 JSONL 文件,append-only 写入,drain-on-read。"""
def __init__(self, inbox_dir: Path):
self.dir = inbox_dir
self.dir.mkdir(parents=True, exist_ok=True)
def send(self, sender: str, to: str, content: str,
msg_type: str = "message", extra: dict = None) -> str:
msg = {
"type": msg_type,
"from": sender,
"content": content,
"timestamp": time.time(),
}
if extra:
msg.update(extra)
inbox_path = self.dir / f"{to}.jsonl"
with open(inbox_path, "a") as f:
f.write(json.dumps(msg) + "\n")
return f"Sent {msg_type} to {to}"
def read_inbox(self, name: str) -> list:
inbox_path = self.dir / f"{name}.jsonl"
if not inbox_path.exists():
return []
messages = []
for line in inbox_path.read_text().strip().splitlines():
if line:
messages.append(json.loads(line))
inbox_path.write_text("") # drain
return messages
def broadcast(self, sender: str, content: str, teammates: list) -> str:
count = 0
for name in teammates:
if name != sender:
self.send(sender, name, content, "broadcast")
count += 1
return f"Broadcast to {count} teammates"两个关键设计:
Append-only 写入 — open(path, "a") 追加模式。多个线程同时往不同文件写不会冲突(每个队友写的是别人的收件箱,自己不写自己的)。
Drain-on-read — 读完立刻清空文件。消息被消费后不留存,避免重复处理。类似 RabbitMQ 的消费确认,但简化到一行 write_text("")。
为什么不用 Python 的 queue.Queue?因为文件有两个优势:
- 可观测 — 随时
cat .team/inbox/alice.jsonl看队友收件箱,方便调试 - 进程安全 — 未来如果队友跑在不同进程甚至不同机器,文件系统天然支持
第二步:TeammateManager——持久化命名队友
子 Agent(run_subagent)是匿名的、用完即弃的。团队队友需要:
- 有名字 — "alice"、"bob",不是 "subagent_1"
- 有角色 — "coder"、"tester",影响系统提示词
- 持久化 — 名册存在
.team/config.json,重启后还在 - 独立线程 — 每个队友在自己的线程中跑 agent loop
class TeammateManager:
def __init__(self, team_dir: Path):
self.dir = team_dir
self.config_path = self.dir / "config.json"
self.config = self._load_config()
self.threads: dict[str, threading.Thread] = {}
def spawn(self, name: str, role: str, prompt: str) -> str:
member = self._find_member(name)
if member:
if member["status"] not in ("idle", "shutdown"):
return f"Error: '{name}' is currently {member['status']}"
member["status"] = "working"
member["role"] = role
else:
member = {"name": name, "role": role, "status": "working"}
self.config["members"].append(member)
self._save_config()
thread = threading.Thread(
target=self._teammate_loop,
args=(name, role, prompt),
daemon=True,
)
self.threads[name] = thread
thread.start()
return f"Spawned '{name}' (role: {role})"config.json 长这样:
{
"team_name": "default",
"members": [
{"name": "alice", "role": "coder", "status": "working"},
{"name": "bob", "role": "tester", "status": "idle"}
]
}队友的 Agent Loop
队友的循环和原来的 run_subagent 结构一样——调 LLM、执行工具、回传结果。但有三个关键差异:
def _teammate_loop(self, name: str, role: str, prompt: str):
sys_prompt = (
f"You are '{name}', role: {role}, at {WORKDIR}. "
f"Submit plans via plan_approval before major work. "
f"Respond to shutdown_request with shutdown_response. "
f"Use send_message to communicate with teammates and lead."
)
messages = [{"role": "user", "content": prompt}]
should_exit = False
for _ in range(50):
# ★ 差异1: 每轮检查收件箱
inbox = BUS.read_inbox(name)
for msg in inbox:
messages.append({"role": "user", "content": json.dumps(msg)})
if should_exit:
break
response = client.messages.create(
model=MODEL,
system=sys_prompt,
messages=messages,
tools=self._teammate_tools(), # ★ 差异2: 队友专属工具集
max_tokens=4096,
)
# ... 执行工具 ...
# ★ 差异3: 关机协议
if block.name == "shutdown_response" and block.input.get("approve"):
should_exit = True三个差异一目了然:
| 对比 | run_subagent | _teammate_loop |
|---|---|---|
| 收件箱 | 无 | 每轮检查 BUS.read_inbox(name) |
| 工具集 | BASE_TOOLS(4个) | 8个(含 send_message, read_inbox, shutdown_response, plan_approval) |
| 退出条件 | stop_reason != tool_use 或达到轮次 | + 批准关机后退出 |
队友的工具集
队友有 8 个工具——基础 4 个 + 通信 2 个 + 协议 2 个:
bash, read_file, write_file, edit_file ← 干活用
send_message, read_inbox ← 通信用
shutdown_response, plan_approval ← 协议用注意:队友没有 task、parallel_tasks 等领导专属工具。队友不能派子任务,只能自己干活和通信。层级分明。
第三步:协议——关机握手 + 计划审批
Agent 不是不可控的。两个协议让 Lead 对队友有控制权。
关机握手协议
Lead Teammate
| |
|--shutdown_req-->| Lead: "请优雅关机"
| {req_id:"abc"} |
|<--shutdown_resp-| Teammate: "好的/再等等"
| {req_id:"abc", |
| approve:true} |Lead 侧:
def handle_shutdown_request(teammate: str) -> str:
req_id = str(uuid.uuid4())[:8]
with _tracker_lock:
shutdown_requests[req_id] = {"target": teammate, "status": "pending"}
BUS.send("lead", teammate, "Please shut down gracefully.",
"shutdown_request", {"request_id": req_id})
return f"Shutdown request {req_id} sent to '{teammate}' (status: pending)"队友侧(在 _exec 中):
if tool_name == "shutdown_response":
req_id = args["request_id"]
approve = args["approve"]
with _tracker_lock:
if req_id in shutdown_requests:
shutdown_requests[req_id]["status"] = (
"approved" if approve else "rejected"
)
BUS.send(sender, "lead", args.get("reason", ""),
"shutdown_response",
{"request_id": req_id, "approve": approve})
return f"Shutdown {'approved' if approve else 'rejected'}"关键:队友可以拒绝关机。如果队友手头有未完成的工作,它可以回复 approve: false,Lead 会收到拒绝消息,然后决定是等一等还是强制。
计划审批协议
方向相反——队友主动提交计划,Lead 审批。
Teammate Lead
| |
|--plan_req------>| Teammate: "我打算这样做..."
| {req_id:"xyz"} |
|<--plan_resp-----| Lead: "同意/否决"
| {req_id:"xyz", |
| approve:true} |队友侧:
if tool_name == "plan_approval":
plan_text = args.get("plan", "")
req_id = str(uuid.uuid4())[:8]
with _tracker_lock:
plan_requests[req_id] = {
"from": sender, "plan": plan_text, "status": "pending",
}
BUS.send(sender, "lead", plan_text, "plan_approval_response",
{"request_id": req_id, "plan": plan_text})
return f"Plan submitted (request_id={req_id}). Waiting for lead approval."Lead 侧:
def handle_plan_review(request_id: str, approve: bool, feedback: str = "") -> str:
with _tracker_lock:
req = plan_requests.get(request_id)
if not req:
return f"Error: Unknown plan request_id '{request_id}'"
with _tracker_lock:
req["status"] = "approved" if approve else "rejected"
BUS.send("lead", req["from"], feedback, "plan_approval_response",
{"request_id": request_id, "approve": approve, "feedback": feedback})
return f"Plan {req['status']} for '{req['from']}'"共享 FSM
两个协议共享同一个状态机:
[pending] ──approve──> [approved]
[pending] ──reject───> [rejected]跟踪器是两个字典 + 一把锁:
shutdown_requests: dict[str, dict] = {} # {req_id: {target, status}}
plan_requests: dict[str, dict] = {} # {req_id: {from, plan, status}}
_tracker_lock = threading.Lock()为什么用 request_id 而不是直接用队友名?因为同一个队友可能有多个并发请求——Lead 发了关机请求,队友还没回复,Lead 又发了一个。request_id 让每个请求独立追踪。
第四步:改造 Agent Loop——Lead 检查收件箱
原来的 agent_loop 每轮只做三件事:micro_compact → 调 LLM → 执行工具。现在在调 LLM 之前加一步——检查收件箱:
def agent_loop(messages: list):
while True:
micro_compact(messages)
if estimate_tokens(messages) > THRESHOLD:
messages[:] = auto_compact(messages)
# ★ 新增: Lead 每轮检查收件箱
inbox = BUS.read_inbox("lead")
if inbox:
messages.append({
"role": "user",
"content": f"<inbox>{json.dumps(inbox, indent=2)}</inbox>",
})
response = client.messages.create(...)
# ... 其余不变只加了 6 行。用 <inbox> 标签包裹,让模型清楚地区分"用户输入"和"队友消息"。
这样 Lead 在每次 LLM 调用前都能看到队友发来的消息——计划提交、关机回复、工作汇报,全部自动注入上下文。
第五步:扩展工具注册
工具定义按功能分组,dispatch map 统一注册:
# 领导 Agent 拥有全部工具
TOOLS = BASE_TOOLS + SUBAGENT_TOOLS + TEAM_TOOLS + PROTOCOL_TOOLS
# 完整 dispatch map
TOOL_HANDLERS = {
# base (5)
"bash": lambda **kw: run_bash(kw["command"]),
"read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
"write_file": lambda **kw: run_write(kw["path"], kw["content"]),
"edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
"compact": lambda **kw: "Manual compression requested.",
# subagent (4)
"task": lambda **kw: run_subagent(kw["prompt"]),
"parallel_tasks": lambda **kw: run_parallel_tasks(kw["tasks"]),
"background_task": lambda **kw: run_background_task(kw["prompt"]),
"check_task": lambda **kw: run_check_task(kw["task_id"]),
# team (5)
"spawn_teammate": lambda **kw: TEAM.spawn(kw["name"], kw["role"], kw["prompt"]),
"list_teammates": lambda **kw: TEAM.list_all(),
"send_message": lambda **kw: BUS.send("lead", kw["to"], kw["content"], ...),
"read_inbox": lambda **kw: json.dumps(BUS.read_inbox("lead"), indent=2),
"broadcast": lambda **kw: BUS.broadcast("lead", kw["content"], TEAM.member_names()),
# protocols (3)
"shutdown_request": lambda **kw: handle_shutdown_request(kw["teammate"]),
"shutdown_status": lambda **kw: handle_shutdown_status(kw["request_id"]),
"plan_review": lambda **kw: handle_plan_review(kw["request_id"], kw["approve"], ...),
}还是那个原则——加新能力只加注册,agent_loop 一行不改。dispatch 架构的威力。
第六步:REPL 和优雅退出
新增 REPL 命令
elif query.strip() == "/team":
print(TEAM.list_all())
continue
elif query.strip() == "/inbox":
msgs = BUS.read_inbox("lead")
if msgs:
print(json.dumps(msgs, indent=2, ensure_ascii=False))
else:
print(" (inbox empty)")
continue/team 看队伍状态,/inbox 手动检查收件箱。调试利器。
优雅退出
原来只等后台任务,现在多等一类——活跃的队友线程:
# 优雅退出:等待后台任务 + 关机队友
with bg_lock:
pending = {tid: f for tid, f in bg_tasks.items() if not f.done()}
active_threads = {n: t for n, t in TEAM.threads.items()
if t.is_alive()}
if pending or active_threads:
print(f"⏳ Waiting for {len(pending)} background task(s) "
f"+ {len(active_threads)} teammate(s)...")
for tid, future in pending.items():
future.result(timeout=300)
for name, thread in active_threads.items():
thread.join(timeout=10)第七步:提示词配套改动
System Prompt
从"coding agent"变成"team lead agent",新增团队和协议工具说明:
SYSTEM = f"""You are a team lead agent at {WORKDIR}. Use tools to solve tasks. Act, don't explain.
You have several task delegation strategies:
- `task`: Run a subtask synchronously. Blocks until done, returns result.
- `parallel_tasks`: Run multiple subtasks concurrently with dependency ordering.
- `background_task`: Fire-and-forget. Returns a task_id immediately.
- `check_task`: Poll a background task by task_id.
Team management:
- `spawn_teammate`: Spawn a persistent named teammate.
- `list_teammates`: See all teammates with name, role, status.
- `send_message`: Send a message to a teammate's inbox.
- `read_inbox`: Read and drain the lead's inbox.
- `broadcast`: Send a message to all teammates.
Protocols:
- `shutdown_request`: Request a teammate to shut down gracefully.
- `shutdown_status`: Check shutdown request status.
- `plan_review`: Approve or reject a teammate's plan."""执行提示词
在 EXECUTE_PROMPT 中加一句:
"Use `spawn_teammate` for persistent agents that need ongoing communication. "让模型在执行阶段知道什么时候用队友而不是一次性子任务。
改造总结
| 维度 | agentic-demo.py | harness-agent-teams.py |
|---|---|---|
| 总行数 | 600 | 904(+304) |
| 领导工具 | 9 | 17(+8) |
| 队友工具 | 无 | 8 |
| 新增类 | 无 | MessageBus + TeammateManager |
| 通信机制 | 无 | JSONL 收件箱 |
| 协议 | 无 | 关机握手 + 计划审批 |
| REPL 命令 | /plan, /compact, q | +/team, /inbox |
| 退出等待 | 后台任务 | 后台任务 + 队友线程 |
改动结构:
| 步骤 | 改动内容 | 新增行数 |
|---|---|---|
| 1 | 团队基础设施(目录 + 消息类型 + 协议跟踪器) | ~15 |
| 2 | MessageBus 类 | ~45 |
| 3 | TeammateManager 类(含队友 agent loop + 工具集) | ~170 |
| 4 | 协议处理函数(shutdown + plan review) | ~30 |
| 5 | 扩展工具定义 + dispatch map | ~55 |
| 6 | agent_loop 新增收件箱检查 | ~6 |
| 7 | REPL 命令 + 优雅退出 + 提示词 | ~20 |
与 Claude Code 的对应
Claude Code 的 Agent 工具在官方文档中也支持类似的团队模式:
- spawn_teammate ≈ Claude Code 的
Agenttool(启动子 Agent) - send_message / read_inbox ≈ Claude Code 的
SendMessage(向运行中的 Agent 发消息,恢复其完整上下文) - plan_review ≈ Claude Code 的 Plan Mode(Agent 提出方案,用户确认后执行)
底层原理一样——持久化的 Agent 实例 + 消息传递 + 协议控制。
下一步
Agent 有了团队,但还缺一个关键能力——自主循环。
当前的 Agent 还是"一问一答"模式:用户提问,Agent 执行,等下一个问题。真正的自主 Agent 应该能自己规划、自己执行、自己评估,持续循环直到任务完成或遇到需要人工决策的节点。
下一篇,我们让 Agent 从"等指令"进化为"自驱动"。
Harness实战:上下文压缩——三层策略让Agent永不断档
Agent工作越久,messages越胖。一个1000行的cat输出占4000 token,模型早就看过了,后续每轮还在为它付费。三层压缩策略——micro_compact静默替换旧结果、auto_compact在token超阈值时LLM摘要、compact工具让模型主动触发——让上下文永远可控。
Harness实战:Autonomous Agents——从等指令到自驱动
上一篇 Agent 有了团队,但队友还是"被动接活"。这篇让队友变成自治单元:任务看板 + WORK/IDLE 双阶段循环 + 自动认领 + 身份重注入。队友做完手头活,自己去看板找新任务,60秒没活自动关机。从17个工具扩展到19个,从"等指令"进化为"自驱动"。