HuanCode Docs

Harness实战:Worktree Isolation——让每个任务各干各的

上一篇Agent能自治循环了,但所有Agent共享一个工作目录——两个Agent同时改同一个文件就互相污染。这篇用git worktree给每个任务创建独立工作目录,加上TaskBoard任务看板和事件流,实现双平面架构(控制面+执行面)、双状态机联动、崩溃恢复。从17个工具扩展到25个,从"共享空间"进化为"各干各的"。

写在前面

上一篇我们让 Agent 从"等指令"进化为"自驱动"——任务看板 + WORK/IDLE 双阶段循环 + 自动认领。队友做完手头活,自己去看板找新任务,60 秒没活自动关机。

但有一个问题,从引入多 Agent 团队起就一直悬着——所有 Agent 共享同一个工作目录。

单 Agent 串行时没问题。但多个队友并行干活时,Agent A 在重构 config.py,Agent B 同时在给 config.py 加校验逻辑。两个 Agent 同时改同一个文件,A 的改动覆盖 B 的改动,B 的校验逻辑凭空消失。没有报错,没有冲突提示,静默数据丢失。

这就是控制面与执行面没有分离的后果。

这一篇,我们用 git worktree 给每个任务创建独立的工作目录,彻底解决并行执行的文件冲突。


共享目录 vs Worktree 隔离:差在哪?

之前的模式:

Agent A: 改 config.py (删30行,重构)
Agent B: 改 config.py (在第25行加15行校验)
→ 最终结果: B的版本覆盖A,A的重构完全丢失

Worktree 隔离模式:

Lead: task_create("重构 config.py")
Lead: task_create("加校验逻辑")
Lead: worktree_create(task_id=1)     ← 创建 .worktrees/task-1/
Lead: worktree_create(task_id=2)     ← 创建 .worktrees/task-2/

Agent A: worktree_run(1, "vim config.py")   ← 在 task-1 目录操作
Agent B: worktree_run(2, "vim config.py")   ← 在 task-2 目录操作
→ 文件系统级隔离,不可能互相污染
维度共享目录Worktree 隔离
文件操作所有 Agent 同目录每个任务独立目录
冲突风险静默覆盖物理级隔离
状态管理仅 Task 状态机Task + Worktree 双状态机
崩溃恢复靠 tasks.json事件流 + 注册表 + 磁盘交叉比对

整体架构

基于 harness-agent-teams.py(904 行,17 个工具),目标是 harness-worktree-isolation.py(1320 行,25 个工具)。

代码执行逻辑流程图

代码执行逻辑流程图

流程图分为五个区域:

  • REPL 主循环(左上):7 个状态查询命令 + /plan + /compact + 普通指令入口
  • agent_loop(中部):三层压缩 → 收件箱注入 → LLM 调用 → 25 个工具分发 → 循环
  • 子流程(右侧):run_subagent(同步 4 工具)、parallel_tasks(拓扑并行)、background_task(fire-and-forget)
  • 队友循环(右下):独立 daemon 线程,8 个工具,收件箱驱动,支持优雅关机
  • Worktree 生命周期(底部蓝色区域):create(双状态机联动)→ run(cwd 隔离执行)→ remove(收尾链),紫色节点标记 before/after 事件对

新增两个核心模块:

原有 harness-agent-teams.py (保留全部)
├── BASE_TOOLS (4): bash, read_file, write_file, edit_file
├── TEAM_COMM_TOOLS (2): send_message, read_inbox
├── MATE_TOOLS (2): shutdown_response, plan_approval
├── LEAD_TOOLS (13): compact, task, parallel_tasks, background_task, check_task,
│                    spawn_teammate, list_teammates, broadcast,
│                    shutdown_request, shutdown_status, plan_review
└── Plan Mode + Context Compact

+ 新增 (塞入 LEAD_TOOLS)
├── 任务看板 (2): task_create, task_list
└── Worktree 隔离 (6): worktree_create, worktree_remove, worktree_run,
                        worktree_list, worktree_keep, worktree_events

