HuanCode Docs

Harness实战:Agent Teams——从单兵到团队协作

之前的Agent都是单打独斗——一个主Agent加若干一次性子Agent。这篇把Agent变成团队:持久化的命名队友、JSONL收件箱通信、关机握手和计划审批协议。从9个工具扩展到17个,从独狼进化为有编制的团队。

写在前面

上一篇我们给 Agent 加了三层上下文压缩——micro_compact 替换旧结果、auto_compact 摘要归档、compact 工具手动触发。上下文膨胀的问题暂时按住了。

但到目前为止,所有的"多 Agent"模式有一个共同特征——用完即弃

task 是同步子任务,跑完 sub_messages[] 就丢。parallel_tasks 是一批子任务并行跑完拿结果。background_task 虽然异步,但本质还是一次性的。

这些模式适合"做一件事然后汇报"的场景。但现实中很多任务需要持续协作——一个队友写代码,另一个队友写测试,Lead 协调进度,中间不断沟通。

这就是 Agent Teams 的场景:持久化的命名队友,随时通信,有组织有纪律


单兵 vs 团队:差在哪?

之前的子 Agent 模式:

Lead: "分析技术栈" → 派出子 Agent → 子 Agent 干完 → 返回结果 → 子 Agent 消失

团队模式:

Lead: spawn("alice", "coder", "负责实现功能")
Lead: spawn("bob", "tester", "负责写测试")

Lead → send("alice", "开始写 auth 模块")
Alice → send("lead", "auth 模块写完了")
Lead → send("bob", "alice 写完了,你开始写测试")
Bob → plan_approval("计划: 写3个测试用例...")
Lead → plan_review(approve=True)
Bob → send("lead", "测试全过了")
Lead → shutdown_request("alice")
Lead → shutdown_request("bob")

三个本质区别:

维度子 Agent团队队友
生命周期用完即弃持久化,在线等消息
通信方式返回值(单向)收件箱(双向)
身份匿名有名字、有角色

整体架构

改造基于现有的 agentic-demo.py(600 行,9 个工具),目标是 harness-agent-teams.py(904 行,17 个工具)。

新增三个核心模块:

原有 agentic-demo.py (保留全部)
├── Base Tools (5): bash, read_file, write_file, edit_file, compact
├── Subagent Tools (4): task, parallel_tasks, background_task, check_task
├── Context Compact: micro_compact, auto_compact, manual compact
└── Plan Mode

+ 新增
├── MessageBus: JSONL 收件箱通信
├── TeammateManager: 持久化命名队友
└── Protocols: 关机握手 + 计划审批

工具总表:

分组工具数工具名
BASE_TOOLS5bash, read_file, write_file, edit_file, compact
SUBAGENT_TOOLS4task, parallel_tasks, background_task, check_task
TEAM_TOOLS5spawn_teammate, list_teammates, send_message, read_inbox, broadcast
PROTOCOL_TOOLS3shutdown_request, shutdown_status, plan_review

下面按改造步骤逐一拆解。


第一步:MessageBus——JSONL 收件箱

Agent 之间需要通信。最简单的方式:每个 Agent 一个文件,追加写入,读完清空

.team/
  inbox/
    alice.jsonl    ← Alice 的收件箱
    bob.jsonl      ← Bob 的收件箱
    lead.jsonl     ← Lead 的收件箱

实现:

class MessageBus:
    """每个队友一个 JSONL 文件,append-only 写入,drain-on-read。"""

    def __init__(self, inbox_dir: Path):
        self.dir = inbox_dir
        self.dir.mkdir(parents=True, exist_ok=True)

    def send(self, sender: str, to: str, content: str,
             msg_type: str = "message", extra: dict = None) -> str:
        msg = {
            "type": msg_type,
            "from": sender,
            "content": content,
            "timestamp": time.time(),
        }
        if extra:
            msg.update(extra)
        inbox_path = self.dir / f"{to}.jsonl"
        with open(inbox_path, "a") as f:
            f.write(json.dumps(msg) + "\n")
        return f"Sent {msg_type} to {to}"

    def read_inbox(self, name: str) -> list:
        inbox_path = self.dir / f"{name}.jsonl"
        if not inbox_path.exists():
            return []
        messages = []
        for line in inbox_path.read_text().strip().splitlines():
            if line:
                messages.append(json.loads(line))
        inbox_path.write_text("")  # drain
        return messages

    def broadcast(self, sender: str, content: str, teammates: list) -> str:
        count = 0
        for name in teammates:
            if name != sender:
                self.send(sender, name, content, "broadcast")
                count += 1
        return f"Broadcast to {count} teammates"

两个关键设计:

Append-only 写入open(path, "a") 追加模式。多个线程同时往不同文件写不会冲突(每个队友写的是别人的收件箱,自己不写自己的)。

