HuanCode Docs

Harness实战:Autonomous Agents——从等指令到自驱动

上一篇 Agent 有了团队,但队友还是"被动接活"。这篇让队友变成自治单元:任务看板 + WORK/IDLE 双阶段循环 + 自动认领 + 身份重注入。队友做完手头活,自己去看板找新任务,60秒没活自动关机。从17个工具扩展到19个,从"等指令"进化为"自驱动"。

写在前面

上一篇我们把 Agent 从单兵变成了团队——持久化的命名队友、JSONL 收件箱通信、关机握手和计划审批协议。团队有了,但有一个问题:

队友是被动的。

Lead 派活,队友干活。Lead 不派活,队友就傻等着。如果 Lead 忙着处理别的事情,队友就空转浪费 token。更麻烦的是——Lead 要记住每个队友的进度,手动分配下一个任务。

理想状态应该是:Lead 把任务往看板上一贴,队友自己去认领、自己干、干完了自己去找下一个。Lead 只需要做两件事——发任务审计划

这就是 Autonomous Agents 的核心思路:让队友从"等指令"进化为"自驱动"


团队模式 vs 自治模式:差在哪?

上一篇的团队模式:

Lead: spawn("alice", "coder", "你负责写代码")
Lead: send("alice", "写 auth 模块")          ← Lead 手动派活
Alice: send("lead", "auth 写完了")
Lead: send("alice", "写 user 模块")          ← Lead 又手动派活
Alice: send("lead", "user 写完了")
Lead: shutdown_request("alice")

Lead 要盯着每个队友的进度,手动分配每一个任务。队友数量一多,Lead 就变成了调度瓶颈。

自治模式:

Lead: create_team_task("写 auth 模块")        ← 贴到看板
Lead: create_team_task("写 user 模块")        ← 贴到看板
Lead: create_team_task("写测试", blocked_by=[1,2])
Lead: spawn("alice", "coder", "你是开发者")

