Harness实战:Worktree Isolation——让每个任务各干各的
上一篇Agent能自治循环了,但所有Agent共享一个工作目录——两个Agent同时改同一个文件就互相污染。这篇用git worktree给每个任务创建独立工作目录,加上TaskBoard任务看板和事件流,实现双平面架构(控制面+执行面)、双状态机联动、崩溃恢复。从17个工具扩展到25个,从"共享空间"进化为"各干各的"。
写在前面
上一篇我们让 Agent 从"等指令"进化为"自驱动"——任务看板 + WORK/IDLE 双阶段循环 + 自动认领。队友做完手头活,自己去看板找新任务,60 秒没活自动关机。
但有一个问题,从引入多 Agent 团队起就一直悬着——所有 Agent 共享同一个工作目录。
单 Agent 串行时没问题。但多个队友并行干活时,Agent A 在重构 config.py,Agent B 同时在给 config.py 加校验逻辑。两个 Agent 同时改同一个文件,A 的改动覆盖 B 的改动,B 的校验逻辑凭空消失。没有报错,没有冲突提示,静默数据丢失。
这就是控制面与执行面没有分离的后果。
这一篇,我们用 git worktree 给每个任务创建独立的工作目录,彻底解决并行执行的文件冲突。
共享目录 vs Worktree 隔离:差在哪?
之前的模式:
Agent A: 改 config.py (删30行,重构)
Agent B: 改 config.py (在第25行加15行校验)
→ 最终结果: B的版本覆盖A,A的重构完全丢失Worktree 隔离模式:
Lead: task_create("重构 config.py")
Lead: task_create("加校验逻辑")
Lead: worktree_create(task_id=1) ← 创建 .worktrees/task-1/
Lead: worktree_create(task_id=2) ← 创建 .worktrees/task-2/
Agent A: worktree_run(1, "vim config.py") ← 在 task-1 目录操作
Agent B: worktree_run(2, "vim config.py") ← 在 task-2 目录操作
→ 文件系统级隔离,不可能互相污染| 维度 | 共享目录 | Worktree 隔离 |
|---|---|---|
| 文件操作 | 所有 Agent 同目录 | 每个任务独立目录 |
| 冲突风险 | 静默覆盖 | 物理级隔离 |
| 状态管理 | 仅 Task 状态机 | Task + Worktree 双状态机 |
| 崩溃恢复 | 靠 tasks.json | 事件流 + 注册表 + 磁盘交叉比对 |
整体架构
基于 harness-agent-teams.py(904 行,17 个工具),目标是 harness-worktree-isolation.py(1320 行,25 个工具)。
代码执行逻辑流程图