Drain-on-read — 读完立刻清空文件。消息被消费后不留存,避免重复处理。类似 RabbitMQ 的消费确认,但简化到一行 write_text("")

为什么不用 Python 的 queue.Queue?因为文件有两个优势:

  1. 可观测 — 随时 cat .team/inbox/alice.jsonl 看队友收件箱,方便调试
  2. 进程安全 — 未来如果队友跑在不同进程甚至不同机器,文件系统天然支持

第二步:TeammateManager——持久化命名队友

子 Agent(run_subagent)是匿名的、用完即弃的。团队队友需要:

  • 有名字 — "alice"、"bob",不是 "subagent_1"
  • 有角色 — "coder"、"tester",影响系统提示词
  • 持久化 — 名册存在 .team/config.json,重启后还在
  • 独立线程 — 每个队友在自己的线程中跑 agent loop
class TeammateManager:
    def __init__(self, team_dir: Path):
        self.dir = team_dir
        self.config_path = self.dir / "config.json"
        self.config = self._load_config()
        self.threads: dict[str, threading.Thread] = {}

    def spawn(self, name: str, role: str, prompt: str) -> str:
        member = self._find_member(name)
        if member:
            if member["status"] not in ("idle", "shutdown"):
                return f"Error: '{name}' is currently {member['status']}"
            member["status"] = "working"
            member["role"] = role
        else:
            member = {"name": name, "role": role, "status": "working"}
            self.config["members"].append(member)
        self._save_config()
        thread = threading.Thread(
            target=self._teammate_loop,
            args=(name, role, prompt),
            daemon=True,
        )
        self.threads[name] = thread
        thread.start()
        return f"Spawned '{name}' (role: {role})"

config.json 长这样:

{
  "team_name": "default",
  "members": [
    {"name": "alice", "role": "coder", "status": "working"},
    {"name": "bob", "role": "tester", "status": "idle"}
  ]
}

队友的 Agent Loop

队友的循环和原来的 run_subagent 结构一样——调 LLM、执行工具、回传结果。但有三个关键差异:

def _teammate_loop(self, name: str, role: str, prompt: str):
    sys_prompt = (
        f"You are '{name}', role: {role}, at {WORKDIR}. "
        f"Submit plans via plan_approval before major work. "
        f"Respond to shutdown_request with shutdown_response. "
        f"Use send_message to communicate with teammates and lead."
    )
    messages = [{"role": "user", "content": prompt}]
    should_exit = False

    for _ in range(50):
        # ★ 差异1: 每轮检查收件箱
        inbox = BUS.read_inbox(name)
        for msg in inbox:
            messages.append({"role": "user", "content": json.dumps(msg)})
        if should_exit:
            break
        response = client.messages.create(
            model=MODEL,
            system=sys_prompt,
            messages=messages,
            tools=self._teammate_tools(),  # ★ 差异2: 队友专属工具集
            max_tokens=4096,
        )
        # ... 执行工具 ...

        # ★ 差异3: 关机协议
        if block.name == "shutdown_response" and block.input.get("approve"):
            should_exit = True

三个差异一目了然:

对比run_subagent_teammate_loop
收件箱每轮检查 BUS.read_inbox(name)
工具集BASE_TOOLS(4个)8个(含 send_message, read_inbox, shutdown_response, plan_approval)
退出条件stop_reason != tool_use 或达到轮次+ 批准关机后退出

队友的工具集

队友有 8 个工具——基础 4 个 + 通信 2 个 + 协议 2 个:

bash, read_file, write_file, edit_file     ← 干活用
send_message, read_inbox                   ← 通信用
shutdown_response, plan_approval           ← 协议用

注意:队友没有 taskparallel_tasks 等领导专属工具。队友不能派子任务,只能自己干活和通信。层级分明。


第三步:协议——关机握手 + 计划审批

Agent 不是不可控的。两个协议让 Lead 对队友有控制权。

关机握手协议

Lead             Teammate
  |                 |
  |--shutdown_req-->|    Lead: "请优雅关机"
  | {req_id:"abc"}  |
  |<--shutdown_resp-|    Teammate: "好的/再等等"
  | {req_id:"abc",  |
  |  approve:true}  |

Lead 侧:

def handle_shutdown_request(teammate: str) -> str:
    req_id = str(uuid.uuid4())[:8]
    with _tracker_lock:
        shutdown_requests[req_id] = {"target": teammate, "status": "pending"}
    BUS.send("lead", teammate, "Please shut down gracefully.",
             "shutdown_request", {"request_id": req_id})
    return f"Shutdown request {req_id} sent to '{teammate}' (status: pending)"

队友侧(在 _exec 中):