Alice: (自动认领 #1) → 干活 → complete_task(1)
Alice: (idle → 扫描看板 → 自动认领 #2) → 干活 → complete_task(2)
Alice: (idle → 扫描看板 → #3 已解锁 → 自动认领 #3) → 干活 → complete_task(3)
Alice: (idle → 60s 无新任务 → 自动关机)

Lead 只负责定义"做什么",不需要关心"谁来做"和"什么时候做下一个"。

维度团队模式自治模式
任务分配Lead 手动 send看板自动认领
空闲处理等 Lead 指令轮询看板找活
退出方式Lead 发 shutdown_request超时自动关机
Lead 负担盯进度 + 派活贴任务 + 审计划

整体架构

基于上一篇的 harness-agent-teams.py(904 行,17 个工具),目标是 harness-autonomous-agents.py

代码执行逻辑流程图

代码执行逻辑流程图

🔍 点击查看大图(可无限缩放)

新增模块

原有 harness-agent-teams.py (保留全部)
├── Base Tools (4): bash, read_file, write_file, edit_file
├── Team Comm (2): send_message, read_inbox
├── Lead Tools: compact, task, parallel_tasks, background_task, check_task,
│              spawn_teammate, list_teammates, broadcast,
│              shutdown_request, shutdown_status, plan_review
└── Mate Tools: shutdown_response, plan_approval

+ 新增
├── Task Board: .tasks/ 目录,任务 JSON 文件,原子化认领
└── 自治循环: WORK → IDLE 双阶段 + 自动认领 + 身份重注入

工具重新整理为 4 组:

分组数量工具用于
BASE_TOOLS4bash, read_file, write_file, edit_file全部角色
TEAM_COMM_TOOLS2send_message, read_inboxLead + 队友
MATE_TOOLS5shutdown_response, plan_approval, idle, claim_task, complete_task队友
LEAD_TOOLS13compact, task, parallel_tasks, background_task, check_task, spawn_teammate, list_teammates, broadcast, shutdown_request, shutdown_status, plan_review, create_team_task, list_team_tasksLead

按角色组合:

TEAMMATE_TOOLS = BASE_TOOLS + TEAM_COMM_TOOLS + MATE_TOOLS    # 11
TOOLS = BASE_TOOLS + TEAM_COMM_TOOLS + LEAD_TOOLS              # 19

下面按改造步骤逐一拆解。


第一步:Task Board——任务看板

任务看板是自治的基础设施。没有看板,队友不知道还有什么活可以干。

文件结构

.tasks/
  task_1.json    ← {"id":1, "subject":"写auth模块", "status":"pending", "owner":null}
  task_2.json    ← {"id":2, "subject":"写user模块", "status":"in_progress", "owner":"alice"}
  task_3.json    ← {"id":3, "subject":"写测试", "status":"pending", "blockedBy":[1,2]}

每个任务一个 JSON 文件。和收件箱的设计思路一样——文件即状态,可观测,可调试。随时 cat .tasks/task_1.json 看任务状态。

核心操作

四个函数,覆盖任务全生命周期:

def create_team_task(subject: str, description: str = "",
                blocked_by: list = None) -> str:
    """创建任务到看板。"""
    TASKS_DIR.mkdir(exist_ok=True)
    existing = list(TASKS_DIR.glob("task_*.json"))
    next_id = max((int(f.stem.split("_")[1]) for f in existing), default=0) + 1
    task = {
        "id": next_id,
        "subject": subject,
        "description": description,
        "status": "pending",
        "owner": None,
        "blockedBy": blocked_by or [],
    }
    path = TASKS_DIR / f"task_{next_id}.json"
    path.write_text(json.dumps(task, indent=2))
    return f"Created task #{next_id}: {subject}"

ID 自增逻辑:扫描已有文件取最大 ID + 1。简单粗暴,适合单机场景。

原子化认领

关键问题:多个队友可能同时看到同一个未认领任务并尝试认领。需要加锁:

_claim_lock = threading.Lock()

def claim_task(task_id: int, owner: str) -> str:
    """原子化认领任务,加锁防止竞争。"""
    with _claim_lock:
        path = TASKS_DIR / f"task_{task_id}.json"
        if not path.exists():
            return f"Error: Task {task_id} not found"
        task = json.loads(path.read_text())
        if task.get("owner"):
            return f"Error: Task {task_id} already claimed by {task['owner']}"
        if task.get("status") != "pending":
            return f"Error: Task {task_id} status is '{task['status']}', not pending"
        if task.get("blockedBy"):
            return f"Error: Task {task_id} is blocked by {task['blockedBy']}"
        task["owner"] = owner
        task["status"] = "in_progress"
        path.write_text(json.dumps(task, indent=2))
    return f"Claimed task #{task_id} for {owner}"

_claim_lock 确保同一时刻只有一个线程能认领。认领失败不是异常——队友 A 认领失败说明队友 B 已经抢到了,继续轮询就行。

完成 + 解除阻塞

任务完成时,除了标记自己为 completed,还要解除对其他任务的阻塞

def complete_task(task_id: int, owner: str) -> str:
    """标记任务完成,并解除对其他任务的阻塞。"""
    with _claim_lock:
        path = TASKS_DIR / f"task_{task_id}.json"
        task = json.loads(path.read_text())
        if task.get("owner") != owner:
            return f"Error: Task {task_id} is not owned by {owner}"
        task["status"] = "completed"
        path.write_text(json.dumps(task, indent=2))
        # 解除依赖:移除其他任务 blockedBy 中的本任务
        for f in TASKS_DIR.glob("task_*.json"):
            if f == path:
                continue
            other = json.loads(f.read_text())
            blocked = other.get("blockedBy", [])
            if task_id in blocked:
                blocked.remove(task_id)
                other["blockedBy"] = blocked
                f.write_text(json.dumps(other, indent=2))
    return f"Task #{task_id} completed by {owner}"

比如 task #3 的 blockedBy: [1, 2],当 task #1 完成时变成 blockedBy: [2],task #2 也完成后变成 blockedBy: [],此时 task #3 就可以被认领了。

任务状态机:

[pending] ──claim──> [in_progress] ──complete──> [completed]

                                          └──> 解除其他任务的 blockedBy

第二步:自治循环——WORK/IDLE 双阶段

这是最核心的改造。上一篇的 _teammate_loop 是单阶段的:

spawn → WORK (max 50 turns) → 结束

现在变成双阶段循环:

spawn → WORK → IDLE → WORK → IDLE → ... → SHUTDOWN

完整的循环结构

def _teammate_loop(self, name: str, role: str, prompt: str):
    """队友的自治 agent loop: WORK → IDLE → WORK → ... → SHUTDOWN"""
    messages = [{"role": "user", "content": prompt}]
    should_shutdown = False

    while True:
        # ── WORK PHASE ────────────────────────
        for _ in range(50):
            inbox = BUS.read_inbox(name)
            for msg in inbox:
                messages.append({"role": "user", "content": json.dumps(msg)})

            response = client.messages.create(
                model=MODEL, system=sys_prompt,
                messages=messages, tools=TEAMMATE_TOOLS,
            )
            messages.append({"role": "assistant", "content": response.content})

            if response.stop_reason != "tool_use":
                break

            results = []
            idle_requested = False
            for block in response.content:
                if block.type == "tool_use":
                    if block.name == "idle":
                        idle_requested = True
                        output = "Entering idle phase. Will poll for new tasks."
                    else:
                        output = self._exec(name, block.name, block.input)
                    results.append({"type": "tool_result", ...})
                    # 后置检查: 队友批准关机
                    if block.name == "shutdown_response" and block.input.get("approve"):
                        should_shutdown = True
            messages.append({"role": "user", "content": results})

            if idle_requested or should_shutdown:
                break

        if should_shutdown:
            self._set_status(name, "shutdown")
            return

        # ── IDLE PHASE ────────────────────────
        self._set_status(name, "idle")
        resume = self._idle_poll(name, role, messages)
        if not resume:
            self._set_status(name, "shutdown")
            return
        self._set_status(name, "working")

工作阶段的工具分发只有三种情况:

  1. idle — 不走 _exec,直接 break 进入 IDLE 阶段
  2. else — 全部进入 _exec(),内部按 handler dict 分发
  3. 后置检查shutdown_response(approve=true)should_shutdown,break 后直接 return

IDLE 阶段:轮询 + 认领

def _idle_poll(self, name: str, role: str, messages: list) -> bool:
    """空闲阶段: 轮询收件箱和任务看板。返回 True 恢复工作,False 关机。"""
    polls = IDLE_TIMEOUT // max(POLL_INTERVAL, 1)    # 60 / 5 = 12 次

    for _ in range(polls):
        time.sleep(POLL_INTERVAL)                     # 每 5 秒一次

        # 1. 检查收件箱
        inbox = BUS.read_inbox(name)
        if inbox:
            for msg in inbox:
                if msg.get("type") == "shutdown_request":
                    # 空闲时收到关机请求 → 自动批准
                    BUS.send(name, "lead", "Auto-approved (was idle)",
                             "shutdown_response", ...)
                    return False
                messages.append({"role": "user", "content": json.dumps(msg)})
            return True                               # 有消息 → 恢复工作

        # 2. 扫描任务看板
        unclaimed = scan_unclaimed_tasks()
        if unclaimed:
            task = unclaimed[0]
            result = claim_task(task["id"], name)
            if result.startswith("Error:"):
                continue                              # 被抢了,继续轮询
            # 身份重注入 (如果上下文被压缩过)
            if len(messages) <= 3:
                messages.insert(0, make_identity_block(name, role, team_name))
            messages.append({"role": "user", "content": f"<auto-claimed>Task #{task['id']}: ..."})
            return True                               # 认领成功 → 恢复工作

    return False                                      # 超时 → 关机

每次轮询做两件事,优先级从高到低:

  1. 检查收件箱 — 有消息就恢复工作(或者自动批准关机)
  2. 扫描看板 — 有未认领任务就认领并恢复工作

12 次都没找到活 → 超时关机。

关机的两种模式

工作中关机 — Lead 发 shutdown_request,队友收到后调用 shutdown_response 工具回复。队友可以拒绝("我还在写代码")。这走的是上一篇的协议路径,LLM 自己决定批准还是拒绝。

空闲时关机 — 两种触发:

  1. 收到 shutdown_request → 代码自动批准,不经过 LLM(反正没在干活)
  2. 60 秒超时 → 直接关机

这个设计很关键:工作中需要 LLM 判断能否中断,空闲时不需要——既尊重正在执行的任务,又不浪费空闲时的 token。


第三步:身份重注入

队友运行时间长了,上下文会膨胀,触发 auto_compact 压缩。压缩后 messages 可能只剩 1-2 条摘要消息,队友的身份信息丢失了——它不知道自己是谁、属于哪个团队、扮演什么角色。

解决方案:在恢复工作前检查上下文长度,过短说明发生了压缩,重新注入身份信息:

def make_identity_block(name: str, role: str, team_name: str) -> dict:
    return {
        "role": "user",
        "content": (
            f"<identity>You are '{name}', role: {role}, "
            f"team: {team_name}. Continue your work.</identity>"
        ),
    }

注入时机:

# _idle_poll 中认领新任务时
if len(messages) <= 3:
    messages.insert(0, make_identity_block(name, role, team_name))
    messages.insert(1, {
        "role": "assistant",
        "content": f"I am {name}. Continuing.",
    })

为什么阈值是 3?正常运行的 messages 至少有几十条。压缩后通常只剩 1 条摘要。3 是一个保守的判断——如果 messages 这么短,几乎可以确定发生了压缩。

insert(0, ...) 放在最前面,确保模型在处理任何后续消息之前先看到自己的身份。配合一条 assistant 消息确认身份,形成标准的 user/assistant 对话对。


第四步:工具整理

上一篇工具按功能细分了 8 组,这次合并为 4 组,更清晰:

# ── 所有角色共用 (4) ─────────────────────────────────
BASE_TOOLS = [
    bash, read_file, write_file, edit_file
]

# ── 团队通信: Lead + 队友共用 (2) ────────────────────
TEAM_COMM_TOOLS = [
    send_message, read_inbox
]

# ── 队友专属 (5) ─────────────────────────────────────
MATE_TOOLS = [
    # 协议
    shutdown_response, plan_approval,
    # 自治
    idle, claim_task, complete_task,
]

# ── Lead 专属 (13) ───────────────────────────────────
LEAD_TOOLS = [
    # 上下文压缩
    compact,
    # 子任务委派
    task, parallel_tasks, background_task, check_task,
    # 团队管理
    spawn_teammate, list_teammates, broadcast,
    # 协议控制
    shutdown_request, shutdown_status, plan_review,
    # 任务看板
    create_team_task, list_team_tasks,
]

注意命名:任务看板的工具叫 create_team_tasklist_team_tasks,带 team 前缀,和子 Agent 委派的 task 工具区分开。一个是"贴到看板等人来认领",一个是"立刻同步执行子任务"。


第五步:执行逻辑全景图

五处模型调用,对应不同的角色和工具集:

调用位置角色tools数量
agent_loopLeadTOOLS19
run_subagent子 AgentBASE_TOOLS4
_teammate_loop队友TEAMMATE_TOOLS11
auto_compact摘要生成0
plan_then_executeLead(规划)0

完整执行流:

REPL (main thread)

  ├─ agent_loop (Lead, tools=19)
  │   ├─ task/parallel_tasks/background_task
  │   │   └→ run_subagent (子 Agent, tools=4)
  │   │       └→ bash/read_file/write_file/edit_file
  │   │
  │   ├─ spawn_teammate
  │   │   └→ _teammate_loop (队友, daemon thread, tools=11)
  │   │       ├─ WORK: LLM → idle / _exec() / shutdown check
  │   │       │   └→ _exec: bash/read_file/.../claim_task/complete_task/...
  │   │       └─ IDLE: poll inbox → poll task board → timeout
  │   │
  │   ├─ create_team_task → 贴到 .tasks/
  │   ├─ send_message → 写入 .team/inbox/
  │   └─ compact → auto_compact (无 tools)

  └─ plan_then_execute (无 tools → agent_loop)

层级分明:Lead 做决策和调度,子 Agent 做一次性任务,队友做持续性工作并自治循环。


第六步:REPL 新增 /tasks 命令

elif query.strip() == "/tasks":
    print(list_team_tasks())
    continue

输出样例:

Task Board:
  [x] #1: 写 auth 模块 @alice
  [>] #2: 写 user 模块 @alice
  [ ] #3: 写测试 (blocked by: [2])

[ ] 待认领、[>] 进行中、[x] 已完成。一目了然。

配合已有的 /team/inbox,三个调试命令覆盖了自治系统的全部状态。


改造总结

维度harness-agent-teams.pyharness-autonomous-agents.py
工具分组8 组4 组
Lead 工具1719(+create_team_task, list_team_tasks)
队友工具811(+idle, claim_task, complete_task)
队友循环单阶段 WORK双阶段 WORK → IDLE
任务分配Lead 手动 send看板自动认领
退出机制只有协议关机协议关机 + 超时自动关机
新增模块TaskBoard + 身份重注入
REPL 命令/team, /inbox+ /tasks

新增改动分布:

步骤内容说明
1TaskBoard(5 个函数)create_team_task, scan_unclaimed, claim_task, complete_task, list_team_tasks
2_teammate_loop 重写从单阶段变双阶段 WORK/IDLE
3_idle_poll轮询收件箱 + 扫描看板 + 超时关机
4身份重注入make_identity_block + 压缩检测
5工具重整8 组 → 4 组,新增 3 个队友工具 + 2 个 Lead 工具

与 Claude Code 的对应

Claude Code 的 Agent 实际上也有类似的自治行为:

  • Task Board ≈ Claude Code 的 TodoWrite(任务跟踪,Agent 自己决定下一步做什么)
  • WORK/IDLE 循环 ≈ Claude Code 的 Agent Loop(Agent 持续运行直到任务完成或需要用户输入)
  • 自动认领 ≈ Claude Code 的 background Agent(后台 Agent 完成一个任务后可以被 SendMessage 唤醒做下一个)
  • 身份重注入 ≈ Claude Code 的 context compaction 后自动注入 system prompt 和 CLAUDE.md

底层思路一致:给 Agent 足够的自主权,但保留人类的控制点(协议审批、超时关机、任务看板的可观测性)。


下一步

到这里,我们的 Harness 已经具备了一个完整 Agent 系统的核心能力:

篇目能力
Agent Loop基础循环
Tool Use工具注册与分发
Plan Mode先规划再执行
Subagent子任务委派
Parallel Tasks拓扑排序并行
Background Tasks异步后台
Context Compact三层压缩
Agent Teams团队通信与协议
Autonomous Agents自治循环与任务看板

下一篇,我们来看 Worktree Isolation——让 Agent 在隔离的 Git 工作树中执行,避免污染主分支。

On this page