工具分组沿用 harness-autonomous-agents.py 的 4 组风格:

分组数量工具名
BASE_TOOLS4bash, read_file, write_file, edit_file
TEAM_COMM_TOOLS2send_message, read_inbox
MATE_TOOLS2shutdown_response, plan_approval
LEAD_TOOLS19compact, task, parallel_tasks, background_task, check_task, spawn_teammate, list_teammates, broadcast, shutdown_request, shutdown_status, plan_review, task_create, task_list, worktree_create, worktree_remove, worktree_run, worktree_list, worktree_keep, worktree_events

按角色组合:

TEAMMATE_TOOLS = BASE_TOOLS + TEAM_COMM_TOOLS + MATE_TOOLS  # 8
TOOLS = BASE_TOOLS + TEAM_COMM_TOOLS + LEAD_TOOLS            # 25

下面按改造步骤逐一拆解。


第一步:双平面架构

这是整个改造的设计基础。

项目根目录/
├── .tasks/                    ← 控制面:调度和状态
│   ├── tasks.json             ← 任务列表 + 状态 + worktree 绑定
│   └── events.jsonl           ← 生命周期事件流

├── .worktrees/                ← 执行面:独立工作目录
│   ├── index.json             ← worktree 注册表
│   ├── task-1/                ← 任务1的完整项目副本
│   └── task-2/                ← 任务2的完整项目副本

├── config.py                  ← 主目录不受影响
└── ...

控制面.tasks/):任务列表、依赖关系、事件日志。纯数据,不涉及文件操作。

执行面.worktrees/):独立的工作目录。每个 worktree 是一个完整的项目副本,Agent 在里面自由操作。

注册表index.json):记录每个 worktree 的元信息——关联的 task_id、分支名、路径、当前状态。

两个平面通过 task_id 关联,但物理上完全隔离。这个模式在分布式系统中无处不在——Kubernetes 的 etcd 管状态、Node 跑容器;数据库的 WAL 管一致性、数据页存数据。


第二步:TaskBoard——任务看板

之前的 harness-autonomous-agents.py 用每个任务一个 JSON 文件的方式管理看板。这次简化为一个 tasks.json 文件集中管理:

class TaskBoard:
    """任务看板: .tasks/tasks.json 管理任务全生命周期。

    Task FSM:
      [pending] → [in_progress] → [completed]

                        └→ [failed]
    """

    def __init__(self, tasks_dir: Path):
        self.dir = tasks_dir
        self.dir.mkdir(exist_ok=True)
        self.tasks_path = self.dir / "tasks.json"
        self.tasks: list[dict] = self._load()

    def create(self, subject: str, description: str = "",
               depends_on: list = None) -> str:
        next_id = max((t["id"] for t in self.tasks), default=0) + 1
        task = {
            "id": next_id,
            "subject": subject,
            "description": description,
            "status": "pending",
            "owner": None,
            "depends_on": depends_on or [],
            "worktree": None,            # ← 新增: worktree 绑定字段
        }
        self.tasks.append(task)
        self._save()
        return json.dumps(task, indent=2, ensure_ascii=False)

和之前自治模式的 TaskBoard 相比,关键区别是多了一个 worktree 字段——记录任务绑定的 worktree 名称。这是控制面和执行面的连接点。

还有两个方法配合 WorktreeManager 使用:

def update_status(self, task_id: int | str, status: str) -> str:
    """WorktreeManager 调用:推进任务状态。"""
    task = self.find(int(task_id))
    if not task:
        return f"Error: Task {task_id} not found"
    task["status"] = status
    self._save()
    return f"Task #{task_id} status → {status}"

def bind_worktree(self, task_id: int, worktree_name: str):
    """WorktreeManager 调用:绑定 worktree 名称。"""
    task = self.find(task_id)
    if task:
        task["worktree"] = worktree_name
        self._save()

列表展示加了 worktree 信息:

def list_all(self) -> str:
    status_icon = {
        "pending": " ", "in_progress": ">",
        "completed": "x", "failed": "!",
    }
    lines = ["Task Board:"]
    for t in self.tasks:
        icon = status_icon.get(t["status"], "?")
        owner = f" @{t['owner']}" if t.get("owner") else ""
        wt = f" wt={t['worktree']}" if t.get("worktree") else ""
        deps = f" (depends on: {t['depends_on']})" if t.get("depends_on") else ""
        lines.append(f"  [{icon}] #{t['id']}: {t['subject']}{owner}{wt}{deps}")
    return "\n".join(lines)

输出样例:

Task Board:
  [>] #1: 重构 config.py wt=task-1
  [>] #2: 加校验逻辑 wt=task-2
  [ ] #3: 写集成测试 (depends on: [1, 2])

第三步:WorktreeManager——核心引擎

这是整个改造最重要的类,约 210 行。管理 git worktree 的完整生命周期,与 TaskBoard 联动。

双状态机

Task FSM:                          Worktree FSM:
  [pending] → [in_progress] → [completed]    [absent] → [active] → [removed]
                    │                                       │
                    └→ [failed]                             └→ [kept]

两个状态机通过绑定关系联动:

  • 创建: worktree_create → Task pending→in_progress + Worktree absent→active
  • 收尾: worktree_remove → Task in_progress→completed + Worktree active→removed

一个调用,两个状态机同时推进。

注册表