流程图分为五个区域:
- REPL 主循环(左上):7 个状态查询命令 +
/plan+/compact+ 普通指令入口 - agent_loop(中部):三层压缩 → 收件箱注入 → LLM 调用 → 25 个工具分发 → 循环
- 子流程(右侧):run_subagent(同步 4 工具)、parallel_tasks(拓扑并行)、background_task(fire-and-forget)
- 队友循环(右下):独立 daemon 线程,8 个工具,收件箱驱动,支持优雅关机
- Worktree 生命周期(底部蓝色区域):create(双状态机联动)→ run(cwd 隔离执行)→ remove(收尾链),紫色节点标记 before/after 事件对
新增两个核心模块:
原有 harness-agent-teams.py (保留全部)
├── BASE_TOOLS (4): bash, read_file, write_file, edit_file
├── TEAM_COMM_TOOLS (2): send_message, read_inbox
├── MATE_TOOLS (2): shutdown_response, plan_approval
├── LEAD_TOOLS (13): compact, task, parallel_tasks, background_task, check_task,
│ spawn_teammate, list_teammates, broadcast,
│ shutdown_request, shutdown_status, plan_review
└── Plan Mode + Context Compact
+ 新增 (塞入 LEAD_TOOLS)
├── 任务看板 (2): task_create, task_list
└── Worktree 隔离 (6): worktree_create, worktree_remove, worktree_run,
worktree_list, worktree_keep, worktree_events工具分组沿用 harness-autonomous-agents.py 的 4 组风格:
| 分组 | 数量 | 工具名 |
|---|---|---|
| BASE_TOOLS | 4 | bash, read_file, write_file, edit_file |
| TEAM_COMM_TOOLS | 2 | send_message, read_inbox |
| MATE_TOOLS | 2 | shutdown_response, plan_approval |
| LEAD_TOOLS | 19 | compact, task, parallel_tasks, background_task, check_task, spawn_teammate, list_teammates, broadcast, shutdown_request, shutdown_status, plan_review, task_create, task_list, worktree_create, worktree_remove, worktree_run, worktree_list, worktree_keep, worktree_events |
按角色组合:
TEAMMATE_TOOLS = BASE_TOOLS + TEAM_COMM_TOOLS + MATE_TOOLS # 8
TOOLS = BASE_TOOLS + TEAM_COMM_TOOLS + LEAD_TOOLS # 25下面按改造步骤逐一拆解。
第一步:双平面架构
这是整个改造的设计基础。
项目根目录/
├── .tasks/ ← 控制面:调度和状态
│ ├── tasks.json ← 任务列表 + 状态 + worktree 绑定
│ └── events.jsonl ← 生命周期事件流
│
├── .worktrees/ ← 执行面:独立工作目录
│ ├── index.json ← worktree 注册表
│ ├── task-1/ ← 任务1的完整项目副本
│ └── task-2/ ← 任务2的完整项目副本
│
├── config.py ← 主目录不受影响
└── ...控制面(.tasks/):任务列表、依赖关系、事件日志。纯数据,不涉及文件操作。
执行面(.worktrees/):独立的工作目录。每个 worktree 是一个完整的项目副本,Agent 在里面自由操作。
注册表(index.json):记录每个 worktree 的元信息——关联的 task_id、分支名、路径、当前状态。
两个平面通过 task_id 关联,但物理上完全隔离。这个模式在分布式系统中无处不在——Kubernetes 的 etcd 管状态、Node 跑容器;数据库的 WAL 管一致性、数据页存数据。
第二步:TaskBoard——任务看板
之前的 harness-autonomous-agents.py 用每个任务一个 JSON 文件的方式管理看板。这次简化为一个 tasks.json 文件集中管理:
class TaskBoard:
"""任务看板: .tasks/tasks.json 管理任务全生命周期。
Task FSM:
[pending] → [in_progress] → [completed]
│
└→ [failed]
"""
def __init__(self, tasks_dir: Path):
self.dir = tasks_dir
self.dir.mkdir(exist_ok=True)
self.tasks_path = self.dir / "tasks.json"
self.tasks: list[dict] = self._load()
def create(self, subject: str, description: str = "",
depends_on: list = None) -> str:
next_id = max((t["id"] for t in self.tasks), default=0) + 1
task = {
"id": next_id,
"subject": subject,
"description": description,
"status": "pending",
"owner": None,
"depends_on": depends_on or [],
"worktree": None, # ← 新增: worktree 绑定字段
}
self.tasks.append(task)
self._save()
return json.dumps(task, indent=2, ensure_ascii=False)和之前自治模式的 TaskBoard 相比,关键区别是多了一个 worktree 字段——记录任务绑定的 worktree 名称。这是控制面和执行面的连接点。
还有两个方法配合 WorktreeManager 使用:
def update_status(self, task_id: int | str, status: str) -> str:
"""WorktreeManager 调用:推进任务状态。"""
task = self.find(int(task_id))
if not task:
return f"Error: Task {task_id} not found"
task["status"] = status
self._save()
return f"Task #{task_id} status → {status}"
def bind_worktree(self, task_id: int, worktree_name: str):
"""WorktreeManager 调用:绑定 worktree 名称。"""
task = self.find(task_id)
if task:
task["worktree"] = worktree_name
self._save()列表展示加了 worktree 信息:
def list_all(self) -> str:
status_icon = {
"pending": " ", "in_progress": ">",
"completed": "x", "failed": "!",
}
lines = ["Task Board:"]
for t in self.tasks:
icon = status_icon.get(t["status"], "?")
owner = f" @{t['owner']}" if t.get("owner") else ""
wt = f" wt={t['worktree']}" if t.get("worktree") else ""
deps = f" (depends on: {t['depends_on']})" if t.get("depends_on") else ""
lines.append(f" [{icon}] #{t['id']}: {t['subject']}{owner}{wt}{deps}")
return "\n".join(lines)输出样例:
Task Board:
[>] #1: 重构 config.py wt=task-1
[>] #2: 加校验逻辑 wt=task-2
[ ] #3: 写集成测试 (depends on: [1, 2])第三步:WorktreeManager——核心引擎
这是整个改造最重要的类,约 210 行。管理 git worktree 的完整生命周期,与 TaskBoard 联动。
双状态机
Task FSM: Worktree FSM:
[pending] → [in_progress] → [completed] [absent] → [active] → [removed]
│ │
└→ [failed] └→ [kept]两个状态机通过绑定关系联动:
- 创建: worktree_create → Task pending→in_progress + Worktree absent→active
- 收尾: worktree_remove → Task in_progress→completed + Worktree active→removed
一个调用,两个状态机同时推进。
注册表
def _register(self, task_id: str, branch: str, path: str) -> None:
index = self._load_index()
key = f"task-{task_id}"
index[key] = {
"task_id": task_id,
"branch": branch,
"path": path,
"status": "active",
"created_at": datetime.now(timezone.utc).isoformat(),
}
self._save_index(index)注册表是 worktree 的"户口簿"。创建时注册,移除时注销。崩溃恢复时,对比注册表和磁盘实际状态,就能重建现场。
事件流
def _emit(self, event_type: str, **data) -> None:
event = {
"ts": datetime.now(timezone.utc).isoformat(),
"type": event_type,
**data,
}
with open(self._events_path(), "a") as f:
f.write(json.dumps(event, ensure_ascii=False) + "\n")每个操作的 before/after 都写一条事件。JSONL 格式,一行一事件,追加写入,永不覆盖。
创建 Worktree
def worktree_create(self, task_id: str, branch: str | None = None) -> str:
"""创建 worktree 并绑定到任务,自动推进 Task pending→in_progress。"""
task = self.task_board.find(int(task_id))
if not task:
return f"Error: Task #{task_id} not found"
if task["status"] != "pending":
return f"Error: Task #{task_id} status is '{task['status']}', expected 'pending'"
branch = branch or f"task-{task_id}"
wt_path = self.worktrees_dir / f"task-{task_id}"
self._emit("worktree.create.before", task_id=task_id, branch=branch)
try:
# 1. 创建 git worktree
subprocess.run(
["git", "worktree", "add", str(wt_path), "-b", branch],
cwd=self.project_root, check=True,
capture_output=True, text=True,
)
# 2. 注册到 index.json
self._register(task_id, branch, str(wt_path))
# 3. 推进任务状态: pending → in_progress
self.task_board.update_status(task_id, "in_progress")
# 4. 绑定 worktree 名称到任务
self.task_board.bind_worktree(int(task_id), f"task-{task_id}")
self._emit("worktree.create.after", task_id=task_id, path=str(wt_path))
return json.dumps({
"task_id": task_id,
"branch": branch,
"path": str(wt_path),
"status": "active",
}, indent=2)
except subprocess.CalledProcessError as e:
self._emit("worktree.create.failed", task_id=task_id, error=e.stderr.strip())
return f"Error: {e.stderr.strip()}"注意两个前置检查:任务必须存在、状态必须是 pending。防止重复创建或对已完成的任务创建 worktree。
一个方法做四件事:创建 git worktree、注册到 index.json、推进任务状态、绑定名称。调用方只需要一行代码。
在 Worktree 中执行
def run_in_worktree(self, task_id: str, command: str) -> str:
"""在任务的 worktree 目录中执行命令。"""
wt_path = self._get_worktree_path(task_id)
if not wt_path or not wt_path.exists():
return f"Error: No active worktree for task #{task_id}"
try:
r = subprocess.run(
command, shell=True, cwd=wt_path, # ← cwd 切到 worktree 路径
capture_output=True, text=True, timeout=120,
)
out = (r.stdout + r.stderr).strip()
return out[:50000] if out else "(no output)"
except subprocess.TimeoutExpired:
return "Error: Timeout (120s)"关键就一行:cwd=wt_path。把命令的工作目录切到 worktree 路径。Agent A 在 .worktrees/task-1/ 里改文件,Agent B 在 .worktrees/task-2/ 里改文件,文件系统级隔离。
收尾:移除 Worktree
def worktree_remove(self, task_id: str, complete_task: bool = True) -> str:
"""拆除 worktree。complete_task=True 同时标记任务完成。"""
wt_path = self._get_worktree_path(task_id)
if not wt_path:
return f"Error: No worktree for task #{task_id}"
self._emit("worktree.remove.before", task_id=task_id, complete_task=complete_task)
# 1. 完成任务(可选)
if complete_task:
self.task_board.update_status(task_id, "completed")
self._emit("task.completed", task_id=task_id)
# 2. 移除 git worktree
if wt_path.exists():
subprocess.run(
["git", "worktree", "remove", str(wt_path), "--force"],
cwd=self.project_root, capture_output=True, text=True,
)
# 3. 清理分支
subprocess.run(
["git", "branch", "-d", f"task-{task_id}"],
cwd=self.project_root, capture_output=True, text=True,
)
# 4. 从注册表注销
self._unregister(task_id)
self._emit("worktree.remove.after", task_id=task_id)
return f"Removed worktree for task #{task_id}"一个 worktree_remove 触发完整的收尾链:完成任务 → 拆除目录 → 清理分支 → 注销注册 → 记录事件。
Keep:保留不拆
def worktree_keep(self, task_id: str) -> str:
"""标记 worktree 为 kept 状态,保留不拆,供调试。"""
index = self._load_index()
key = f"task-{task_id}"
if key not in index:
return f"Error: No worktree for task #{task_id}"
index[key]["status"] = "kept"
self._save_index(index)
self._emit("worktree.keep", task_id=task_id)
return json.dumps({"task_id": task_id, "status": "kept"})有时候任务出了问题,需要保留 worktree 供调试。worktree_keep 把状态从 active 改为 kept,这个 worktree 不会被自动清理。
第四步:崩溃恢复
这是事件流的核心价值。如果进程在创建 worktree 的过程中崩溃了——worktree.create.before 已写入但 worktree.create.after 还没写——怎么办?
def _find_incomplete_ops(self) -> list:
"""扫描事件流,找未闭合的 before 事件。"""
before_events = {}
for line in events_path.read_text().strip().split("\n"):
event = json.loads(line)
etype = event["type"]
tid = event.get("task_id", "")
if etype.endswith(".before"):
before_events[f"{etype}:{tid}"] = event
elif etype.endswith(".after") or etype.endswith(".failed"):
base = etype.rsplit(".", 1)[0]
before_events.pop(f"{base}.before:{tid}", None)
return list(before_events.values())逻辑很简洁:遍历所有事件,before 入栈,after 或 failed 出栈。最后栈里剩下的就是"开始了但没完成"的操作。
完整恢复流程:
def recover(self) -> str:
"""从 events + index.json + 磁盘交叉比对,恢复一致状态。"""
issues = {"orphaned_worktrees": [], "incomplete_ops": [], "recovered": []}
# 1. 扫描事件流,找未闭合的 before 事件
pending_ops = self._find_incomplete_ops()
for op in pending_ops:
if op["type"] == "worktree.create.before":
# 创建到一半崩溃了 → 清理残留目录
partial_path = self.worktrees_dir / f"task-{op['task_id']}"
if partial_path.exists():
subprocess.run(
["git", "worktree", "remove", str(partial_path), "--force"], ...
)
self._unregister(op["task_id"])
# 2. 对比 index.json 和磁盘实际状态
for wt_id, wt_info in self._load_index().items():
if not Path(wt_info["path"]).exists():
# 注册了但目录不存在 → 孤儿记录
self._unregister(wt_info["task_id"])
issues["orphaned_worktrees"].append(wt_id)
# 3. 对比磁盘和 index.json
for dir_path in self.worktrees_dir.iterdir():
if dir_path.is_dir() and dir_path.name not in index:
# 目录存在但没注册 → 孤儿目录
issues["orphaned_worktrees"].append(dir_path.name)
return json.dumps(issues, indent=2, ensure_ascii=False)三个数据源交叉比对:
- 事件流 有
before但没after→ 半完成操作,回滚 - index.json 有但磁盘没有 → 注册表残留,清理
- 磁盘有但 index.json 没有 → 孤儿目录,标记
这就是为什么需要事件流——没有 before/after 事件对,你无法区分"正常运行中"和"崩溃到一半"。
第五步:工具注册与 dispatch map
新增 8 个工具,直接塞进 LEAD_TOOLS 里——沿用 harness-autonomous-agents.py 的 4 组风格,只加注册,agent_loop 一行不改。
工具定义
8 个新工具全部加到 LEAD_TOOLS 末尾:
LEAD_TOOLS = [
# ... 原有 13 个工具 ...
# 任务看板 (+2)
{"name": "task_create", "description": "Create a task on the board.",
"input_schema": {"type": "object", "properties": {
"subject": {"type": "string"},
"description": {"type": "string"},
"depends_on": {"type": "array", "items": {"type": "integer"}}
}, "required": ["subject"]}},
{"name": "task_list", "description": "List all tasks with status.",
"input_schema": {"type": "object", "properties": {}}},
# worktree 隔离 (+6)
{"name": "worktree_create", ...}, # 创建并绑定
{"name": "worktree_remove", ...}, # 拆除并完成
{"name": "worktree_run", ...}, # 在隔离目录执行
{"name": "worktree_list", ...}, # 列出所有 worktree
{"name": "worktree_keep", ...}, # 标记保留
{"name": "worktree_events", ...}, # 查看事件日志
]Dispatch Map
TOOL_HANDLERS = {
**WORKSPACE_HANDLERS,
# ... 原有 handlers ...
# task board
"task_create": lambda **kw: BOARD.create(kw["subject"],
kw.get("description", ""), kw.get("depends_on")),
"task_list": lambda **kw: BOARD.list_all(),
# worktree isolation
"worktree_create": lambda **kw: WT.worktree_create(
kw["task_id"], kw.get("branch")),
"worktree_remove": lambda **kw: WT.worktree_remove(
kw["task_id"], kw.get("complete_task", True)),
"worktree_run": lambda **kw: WT.run_in_worktree(
kw["task_id"], kw["command"]),
"worktree_list": lambda **kw: WT.worktree_list(),
"worktree_keep": lambda **kw: WT.worktree_keep(kw["task_id"]),
"worktree_events": lambda **kw: WT.worktree_events(kw.get("limit", 30)),
}命名注意:任务看板用 task_create / task_list(不是 create_task),和子 Agent 的 task 工具区分开。一个是"贴到看板",一个是"立刻同步执行子任务"。都在同一个 LEAD_TOOLS 数组里,只靠注释分区。
第六步:REPL 新增命令
elif query.strip() == "/tasks":
print(BOARD.list_all())
continue
elif query.strip() == "/worktrees":
print(WT.worktree_list())
continue
elif query.strip() == "/events":
print(WT.worktree_events())
continue
elif query.strip() == "/recover":
print(WT.recover())
continue四个新命令,覆盖 worktree 隔离系统的全部状态:
/tasks— 看任务看板,含 worktree 绑定信息/worktrees— 看所有 worktree 状态(active/kept/removed)/events— 看生命周期事件流/recover— 手动触发崩溃恢复
配合已有的 /team、/inbox、/compact,七个调试命令覆盖了系统的全部可观测面。
第七步:提示词配套改动
System Prompt
新增 worktree 和 task board 工具说明:
SYSTEM = f"""You are a team lead agent at {WORKDIR}. Use tools to solve tasks.
...(原有内容不变)
Task board:
- `task_create`: Create a task on the board with optional dependencies.
- `task_list`: List all tasks with status.
Worktree isolation:
- `worktree_create`: Create an isolated git worktree bound to a task.
- `worktree_remove`: Remove a worktree, optionally completing the bound task.
- `worktree_run`: Run a command inside a worktree's directory.
- `worktree_list`: List all worktrees with status.
- `worktree_keep`: Mark a worktree as kept (for debugging).
- `worktree_events`: Show worktree lifecycle event log."""执行提示词
在 EXECUTE_PROMPT 中加一句:
"Use `worktree_create` to isolate file changes per task. "让模型在执行阶段知道什么时候该用 worktree 隔离而不是直接在主目录操作。
改造总结
| 维度 | harness-agent-teams.py | harness-worktree-isolation.py |
|---|---|---|
| 总行数 | 904 | 1320(+416) |
| Lead 工具 | 17 | 25(+8) |
| 队友工具 | 8 | 8(不变) |
| 新增类 | — | TaskBoard + WorktreeManager |
| 文件隔离 | 无 | git worktree 级隔离 |
| 状态管理 | 仅 Task 状态机 | Task + Worktree 双状态机 |
| 元信息管理 | config.json | + tasks.json + index.json |
| 生命周期日志 | 无 | events.jsonl 显式事件流 |
| 崩溃恢复 | 无 | 三源交叉比对 |
| REPL 命令 | /team, /inbox | + /tasks, /worktrees, /events, /recover |
新增改动分布:
| 步骤 | 内容 | 说明 |
|---|---|---|
| 1 | 双平面架构 | .tasks/ (控制面) + .worktrees/ (执行面) |
| 2 | TaskBoard 类(~70 行) | tasks.json 集中管理 + worktree 绑定字段 |
| 3 | WorktreeManager 类(~210 行) | 注册表 + 事件流 + 生命周期管理 |
| 4 | 崩溃恢复 | _find_incomplete_ops + recover |
| 5 | 工具注册(~40 行) | 8 个新工具定义 + dispatch map |
| 6 | REPL + 提示词(~30 行) | 4 个新命令 + prompt 更新 |
与 Claude Code 的对应
Claude Code 在 Agent tool 中原生支持 worktree 隔离模式:
- worktree_create ≈ Claude Code 的
Agent(isolation="worktree")——给子 Agent 创建临时 git worktree,在隔离副本中工作 - 双平面架构 ≈ Claude Code 的 worktree 管理机制——Agent 在独立目录工作,完成后自动清理(无更改时)或保留(有更改时返回路径和分支)
- events.jsonl ≈ Claude Code 的 Agent 返回结果中的 worktree 信息(
worktree path and branch are returned in the result) - 崩溃恢复 ≈ Claude Code 的自动清理(
The worktree is automatically cleaned up if the agent makes no changes)
底层思路一致:每个并行任务一个独立的文件系统空间,互不干扰,用完清理。
下一步
到这里,Harness 实战系列已经走过了完整的 Agent 系统核心能力:
| 篇目 | 能力 | 新增工具 |
|---|---|---|
| Agent Loop | 基础循环 | 4 |
| Tool Use + Plan Mode | 工具注册 + 规划 | +1 |
| Subagent | 子任务委派 | +1 |
| LLM API | 模型能力 | — |
| Parallel Tasks | 拓扑排序并行 | +1 |
| Background Tasks | 异步后台 | +2 |
| Context Compact | 三层压缩 | — |
| Agent Teams | 团队通信与协议 | +8 |
| Autonomous Agents | 自治循环与任务看板 | +5 |
| Worktree Isolation | 双平面隔离 + 崩溃恢复 | +8 |
从 Agent Loop 的 5 个工具到 Worktree Isolation 的 25 个工具,每一步都在 agent_loop 之外叠加能力。循环不变,Harness 叠加——这就是 Agent 系统工程的核心思想。