if tool_name == "shutdown_response":
    req_id = args["request_id"]
    approve = args["approve"]
    with _tracker_lock:
        if req_id in shutdown_requests:
            shutdown_requests[req_id]["status"] = (
                "approved" if approve else "rejected"
            )
    BUS.send(sender, "lead", args.get("reason", ""),
             "shutdown_response",
             {"request_id": req_id, "approve": approve})
    return f"Shutdown {'approved' if approve else 'rejected'}"

关键:队友可以拒绝关机。如果队友手头有未完成的工作,它可以回复 approve: false,Lead 会收到拒绝消息,然后决定是等一等还是强制。

计划审批协议

方向相反——队友主动提交计划,Lead 审批。

Teammate           Lead
   |                 |
   |--plan_req------>|    Teammate: "我打算这样做..."
   | {req_id:"xyz"}  |
   |<--plan_resp-----|    Lead: "同意/否决"
   | {req_id:"xyz",  |
   |  approve:true}  |

队友侧:

if tool_name == "plan_approval":
    plan_text = args.get("plan", "")
    req_id = str(uuid.uuid4())[:8]
    with _tracker_lock:
        plan_requests[req_id] = {
            "from": sender, "plan": plan_text, "status": "pending",
        }
    BUS.send(sender, "lead", plan_text, "plan_approval_response",
             {"request_id": req_id, "plan": plan_text})
    return f"Plan submitted (request_id={req_id}). Waiting for lead approval."

Lead 侧:

def handle_plan_review(request_id: str, approve: bool, feedback: str = "") -> str:
    with _tracker_lock:
        req = plan_requests.get(request_id)
    if not req:
        return f"Error: Unknown plan request_id '{request_id}'"
    with _tracker_lock:
        req["status"] = "approved" if approve else "rejected"
    BUS.send("lead", req["from"], feedback, "plan_approval_response",
             {"request_id": request_id, "approve": approve, "feedback": feedback})
    return f"Plan {req['status']} for '{req['from']}'"

共享 FSM

两个协议共享同一个状态机:

[pending] ──approve──> [approved]
[pending] ──reject───> [rejected]

跟踪器是两个字典 + 一把锁:

shutdown_requests: dict[str, dict] = {}   # {req_id: {target, status}}
plan_requests: dict[str, dict] = {}       # {req_id: {from, plan, status}}
_tracker_lock = threading.Lock()

为什么用 request_id 而不是直接用队友名?因为同一个队友可能有多个并发请求——Lead 发了关机请求,队友还没回复,Lead 又发了一个。request_id 让每个请求独立追踪。


第四步:改造 Agent Loop——Lead 检查收件箱

原来的 agent_loop 每轮只做三件事:micro_compact → 调 LLM → 执行工具。现在在调 LLM 之前加一步——检查收件箱

def agent_loop(messages: list):
    while True:
        micro_compact(messages)
        if estimate_tokens(messages) > THRESHOLD:
            messages[:] = auto_compact(messages)

        # ★ 新增: Lead 每轮检查收件箱
        inbox = BUS.read_inbox("lead")
        if inbox:
            messages.append({
                "role": "user",
                "content": f"<inbox>{json.dumps(inbox, indent=2)}</inbox>",
            })

        response = client.messages.create(...)
        # ... 其余不变

只加了 6 行。用 <inbox> 标签包裹,让模型清楚地区分"用户输入"和"队友消息"。

这样 Lead 在每次 LLM 调用前都能看到队友发来的消息——计划提交、关机回复、工作汇报,全部自动注入上下文。


第五步:扩展工具注册

工具定义按功能分组,dispatch map 统一注册:

# 领导 Agent 拥有全部工具
TOOLS = BASE_TOOLS + SUBAGENT_TOOLS + TEAM_TOOLS + PROTOCOL_TOOLS

# 完整 dispatch map
TOOL_HANDLERS = {
    # base (5)
    "bash":             lambda **kw: run_bash(kw["command"]),
    "read_file":        lambda **kw: run_read(kw["path"], kw.get("limit")),
    "write_file":       lambda **kw: run_write(kw["path"], kw["content"]),
    "edit_file":        lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
    "compact":          lambda **kw: "Manual compression requested.",
    # subagent (4)
    "task":             lambda **kw: run_subagent(kw["prompt"]),
    "parallel_tasks":   lambda **kw: run_parallel_tasks(kw["tasks"]),
    "background_task":  lambda **kw: run_background_task(kw["prompt"]),
    "check_task":       lambda **kw: run_check_task(kw["task_id"]),
    # team (5)
    "spawn_teammate":   lambda **kw: TEAM.spawn(kw["name"], kw["role"], kw["prompt"]),
    "list_teammates":   lambda **kw: TEAM.list_all(),
    "send_message":     lambda **kw: BUS.send("lead", kw["to"], kw["content"], ...),
    "read_inbox":       lambda **kw: json.dumps(BUS.read_inbox("lead"), indent=2),
    "broadcast":        lambda **kw: BUS.broadcast("lead", kw["content"], TEAM.member_names()),
    # protocols (3)
    "shutdown_request": lambda **kw: handle_shutdown_request(kw["teammate"]),
    "shutdown_status":  lambda **kw: handle_shutdown_status(kw["request_id"]),
    "plan_review":      lambda **kw: handle_plan_review(kw["request_id"], kw["approve"], ...),
}