def _register(self, task_id: str, branch: str, path: str) -> None:
    index = self._load_index()
    key = f"task-{task_id}"
    index[key] = {
        "task_id": task_id,
        "branch": branch,
        "path": path,
        "status": "active",
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    self._save_index(index)

注册表是 worktree 的"户口簿"。创建时注册,移除时注销。崩溃恢复时,对比注册表和磁盘实际状态,就能重建现场。

事件流

def _emit(self, event_type: str, **data) -> None:
    event = {
        "ts": datetime.now(timezone.utc).isoformat(),
        "type": event_type,
        **data,
    }
    with open(self._events_path(), "a") as f:
        f.write(json.dumps(event, ensure_ascii=False) + "\n")

每个操作的 before/after 都写一条事件。JSONL 格式,一行一事件,追加写入,永不覆盖。

创建 Worktree

def worktree_create(self, task_id: str, branch: str | None = None) -> str:
    """创建 worktree 并绑定到任务,自动推进 Task pending→in_progress。"""
    task = self.task_board.find(int(task_id))
    if not task:
        return f"Error: Task #{task_id} not found"
    if task["status"] != "pending":
        return f"Error: Task #{task_id} status is '{task['status']}', expected 'pending'"

    branch = branch or f"task-{task_id}"
    wt_path = self.worktrees_dir / f"task-{task_id}"

    self._emit("worktree.create.before", task_id=task_id, branch=branch)

    try:
        # 1. 创建 git worktree
        subprocess.run(
            ["git", "worktree", "add", str(wt_path), "-b", branch],
            cwd=self.project_root, check=True,
            capture_output=True, text=True,
        )
        # 2. 注册到 index.json
        self._register(task_id, branch, str(wt_path))
        # 3. 推进任务状态: pending → in_progress
        self.task_board.update_status(task_id, "in_progress")
        # 4. 绑定 worktree 名称到任务
        self.task_board.bind_worktree(int(task_id), f"task-{task_id}")
        self._emit("worktree.create.after", task_id=task_id, path=str(wt_path))

        return json.dumps({
            "task_id": task_id,
            "branch": branch,
            "path": str(wt_path),
            "status": "active",
        }, indent=2)

    except subprocess.CalledProcessError as e:
        self._emit("worktree.create.failed", task_id=task_id, error=e.stderr.strip())
        return f"Error: {e.stderr.strip()}"

注意两个前置检查:任务必须存在、状态必须是 pending。防止重复创建或对已完成的任务创建 worktree。

一个方法做四件事:创建 git worktree、注册到 index.json、推进任务状态、绑定名称。调用方只需要一行代码。

在 Worktree 中执行

def run_in_worktree(self, task_id: str, command: str) -> str:
    """在任务的 worktree 目录中执行命令。"""
    wt_path = self._get_worktree_path(task_id)
    if not wt_path or not wt_path.exists():
        return f"Error: No active worktree for task #{task_id}"

    try:
        r = subprocess.run(
            command, shell=True, cwd=wt_path,      # ← cwd 切到 worktree 路径
            capture_output=True, text=True, timeout=120,
        )
        out = (r.stdout + r.stderr).strip()
        return out[:50000] if out else "(no output)"
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"

关键就一行:cwd=wt_path。把命令的工作目录切到 worktree 路径。Agent A 在 .worktrees/task-1/ 里改文件,Agent B 在 .worktrees/task-2/ 里改文件,文件系统级隔离

收尾:移除 Worktree

def worktree_remove(self, task_id: str, complete_task: bool = True) -> str:
    """拆除 worktree。complete_task=True 同时标记任务完成。"""
    wt_path = self._get_worktree_path(task_id)
    if not wt_path:
        return f"Error: No worktree for task #{task_id}"

    self._emit("worktree.remove.before", task_id=task_id, complete_task=complete_task)

    # 1. 完成任务(可选)
    if complete_task:
        self.task_board.update_status(task_id, "completed")
        self._emit("task.completed", task_id=task_id)

    # 2. 移除 git worktree
    if wt_path.exists():
        subprocess.run(
            ["git", "worktree", "remove", str(wt_path), "--force"],
            cwd=self.project_root, capture_output=True, text=True,
        )

    # 3. 清理分支
    subprocess.run(
        ["git", "branch", "-d", f"task-{task_id}"],
        cwd=self.project_root, capture_output=True, text=True,
    )

    # 4. 从注册表注销
    self._unregister(task_id)
    self._emit("worktree.remove.after", task_id=task_id)
    return f"Removed worktree for task #{task_id}"

一个 worktree_remove 触发完整的收尾链:完成任务 → 拆除目录 → 清理分支 → 注销注册 → 记录事件。

Keep:保留不拆

def worktree_keep(self, task_id: str) -> str:
    """标记 worktree 为 kept 状态,保留不拆,供调试。"""
    index = self._load_index()
    key = f"task-{task_id}"
    if key not in index:
        return f"Error: No worktree for task #{task_id}"
    index[key]["status"] = "kept"
    self._save_index(index)
    self._emit("worktree.keep", task_id=task_id)
    return json.dumps({"task_id": task_id, "status": "kept"})

有时候任务出了问题,需要保留 worktree 供调试。worktree_keep 把状态从 active 改为 kept,这个 worktree 不会被自动清理。


第四步:崩溃恢复

这是事件流的核心价值。如果进程在创建 worktree 的过程中崩溃了——worktree.create.before 已写入但 worktree.create.after 还没写——怎么办?

def _find_incomplete_ops(self) -> list:
    """扫描事件流,找未闭合的 before 事件。"""
    before_events = {}
    for line in events_path.read_text().strip().split("\n"):
        event = json.loads(line)
        etype = event["type"]
        tid = event.get("task_id", "")

        if etype.endswith(".before"):
            before_events[f"{etype}:{tid}"] = event
        elif etype.endswith(".after") or etype.endswith(".failed"):
            base = etype.rsplit(".", 1)[0]
            before_events.pop(f"{base}.before:{tid}", None)

    return list(before_events.values())

逻辑很简洁:遍历所有事件,before 入栈,afterfailed 出栈。最后栈里剩下的就是"开始了但没完成"的操作。

完整恢复流程:

def recover(self) -> str:
    """从 events + index.json + 磁盘交叉比对,恢复一致状态。"""
    issues = {"orphaned_worktrees": [], "incomplete_ops": [], "recovered": []}

    # 1. 扫描事件流,找未闭合的 before 事件
    pending_ops = self._find_incomplete_ops()
    for op in pending_ops:
        if op["type"] == "worktree.create.before":
            # 创建到一半崩溃了 → 清理残留目录
            partial_path = self.worktrees_dir / f"task-{op['task_id']}"
            if partial_path.exists():
                subprocess.run(
                    ["git", "worktree", "remove", str(partial_path), "--force"], ...
                )
            self._unregister(op["task_id"])

    # 2. 对比 index.json 和磁盘实际状态
    for wt_id, wt_info in self._load_index().items():
        if not Path(wt_info["path"]).exists():
            # 注册了但目录不存在 → 孤儿记录
            self._unregister(wt_info["task_id"])
            issues["orphaned_worktrees"].append(wt_id)

    # 3. 对比磁盘和 index.json
    for dir_path in self.worktrees_dir.iterdir():
        if dir_path.is_dir() and dir_path.name not in index:
            # 目录存在但没注册 → 孤儿目录
            issues["orphaned_worktrees"].append(dir_path.name)

    return json.dumps(issues, indent=2, ensure_ascii=False)

三个数据源交叉比对:

  • 事件流before 但没 after → 半完成操作,回滚
  • index.json 有但磁盘没有 → 注册表残留,清理
  • 磁盘有但 index.json 没有 → 孤儿目录,标记

这就是为什么需要事件流——没有 before/after 事件对,你无法区分"正常运行中"和"崩溃到一半"。


第五步:工具注册与 dispatch map

新增 8 个工具,直接塞进 LEAD_TOOLS 里——沿用 harness-autonomous-agents.py 的 4 组风格,只加注册,agent_loop 一行不改

工具定义

8 个新工具全部加到 LEAD_TOOLS 末尾:

LEAD_TOOLS = [
    # ... 原有 13 个工具 ...

    # 任务看板 (+2)
    {"name": "task_create", "description": "Create a task on the board.",
     "input_schema": {"type": "object", "properties": {
         "subject": {"type": "string"},
         "description": {"type": "string"},
         "depends_on": {"type": "array", "items": {"type": "integer"}}
     }, "required": ["subject"]}},
    {"name": "task_list", "description": "List all tasks with status.",
     "input_schema": {"type": "object", "properties": {}}},

    # worktree 隔离 (+6)
    {"name": "worktree_create", ...},   # 创建并绑定
    {"name": "worktree_remove", ...},   # 拆除并完成
    {"name": "worktree_run", ...},      # 在隔离目录执行
    {"name": "worktree_list", ...},     # 列出所有 worktree
    {"name": "worktree_keep", ...},     # 标记保留
    {"name": "worktree_events", ...},   # 查看事件日志
]

Dispatch Map

TOOL_HANDLERS = {
    **WORKSPACE_HANDLERS,
    # ... 原有 handlers ...

    # task board
    "task_create":      lambda **kw: BOARD.create(kw["subject"],
                            kw.get("description", ""), kw.get("depends_on")),
    "task_list":        lambda **kw: BOARD.list_all(),

    # worktree isolation
    "worktree_create":  lambda **kw: WT.worktree_create(
                            kw["task_id"], kw.get("branch")),
    "worktree_remove":  lambda **kw: WT.worktree_remove(
                            kw["task_id"], kw.get("complete_task", True)),
    "worktree_run":     lambda **kw: WT.run_in_worktree(
                            kw["task_id"], kw["command"]),
    "worktree_list":    lambda **kw: WT.worktree_list(),
    "worktree_keep":    lambda **kw: WT.worktree_keep(kw["task_id"]),
    "worktree_events":  lambda **kw: WT.worktree_events(kw.get("limit", 30)),
}

命名注意:任务看板用 task_create / task_list(不是 create_task),和子 Agent 的 task 工具区分开。一个是"贴到看板",一个是"立刻同步执行子任务"。都在同一个 LEAD_TOOLS 数组里,只靠注释分区。


第六步:REPL 新增命令

elif query.strip() == "/tasks":
    print(BOARD.list_all())
    continue
elif query.strip() == "/worktrees":
    print(WT.worktree_list())
    continue
elif query.strip() == "/events":
    print(WT.worktree_events())
    continue
elif query.strip() == "/recover":
    print(WT.recover())
    continue

四个新命令,覆盖 worktree 隔离系统的全部状态:

  • /tasks — 看任务看板,含 worktree 绑定信息
  • /worktrees — 看所有 worktree 状态(active/kept/removed)
  • /events — 看生命周期事件流
  • /recover — 手动触发崩溃恢复

配合已有的 /team/inbox/compact,七个调试命令覆盖了系统的全部可观测面。


第七步:提示词配套改动

System Prompt

新增 worktree 和 task board 工具说明:

SYSTEM = f"""You are a team lead agent at {WORKDIR}. Use tools to solve tasks.

...(原有内容不变)

Task board:
- `task_create`: Create a task on the board with optional dependencies.
- `task_list`: List all tasks with status.

Worktree isolation:
- `worktree_create`: Create an isolated git worktree bound to a task.
- `worktree_remove`: Remove a worktree, optionally completing the bound task.
- `worktree_run`: Run a command inside a worktree's directory.
- `worktree_list`: List all worktrees with status.
- `worktree_keep`: Mark a worktree as kept (for debugging).
- `worktree_events`: Show worktree lifecycle event log."""

执行提示词

EXECUTE_PROMPT 中加一句:

"Use `worktree_create` to isolate file changes per task. "

让模型在执行阶段知道什么时候该用 worktree 隔离而不是直接在主目录操作。


改造总结

维度harness-agent-teams.pyharness-worktree-isolation.py
总行数9041320(+416)
Lead 工具1725(+8)
队友工具88(不变)
新增类TaskBoard + WorktreeManager
文件隔离git worktree 级隔离
状态管理仅 Task 状态机Task + Worktree 双状态机
元信息管理config.json+ tasks.json + index.json
生命周期日志events.jsonl 显式事件流
崩溃恢复三源交叉比对
REPL 命令/team, /inbox+ /tasks, /worktrees, /events, /recover

新增改动分布:

步骤内容说明
1双平面架构.tasks/ (控制面) + .worktrees/ (执行面)
2TaskBoard 类(~70 行)tasks.json 集中管理 + worktree 绑定字段
3WorktreeManager 类(~210 行)注册表 + 事件流 + 生命周期管理
4崩溃恢复_find_incomplete_ops + recover
5工具注册(~40 行)8 个新工具定义 + dispatch map
6REPL + 提示词(~30 行)4 个新命令 + prompt 更新

与 Claude Code 的对应

Claude Code 在 Agent tool 中原生支持 worktree 隔离模式:

  • worktree_create ≈ Claude Code 的 Agent(isolation="worktree")——给子 Agent 创建临时 git worktree,在隔离副本中工作
  • 双平面架构 ≈ Claude Code 的 worktree 管理机制——Agent 在独立目录工作,完成后自动清理(无更改时)或保留(有更改时返回路径和分支)
  • events.jsonl ≈ Claude Code 的 Agent 返回结果中的 worktree 信息(worktree path and branch are returned in the result
  • 崩溃恢复 ≈ Claude Code 的自动清理(The worktree is automatically cleaned up if the agent makes no changes

底层思路一致:每个并行任务一个独立的文件系统空间,互不干扰,用完清理。


下一步

到这里,Harness 实战系列已经走过了完整的 Agent 系统核心能力:

篇目能力新增工具
Agent Loop基础循环4
Tool Use + Plan Mode工具注册 + 规划+1
Subagent子任务委派+1
LLM API模型能力
Parallel Tasks拓扑排序并行+1
Background Tasks异步后台+2
Context Compact三层压缩
Agent Teams团队通信与协议+8
Autonomous Agents自治循环与任务看板+5
Worktree Isolation双平面隔离 + 崩溃恢复+8

从 Agent Loop 的 5 个工具到 Worktree Isolation 的 25 个工具,每一步都在 agent_loop 之外叠加能力。循环不变,Harness 叠加——这就是 Agent 系统工程的核心思想。

On this page