第12课:终极隔离——Worktree并行执行
到s11所有任务共享一个目录,两个Agent同时改同一个文件就互相污染。Worktree给每个任务一个独立的git目录,控制面与执行面彻底分离。
系列导读
这是**《12课拆解Claude Code架构》系列的第 12 课,也是终极课**。
从第 1 课的 20 行 while 循环出发,我们一路叠加了工具注册、规划系统、子 Agent、技能加载、上下文压缩、任务持久化、后台执行、Agent 团队、协作协议、自治循环。到第 11 课,Agent 已经能自己拆任务、分配角色、自主协作。
但有一个问题,从第 7 课引入 TaskManager 起就一直悬着——所有任务共享同一个工作目录。
第 12 课的格言:
"各干各的目录, 互不干扰"
这一课,我们给每个任务一个独立的 git worktree,彻底解决并行执行的文件冲突问题。循环一行不改。
共享目录的灾难
前 11 课的所有任务都在同一个项目目录下执行。单 Agent 串行的时候没问题——每个时刻只有一个 Agent 在操作文件。
但第 9-11 课引入了多 Agent 团队和自治循环。想象这个场景:
Agent A 负责重构 config.py——把单体配置拆成模块化结构。
Agent B 负责给 config.py 加上环境变量校验逻辑。
两个 Agent 同时改同一个文件。A 删了 30 行,B 在第 25 行加了 15 行。谁都没提交。
结果:A 的改动覆盖了 B 的改动,B 的校验逻辑凭空消失。没有报错,没有冲突提示,静默数据丢失。
共享目录的灾难:
Agent A (重构 config.py) Agent B (加校验逻辑)
| |
| 读取 config.py (100行) | 读取 config.py (100行)
| |
| 删除 30行,重构为模块 | 在第25行加15行校验
| |
| 写回 config.py (70行) | 写回 config.py (115行)
| |
v v
最终 config.py = 115行 (B的版本)
A的重构完全丢失,且无人知晓TaskManager 管"做什么"——哪些任务 pending、哪些 in_progress、哪些 completed。但它不管"在哪做"。所有 Agent 都在同一个目录里操作,文件系统层面毫无隔离。
这就是控制面与执行面没有分离的后果。