还是那个原则——加新能力只加注册,agent_loop 一行不改。dispatch 架构的威力。


第六步:REPL 和优雅退出

新增 REPL 命令

elif query.strip() == "/team":
    print(TEAM.list_all())
    continue
elif query.strip() == "/inbox":
    msgs = BUS.read_inbox("lead")
    if msgs:
        print(json.dumps(msgs, indent=2, ensure_ascii=False))
    else:
        print("  (inbox empty)")
    continue

/team 看队伍状态,/inbox 手动检查收件箱。调试利器。

优雅退出

原来只等后台任务,现在多等一类——活跃的队友线程:

# 优雅退出:等待后台任务 + 关机队友
with bg_lock:
    pending = {tid: f for tid, f in bg_tasks.items() if not f.done()}
active_threads = {n: t for n, t in TEAM.threads.items()
                  if t.is_alive()}

if pending or active_threads:
    print(f"⏳ Waiting for {len(pending)} background task(s) "
          f"+ {len(active_threads)} teammate(s)...")
    for tid, future in pending.items():
        future.result(timeout=300)
    for name, thread in active_threads.items():
        thread.join(timeout=10)

第七步:提示词配套改动

System Prompt

从"coding agent"变成"team lead agent",新增团队和协议工具说明:

SYSTEM = f"""You are a team lead agent at {WORKDIR}. Use tools to solve tasks. Act, don't explain.

You have several task delegation strategies:
- `task`: Run a subtask synchronously. Blocks until done, returns result.
- `parallel_tasks`: Run multiple subtasks concurrently with dependency ordering.
- `background_task`: Fire-and-forget. Returns a task_id immediately.
- `check_task`: Poll a background task by task_id.

Team management:
- `spawn_teammate`: Spawn a persistent named teammate.
- `list_teammates`: See all teammates with name, role, status.
- `send_message`: Send a message to a teammate's inbox.
- `read_inbox`: Read and drain the lead's inbox.
- `broadcast`: Send a message to all teammates.

Protocols:
- `shutdown_request`: Request a teammate to shut down gracefully.
- `shutdown_status`: Check shutdown request status.
- `plan_review`: Approve or reject a teammate's plan."""

执行提示词

EXECUTE_PROMPT 中加一句:

"Use `spawn_teammate` for persistent agents that need ongoing communication. "

让模型在执行阶段知道什么时候用队友而不是一次性子任务。


改造总结

维度agentic-demo.pyharness-agent-teams.py
总行数600904(+304)
领导工具917(+8)
队友工具8
新增类MessageBus + TeammateManager
通信机制JSONL 收件箱
协议关机握手 + 计划审批
REPL 命令/plan, /compact, q+/team, /inbox
退出等待后台任务后台任务 + 队友线程

改动结构:

步骤改动内容新增行数
1团队基础设施(目录 + 消息类型 + 协议跟踪器)~15
2MessageBus 类~45
3TeammateManager 类(含队友 agent loop + 工具集)~170
4协议处理函数(shutdown + plan review)~30
5扩展工具定义 + dispatch map~55
6agent_loop 新增收件箱检查~6
7REPL 命令 + 优雅退出 + 提示词~20

与 Claude Code 的对应

Claude Code 的 Agent 工具在官方文档中也支持类似的团队模式:

  • spawn_teammate ≈ Claude Code 的 Agent tool(启动子 Agent)
  • send_message / read_inbox ≈ Claude Code 的 SendMessage(向运行中的 Agent 发消息,恢复其完整上下文)
  • plan_review ≈ Claude Code 的 Plan Mode(Agent 提出方案,用户确认后执行)

底层原理一样——持久化的 Agent 实例 + 消息传递 + 协议控制。


下一步

Agent 有了团队,但还缺一个关键能力——自主循环

当前的 Agent 还是"一问一答"模式:用户提问,Agent 执行,等下一个问题。真正的自主 Agent 应该能自己规划、自己执行、自己评估,持续循环直到任务完成或遇到需要人工决策的节点。

下一篇,我们让 Agent 从"等指令"进化为"自驱动"。

On this page