解决思路:git worktree
Git 原生提供了一个完美的隔离机制——worktree。
git worktree add 能在同一个仓库下创建多个独立的工作目录,每个 worktree 有自己的文件系统、自己的 HEAD、自己的暂存区,但共享同一个 .git 对象库。
# 创建一个独立的工作目录
git worktree add .worktrees/task-42 -b task-42
# 现在 .worktrees/task-42/ 是一个完整的项目副本
# 在里面改文件,不影响主目录把它和 TaskManager 结合:每个任务创建一个 worktree,Agent 在自己的 worktree 里干活,干完了合并回来,再拆掉 worktree。
控制面(.tasks/)管调度,执行面(.worktrees/)管干活。两个平面,互不干扰。
双平面架构
项目根目录/
├── .tasks/ ← 控制面:调度和状态
│ ├── tasks.json ← 任务列表 + 状态 + worktree绑定
│ └── events.jsonl ← 生命周期事件流
│
├── .worktrees/ ← 执行面:独立工作目录
│ ├── task-42/ ← 任务42的完整项目副本
│ │ ├── config.py ← Agent A 在这里改
│ │ └── ...
│ ├── task-43/ ← 任务43的完整项目副本
│ │ ├── config.py ← Agent B 在这里改
│ │ └── ...
│ └── index.json ← worktree 注册表
│
├── config.py ← 主目录不受影响
└── ...控制面(.tasks/):存任务状态、依赖关系、事件日志。纯数据,不涉及文件操作。
执行面(.worktrees/):存独立的工作目录。每个 worktree 是一个完整的项目副本,Agent 在里面自由操作。
注册表(index.json):记录每个 worktree 的元信息——关联的 task_id、创建时间、分支名、当前状态。
两个平面通过 task_id 关联,但物理上完全隔离。
核心拆解
双状态机
系统需要管理两种状态,它们有各自独立的生命周期:
Task 状态机(沿用 s07,扩展绑定):
pending ──→ in_progress ──→ completed
│
└──→ failedWorktree 状态机(新增):
absent ──→ active ──→ removed
│
└──→ kept (保留不拆,供调试)两个状态机通过绑定关系联动:
- 创建 worktree 时传入
task_id→ Task 自动从pending推进到in_progress - 移除 worktree 时设置
complete_task=True→ Task 自动从in_progress推进到completed
一个调用,两个状态机同时推进。
Worktree 注册表(index.json)
{
"worktrees": {
"task-42": {
"task_id": "42",
"branch": "task-42",
"path": ".worktrees/task-42",
"status": "active",
"created_at": "2025-01-15T10:30:00Z"
},
"task-43": {
"task_id": "43",
"branch": "task-43",
"path": ".worktrees/task-43",
"status": "active",
"created_at": "2025-01-15T10:31:00Z"
}
}
}注册表是 worktree 的"户口簿"。创建时注册,移除时注销。崩溃恢复时,对比注册表和磁盘实际状态,就能重建现场。
事件流(events.jsonl)
{"ts":"2025-01-15T10:30:00Z","type":"worktree.create.before","task_id":"42","branch":"task-42"}
{"ts":"2025-01-15T10:30:01Z","type":"worktree.create.after","task_id":"42","path":".worktrees/task-42"}
{"ts":"2025-01-15T10:45:00Z","type":"worktree.remove.before","task_id":"42","complete_task":true}
{"ts":"2025-01-15T10:45:01Z","type":"task.completed","task_id":"42"}
{"ts":"2025-01-15T10:45:02Z","type":"worktree.remove.after","task_id":"42"}每个操作的 before/after 都写一条事件。JSONL 格式,一行一事件,追加写入,永不覆盖。
为什么要事件流?
- 崩溃恢复:如果在
worktree.create.before之后、worktree.create.after之前崩溃了,重启时能发现一个"开始了但没完成"的操作,自动清理。 - 可审计:每个任务的完整生命周期都有据可查。
- 可扩展:未来加监控、加统计、加 webhook 通知,只需要消费事件流,不改核心逻辑。
事件类型一览:
| 事件 | 触发时机 |
|---|---|
worktree.create.before | 即将创建 worktree |
worktree.create.after | worktree 创建成功 |
worktree.create.failed | worktree 创建失败 |
worktree.remove.before | 即将移除 worktree |
worktree.remove.after | worktree 移除完成 |
task.completed | 任务标记完成 |
创建与绑定
def worktree_create(self, task_id: str, branch: str | None = None) -> str:
"""创建worktree并绑定到任务,自动推进任务到in_progress"""
branch = branch or f"task-{task_id}"
wt_path = self.worktrees_dir / f"task-{task_id}"
# 发射前置事件
self._emit("worktree.create.before", task_id=task_id, branch=branch)
try:
# 1. 创建 git worktree
subprocess.run(
["git", "worktree", "add", str(wt_path), "-b", branch],
cwd=self.project_root, check=True,
capture_output=True, text=True,
)
# 2. 注册到 index.json
self._register_worktree(task_id, branch, str(wt_path))
# 3. 推进任务状态:pending → in_progress
self.task_manager.update_status(task_id, "in_progress")
# 发射后置事件
self._emit("worktree.create.after", task_id=task_id, path=str(wt_path))
return str(wt_path)
except subprocess.CalledProcessError as e:
self._emit("worktree.create.failed", task_id=task_id, error=str(e))
raise一个方法做三件事:创建目录、注册元信息、推进任务状态。调用方只需要一行代码。
执行
Agent 拿到 worktree 路径后,所有工具调用的 cwd 都切到这个路径:
def run_in_worktree(self, task_id: str, command: str) -> str:
"""在任务的worktree目录中执行命令"""
wt_path = self._get_worktree_path(task_id)
if not wt_path:
raise ValueError(f"No worktree for task {task_id}")
result = subprocess.run(
command, shell=True, cwd=wt_path,
capture_output=True, text=True, timeout=120,
)
return (result.stdout + result.stderr).strip() or "(no output)"Agent A 在 .worktrees/task-42/ 里改 config.py,Agent B 在 .worktrees/task-43/ 里改 config.py。文件系统级隔离,不可能互相污染。
收尾
def worktree_remove(self, task_id: str, complete_task: bool = True) -> None:
"""拆除worktree,可选同时完成任务"""
self._emit("worktree.remove.before", task_id=task_id, complete_task=complete_task)
# 1. 完成任务(可选)
if complete_task:
self.task_manager.update_status(task_id, "completed")
self._emit("task.completed", task_id=task_id)
# 2. 移除 git worktree
wt_path = self._get_worktree_path(task_id)
subprocess.run(
["git", "worktree", "remove", str(wt_path), "--force"],
cwd=self.project_root, check=True,
capture_output=True, text=True,
)
# 3. 从注册表注销
self._unregister_worktree(task_id)
# 4. 清理分支(可选)
branch = f"task-{task_id}"
subprocess.run(
["git", "branch", "-d", branch],
cwd=self.project_root,
capture_output=True, text=True,
)
self._emit("worktree.remove.after", task_id=task_id)worktree_remove(task_id, complete_task=True) 一个调用搞定四件事:完成任务、拆除目录、注销注册、清理分支。
如果需要保留 worktree 供调试,传 complete_task=False,worktree 状态改为 kept 而非 removed。
崩溃恢复
def recover(self) -> dict:
"""从 .tasks/ + index.json 重建现场"""
issues = {"orphaned_worktrees": [], "incomplete_ops": [], "recovered": []}
# 1. 扫描事件流,找未闭合的 before 事件
pending_ops = self._find_incomplete_ops()
for op in pending_ops:
if op["type"] == "worktree.create.before":
# 创建到一半崩溃了 → 清理残留目录
self._cleanup_partial_create(op["task_id"])
issues["incomplete_ops"].append(op)
# 2. 对比 index.json 和磁盘实际状态
for wt_id, wt_info in self._load_index().items():
wt_path = Path(wt_info["path"])
if not wt_path.exists():
# 注册了但目录不存在 → 孤儿记录
self._unregister_worktree(wt_id)
issues["orphaned_worktrees"].append(wt_id)
else:
issues["recovered"].append(wt_id)
# 3. 对比磁盘和 index.json
for dir_path in self.worktrees_dir.iterdir():
if dir_path.is_dir() and dir_path.name not in self._load_index():
# 目录存在但没注册 → 孤儿目录
issues["orphaned_worktrees"].append(dir_path.name)
return issues恢复逻辑的核心思路:两个数据源交叉比对。
index.json有但磁盘没有 → 注册表残留,清理- 磁盘有但
index.json没有 → 孤儿目录,标记 - 事件流有
before但没after→ 半完成操作,回滚
这就是为什么需要事件流——没有 before/after 事件对,你无法区分"正常运行中"和"崩溃到一半"。

完整代码
"""s12 — Worktree + Task Isolation: 终极隔离"""
import json
import subprocess
import time
from datetime import datetime, timezone
from pathlib import Path
class WorktreeManager:
"""管理 git worktree 生命周期,与 TaskManager 联动。"""
def __init__(self, project_root: str, task_manager):
self.project_root = Path(project_root)
self.worktrees_dir = self.project_root / ".worktrees"
self.tasks_dir = self.project_root / ".tasks"
self.task_manager = task_manager
self.worktrees_dir.mkdir(exist_ok=True)
self.tasks_dir.mkdir(exist_ok=True)
# ── 注册表 ──────────────────────────────────────
def _index_path(self) -> Path:
return self.worktrees_dir / "index.json"
def _load_index(self) -> dict:
path = self._index_path()
if path.exists():
return json.loads(path.read_text())["worktrees"]
return {}
def _save_index(self, worktrees: dict) -> None:
self._index_path().write_text(
json.dumps({"worktrees": worktrees}, indent=2, ensure_ascii=False)
)
def _register_worktree(self, task_id: str, branch: str, path: str) -> None:
index = self._load_index()
index[f"task-{task_id}"] = {
"task_id": task_id,
"branch": branch,
"path": path,
"status": "active",
"created_at": datetime.now(timezone.utc).isoformat(),
}
self._save_index(index)
def _unregister_worktree(self, task_id: str) -> None:
index = self._load_index()
key = f"task-{task_id}"
if key in index:
del index[key]
self._save_index(index)
def _get_worktree_path(self, task_id: str) -> Path | None:
index = self._load_index()
info = index.get(f"task-{task_id}")
return Path(info["path"]) if info else None
# ── 事件流 ──────────────────────────────────────
def _events_path(self) -> Path:
return self.tasks_dir / "events.jsonl"
def _emit(self, event_type: str, **data) -> None:
event = {
"ts": datetime.now(timezone.utc).isoformat(),
"type": event_type,
**data,
}
with open(self._events_path(), "a") as f:
f.write(json.dumps(event, ensure_ascii=False) + "\n")
# ── Worktree 生命周期 ───────────────────────────
def worktree_create(self, task_id: str, branch: str | None = None) -> str:
branch = branch or f"task-{task_id}"
wt_path = self.worktrees_dir / f"task-{task_id}"
self._emit("worktree.create.before", task_id=task_id, branch=branch)
try:
subprocess.run(
["git", "worktree", "add", str(wt_path), "-b", branch],
cwd=self.project_root,
check=True,
capture_output=True,
text=True,
)
self._register_worktree(task_id, branch, str(wt_path))
self.task_manager.update_status(task_id, "in_progress")
self._emit("worktree.create.after", task_id=task_id, path=str(wt_path))
return str(wt_path)
except subprocess.CalledProcessError as e:
self._emit(
"worktree.create.failed", task_id=task_id, error=str(e.stderr)
)
raise
def worktree_remove(self, task_id: str, complete_task: bool = True) -> None:
self._emit(
"worktree.remove.before", task_id=task_id, complete_task=complete_task
)
if complete_task:
self.task_manager.update_status(task_id, "completed")
self._emit("task.completed", task_id=task_id)
wt_path = self._get_worktree_path(task_id)
if wt_path and wt_path.exists():
subprocess.run(
["git", "worktree", "remove", str(wt_path), "--force"],
cwd=self.project_root,
check=True,
capture_output=True,
text=True,
)
branch = f"task-{task_id}"
subprocess.run(
["git", "branch", "-d", branch],
cwd=self.project_root,
capture_output=True,
text=True,
)
self._unregister_worktree(task_id)
self._emit("worktree.remove.after", task_id=task_id)
# ── 执行 ────────────────────────────────────────
def run_in_worktree(self, task_id: str, command: str) -> str:
wt_path = self._get_worktree_path(task_id)
if not wt_path or not wt_path.exists():
raise ValueError(f"No active worktree for task {task_id}")
result = subprocess.run(
command,
shell=True,
cwd=wt_path,
capture_output=True,
text=True,
timeout=120,
)
out = (result.stdout + result.stderr).strip()
return out[:50000] if out else "(no output)"
# ── 崩溃恢复 ────────────────────────────────────
def _find_incomplete_ops(self) -> list:
events_path = self._events_path()
if not events_path.exists():
return []
before_events = {}
for line in events_path.read_text().strip().split("\n"):
if not line:
continue
event = json.loads(line)
etype = event["type"]
tid = event.get("task_id", "")
if etype.endswith(".before"):
before_events[f"{etype}:{tid}"] = event
elif etype.endswith(".after") or etype.endswith(".failed"):
base = etype.rsplit(".", 1)[0]
before_events.pop(f"{base}.before:{tid}", None)
return list(before_events.values())
def recover(self) -> dict:
issues = {"orphaned_worktrees": [], "incomplete_ops": [], "recovered": []}
pending_ops = self._find_incomplete_ops()
for op in pending_ops:
issues["incomplete_ops"].append(op)
if op["type"] == "worktree.create.before":
tid = op["task_id"]
partial_path = self.worktrees_dir / f"task-{tid}"
if partial_path.exists():
subprocess.run(
["git", "worktree", "remove", str(partial_path), "--force"],
cwd=self.project_root,
capture_output=True,
text=True,
)
self._unregister_worktree(tid)
index = self._load_index()
for wt_id, wt_info in list(index.items()):
wt_path = Path(wt_info["path"])
if not wt_path.exists():
self._unregister_worktree(wt_info["task_id"])
issues["orphaned_worktrees"].append(wt_id)
else:
issues["recovered"].append(wt_id)
for dir_path in self.worktrees_dir.iterdir():
if dir_path.is_dir() and dir_path.name not in index:
issues["orphaned_worktrees"].append(dir_path.name)
return issues
# ── 查询 ────────────────────────────────────────
def list_active(self) -> list:
return [
info
for info in self._load_index().values()
if info["status"] == "active"
]代码量约 180 行。加上已有的 TaskManager(~100 行),整个任务+隔离系统不到 300 行。
12 课回顾:从 20 行到完整 Harness
这是终极课。让我们回头看看整个旅程:
| 课程 | 机制 | 核心概念 |
|---|---|---|
| s01 | Agent Loop | while 循环 + stop_reason |
| s02 | Tool Use | dispatch map,注册即扩展 |
| s03 | TodoWrite | 显式规划,扁平待办清单 |
| s04 | Subagent | 隔离上下文,独立消息列表 |
| s05 | Skill Loading | 动态 system prompt 注入 |
| s06 | Context Compact | 压缩旧消息,保持窗口可用 |
| s07 | Task System | 持久化 DAG,依赖解锁 |
| s08 | Background Tasks | 守护线程 + 通知队列 |
| s09 | Agent Teams | 多 Agent 接力协作 |
| s10 | Team Protocols | 握手协议 + 共享黑板 |
| s11 | Autonomous Loop | 自治循环 + 终止条件 |
| s12 | Worktree Isolation | 双平面架构 + 双状态机 |
12 个课程,12 个递进的机制。但有一个东西从头到尾没变过:
while True:
response = client.messages.create(...)
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
return
# 执行工具,收集结果
messages.append({"role": "user", "content": results})这个循环,从第 1 课到第 12 课,一行不改。
所有 12 个机制——工具注册、规划、子 Agent、技能、压缩、持久化、后台、团队、协议、自治、隔离——全部都在循环之外叠加。循环只管三件事:调 LLM、判退出、执行工具。
这就是 Harness 工程的核心理念:循环不变,Harness 叠加。
洞见
控制面与执行面分离
这个架构模式在分布式系统中无处不在:
- Kubernetes:etcd(控制面)管状态,Node(执行面)跑容器
- 数据库:WAL/事务日志(控制面)管一致性,数据页(执行面)存数据
- 本课:
.tasks/(控制面)管调度,.worktrees/(执行面)管执行
分离的好处是独立故障恢复。执行面崩了(worktree 目录损坏),控制面还在(tasks.json 完好),可以重建。控制面更新了(任务完成),执行面可以延迟清理(worktree 保留供调试)。
事件流的可扩展性
events.jsonl 看起来只是个日志文件,但它是整个系统最有扩展价值的组件。
当前用途:崩溃恢复。
未来可以做的事,不改一行核心代码:
- 监控仪表盘:消费事件流,实时展示任务进度
- 性能分析:统计每个任务的 worktree 存活时长
- Webhook 通知:任务完成时推送到 Slack
- 审计日志:合规场景下追溯每个操作
这就是事件溯源模式的威力:把"发生了什么"完整记录,让"怎么用这些信息"成为后续扩展的事。
双状态机的优雅
Task 和 Worktree 各有自己的生命周期,但通过绑定关系联动。这比单一状态机优雅得多——你不需要一个巨大的状态枚举来覆盖所有组合(task_pending_worktree_absent、task_in_progress_worktree_active……),而是两个小状态机各管各的,联动点只有两个:创建和收尾。
五分钟跑起来
# 进入项目目录
cd learn-claude-code
# 启动终极课
python agents/s12_worktree_task_isolation.py任务一:创建任务 + 绑定 Worktree
先创建任务,再为每个任务创建隔离的工作空间:
s12 >> Create tasks for backend auth and frontend login page, then list tasks.实际执行记录:
> task_create: {"id": 6, "subject": "Backend Auth", "status": "pending"}
> task_create: {"id": 7, "subject": "Frontend Login Page", "status": "pending"}
> task_list:
[x] #1-#5: 前几课的任务(已完成)
[ ] #6: Backend Auth
[ ] #7: Frontend Login Page然后为任务绑定独立的 worktree:
s12 >> Create worktree "auth-refactor" for task 1,
then bind task 2 to a new worktree "ui-login".> worktree_create:
name: auth-refactor
path: .worktrees/auth-refactor
branch: wt/auth-refactor ← 自动创建 git 分支
task_id: 1
status: active
> worktree_create:
name: ui-login
path: .worktrees/ui-login
branch: wt/ui-login
task_id: 2
status: active关键观察: 每个 worktree 自动创建独立 git 分支(wt/auth-refactor、wt/ui-login),绑定到具体 task_id。两个任务在不同目录操作同一份代码库的副本——物理级隔离,互不干扰。
任务二:在隔离环境中执行命令
在指定 worktree 里运行命令,验证隔离效果:
s12 >> Run "git status --short" in worktree "auth-refactor".> worktree_run: (no output) ← 干净的工作区,没有任何修改工作区是全新的——从 HEAD 检出的独立副本。在这里做的任何修改都不会影响主目录和其他 worktree。
任务三:Worktree 生命周期管理
标记 worktree 为保留状态,查看完整事件流:
s12 >> Keep worktree "ui-login", then list worktrees and inspect events.实际执行记录:
> worktree_keep: {"name": "ui-login", "status": "kept"}
> worktree_list:
[active] auth-refactor → .worktrees/auth-refactor (wt/auth-refactor) task=1
[kept] ui-login → .worktrees/ui-login (wt/ui-login) task=2
> worktree_events:
1. worktree.create.before — auth-refactor 创建开始
2. worktree.create.after — auth-refactor 创建完成
3. worktree.create.before — ui-login 创建开始
4. worktree.create.after — ui-login 创建完成
5. worktree.keep — ui-login 标记为保留 📌events.jsonl 记录了完整的生命周期——每个 worktree 的创建、保留、删除都有时间戳和上下文。崩溃恢复时可以从事件流重建状态。
任务四:删除 Worktree + 自动完成任务
删除 worktree 并自动标记关联任务为完成:
s12 >> Remove worktree "auth-refactor" with complete_task=true,
then list tasks/worktrees/events.实际执行记录:
> worktree_remove: Removed worktree 'auth-refactor'
> task_list:
[x] #1: Design the data schema owner=frank ← 自动标记完成!
[x] #2: Build the backend API layer owner=eve wt=ui-login
> worktree_list:
[removed] auth-refactor (wt/auth-refactor) task=1
[kept] ui-login (wt/ui-login) task=2
> worktree_events:
6. worktree.remove.before — auth-refactor 删除开始
7. task.completed — Task #1 自动标记完成 ✅
8. worktree.remove.after — auth-refactor 完全清除 🗑️一个 worktree_remove 触发了完整的收尾链: 拆除工作目录 → 标记任务完成 → 记录事件。这就是 Worktree 和 Task 双状态机联动的价值——不需要手动维护两边的一致性。
完整变更表(终极版)
| 组件 | 之前(s11) | 之后(s12) |
|---|---|---|
| 工作目录 | 所有任务共享一个 | 每个任务独立 worktree |
| 任务状态 | 仅 Task 状态机 | Task + Worktree 双状态机 |
| 文件隔离 | 无 | git worktree 级隔离 |
| 元信息管理 | tasks.json | tasks.json + index.json |
| 生命周期日志 | 隐式/无 | events.jsonl 显式事件流 |
| 崩溃恢复 | 从 tasks.json 恢复 | tasks.json + index.json + events 交叉恢复 |
| 创建流程 | 创建任务 | 创建任务 + 创建 worktree + 绑定 |
| 收尾流程 | 标记完成 | 标记完成 + 拆除 worktree + 注销 + 清理分支 |
| 新增代码 | — | ~180 行(WorktreeManager) |
| 累计代码 | ~650 行 | ~830 行 |
系列总结:从 20 行到 830 行
12 课走完,让我们看看全局:
第一阶段(s01-s03):最小 Agent
- while 循环 + 工具 + 规划
- 一个 Agent 能干活了
第二阶段(s04-s06):能力扩展
- 子 Agent + 技能 + 压缩
- Agent 能应对复杂任务了
第三阶段(s07-s08):持久化与并发
- 任务图 + 后台执行
- Agent 能断点续跑、不阻塞了
第四阶段(s09-s12):多 Agent 系统
- 团队 + 协议 + 自治 + 隔离
- Agent 能组队、自治、互不干扰了
从 20 行到 830 行,代码量增加了约 40 倍。但核心循环——那个 while True 加 stop_reason 判断——一行不改。
所有 830 行代码,都在做同一件事:让那 20 行循环跑得更好、更安全、更强大。
这就是 Agent Harness 工程的本质。不是发明新的 AI 能力,而是给已有的 AI 能力搭建基础设施——工具、规划、隔离、协作、持久化、恢复。循环是发动机,Harness 是整辆车。

这是《12课拆解Claude Code架构:从零掌握Agent Harness工程》系列的第 12 课,也是终极课。感谢你一路读到这里。
完整代码和交互式学习平台:github.com/shareAI-lab/learn-claude-code
如果这个系列对你有帮助,欢迎转发给你的技术团队。
系列目录
- 第1课:用20行Python造出你的第一个AI Agent
- 第2课:给Agent加工具 —— dispatch map模式详解
- 第3课:TodoWrite —— 让Agent先想后做:规划系统
- 第4课:Subagent —— 拆解大任务,上下文隔离
- 第5课:按需加载领域知识——Skill机制
- 第6课:无限对话——上下文压缩三层策略
- 第7课:任务持久化——文件级DAG任务图
- 第8课:后台执行——异步任务与通知队列
- 第9课:Agent Teams——多Agent协作:团队与邮箱系统
- 第10课:团队协议——状态机驱动的协商
- 第11课:自治Agent——自组织任务认领
- 第12课:终极隔离——Worktree并行执行(本文)