HuanCode Docs

第12课:终极隔离——Worktree并行执行

到s11所有任务共享一个目录,两个Agent同时改同一个文件就互相污染。Worktree给每个任务一个独立的git目录,控制面与执行面彻底分离。

系列导读

这是**《12课拆解Claude Code架构》系列的第 12 课,也是终极课**。

从第 1 课的 20 行 while 循环出发,我们一路叠加了工具注册、规划系统、子 Agent、技能加载、上下文压缩、任务持久化、后台执行、Agent 团队、协作协议、自治循环。到第 11 课,Agent 已经能自己拆任务、分配角色、自主协作。

但有一个问题,从第 7 课引入 TaskManager 起就一直悬着——所有任务共享同一个工作目录。

第 12 课的格言:

"各干各的目录, 互不干扰"

这一课,我们给每个任务一个独立的 git worktree,彻底解决并行执行的文件冲突问题。循环一行不改。


共享目录的灾难

前 11 课的所有任务都在同一个项目目录下执行。单 Agent 串行的时候没问题——每个时刻只有一个 Agent 在操作文件。

但第 9-11 课引入了多 Agent 团队和自治循环。想象这个场景:

Agent A 负责重构 config.py——把单体配置拆成模块化结构。 Agent B 负责给 config.py 加上环境变量校验逻辑。

两个 Agent 同时改同一个文件。A 删了 30 行,B 在第 25 行加了 15 行。谁都没提交。

结果:A 的改动覆盖了 B 的改动,B 的校验逻辑凭空消失。没有报错,没有冲突提示,静默数据丢失。

共享目录的灾难:

  Agent A (重构 config.py)        Agent B (加校验逻辑)
       |                               |
       | 读取 config.py (100行)         | 读取 config.py (100行)
       |                               |
       | 删除 30行,重构为模块          | 在第25行加15行校验
       |                               |
       | 写回 config.py (70行)          | 写回 config.py (115行)
       |                               |
       v                               v
  最终 config.py = 115行 (B的版本)
  A的重构完全丢失,且无人知晓

TaskManager 管"做什么"——哪些任务 pending、哪些 in_progress、哪些 completed。但它不管"在哪做"。所有 Agent 都在同一个目录里操作,文件系统层面毫无隔离。

这就是控制面与执行面没有分离的后果。

共享目录的灾难:两个Agent改同一个文件

解决思路:git worktree

Git 原生提供了一个完美的隔离机制——worktree

git worktree add 能在同一个仓库下创建多个独立的工作目录,每个 worktree 有自己的文件系统、自己的 HEAD、自己的暂存区,但共享同一个 .git 对象库。

# 创建一个独立的工作目录
git worktree add .worktrees/task-42 -b task-42

# 现在 .worktrees/task-42/ 是一个完整的项目副本
# 在里面改文件,不影响主目录

把它和 TaskManager 结合:每个任务创建一个 worktree,Agent 在自己的 worktree 里干活,干完了合并回来,再拆掉 worktree。

控制面(.tasks/)管调度,执行面(.worktrees/)管干活。两个平面,互不干扰。

双平面架构

项目根目录/
├── .tasks/                    ← 控制面:调度和状态
│   ├── tasks.json             ← 任务列表 + 状态 + worktree绑定
│   └── events.jsonl           ← 生命周期事件流

├── .worktrees/                ← 执行面:独立工作目录
│   ├── task-42/               ← 任务42的完整项目副本
│   │   ├── config.py          ← Agent A 在这里改
│   │   └── ...
│   ├── task-43/               ← 任务43的完整项目副本
│   │   ├── config.py          ← Agent B 在这里改
│   │   └── ...
│   └── index.json             ← worktree 注册表

├── config.py                  ← 主目录不受影响
└── ...

控制面.tasks/):存任务状态、依赖关系、事件日志。纯数据,不涉及文件操作。

执行面.worktrees/):存独立的工作目录。每个 worktree 是一个完整的项目副本,Agent 在里面自由操作。

注册表index.json):记录每个 worktree 的元信息——关联的 task_id、创建时间、分支名、当前状态。

两个平面通过 task_id 关联,但物理上完全隔离。

核心拆解

双状态机

系统需要管理两种状态,它们有各自独立的生命周期:

Task 状态机(沿用 s07,扩展绑定):

pending ──→ in_progress ──→ completed

                └──→ failed

Worktree 状态机(新增):

absent ──→ active ──→ removed

              └──→ kept (保留不拆,供调试)

两个状态机通过绑定关系联动:

  • 创建 worktree 时传入 task_id → Task 自动从 pending 推进到 in_progress
  • 移除 worktree 时设置 complete_task=True → Task 自动从 in_progress 推进到 completed

一个调用,两个状态机同时推进。

Worktree 注册表(index.json)

{
  "worktrees": {
    "task-42": {
      "task_id": "42",
      "branch": "task-42",
      "path": ".worktrees/task-42",
      "status": "active",
      "created_at": "2025-01-15T10:30:00Z"
    },
    "task-43": {
      "task_id": "43",
      "branch": "task-43",
      "path": ".worktrees/task-43",
      "status": "active",
      "created_at": "2025-01-15T10:31:00Z"
    }
  }
}

注册表是 worktree 的"户口簿"。创建时注册,移除时注销。崩溃恢复时,对比注册表和磁盘实际状态,就能重建现场。

事件流(events.jsonl)

{"ts":"2025-01-15T10:30:00Z","type":"worktree.create.before","task_id":"42","branch":"task-42"}
{"ts":"2025-01-15T10:30:01Z","type":"worktree.create.after","task_id":"42","path":".worktrees/task-42"}
{"ts":"2025-01-15T10:45:00Z","type":"worktree.remove.before","task_id":"42","complete_task":true}
{"ts":"2025-01-15T10:45:01Z","type":"task.completed","task_id":"42"}
{"ts":"2025-01-15T10:45:02Z","type":"worktree.remove.after","task_id":"42"}

每个操作的 before/after 都写一条事件。JSONL 格式,一行一事件,追加写入,永不覆盖。

为什么要事件流?

  1. 崩溃恢复:如果在 worktree.create.before 之后、worktree.create.after 之前崩溃了,重启时能发现一个"开始了但没完成"的操作,自动清理。
  2. 可审计:每个任务的完整生命周期都有据可查。
  3. 可扩展:未来加监控、加统计、加 webhook 通知,只需要消费事件流,不改核心逻辑。

事件类型一览:

事件触发时机
worktree.create.before即将创建 worktree
worktree.create.afterworktree 创建成功
worktree.create.failedworktree 创建失败
worktree.remove.before即将移除 worktree
worktree.remove.afterworktree 移除完成
task.completed任务标记完成

创建与绑定

def worktree_create(self, task_id: str, branch: str | None = None) -> str:
    """创建worktree并绑定到任务,自动推进任务到in_progress"""
    branch = branch or f"task-{task_id}"
    wt_path = self.worktrees_dir / f"task-{task_id}"

    # 发射前置事件
    self._emit("worktree.create.before", task_id=task_id, branch=branch)

    try:
        # 1. 创建 git worktree
        subprocess.run(
            ["git", "worktree", "add", str(wt_path), "-b", branch],
            cwd=self.project_root, check=True,
            capture_output=True, text=True,
        )

        # 2. 注册到 index.json
        self._register_worktree(task_id, branch, str(wt_path))

        # 3. 推进任务状态:pending → in_progress
        self.task_manager.update_status(task_id, "in_progress")

        # 发射后置事件
        self._emit("worktree.create.after", task_id=task_id, path=str(wt_path))

        return str(wt_path)

    except subprocess.CalledProcessError as e:
        self._emit("worktree.create.failed", task_id=task_id, error=str(e))
        raise

一个方法做三件事:创建目录、注册元信息、推进任务状态。调用方只需要一行代码。

执行

Agent 拿到 worktree 路径后,所有工具调用的 cwd 都切到这个路径:

def run_in_worktree(self, task_id: str, command: str) -> str:
    """在任务的worktree目录中执行命令"""
    wt_path = self._get_worktree_path(task_id)
    if not wt_path:
        raise ValueError(f"No worktree for task {task_id}")

    result = subprocess.run(
        command, shell=True, cwd=wt_path,
        capture_output=True, text=True, timeout=120,
    )
    return (result.stdout + result.stderr).strip() or "(no output)"

Agent A 在 .worktrees/task-42/ 里改 config.py,Agent B 在 .worktrees/task-43/ 里改 config.py文件系统级隔离,不可能互相污染。

收尾

def worktree_remove(self, task_id: str, complete_task: bool = True) -> None:
    """拆除worktree,可选同时完成任务"""
    self._emit("worktree.remove.before", task_id=task_id, complete_task=complete_task)

    # 1. 完成任务(可选)
    if complete_task:
        self.task_manager.update_status(task_id, "completed")
        self._emit("task.completed", task_id=task_id)

    # 2. 移除 git worktree
    wt_path = self._get_worktree_path(task_id)
    subprocess.run(
        ["git", "worktree", "remove", str(wt_path), "--force"],
        cwd=self.project_root, check=True,
        capture_output=True, text=True,
    )

    # 3. 从注册表注销
    self._unregister_worktree(task_id)

    # 4. 清理分支(可选)
    branch = f"task-{task_id}"
    subprocess.run(
        ["git", "branch", "-d", branch],
        cwd=self.project_root,
        capture_output=True, text=True,
    )

    self._emit("worktree.remove.after", task_id=task_id)

worktree_remove(task_id, complete_task=True) 一个调用搞定四件事:完成任务、拆除目录、注销注册、清理分支。

如果需要保留 worktree 供调试,传 complete_task=False,worktree 状态改为 kept 而非 removed

崩溃恢复

def recover(self) -> dict:
    """从 .tasks/ + index.json 重建现场"""
    issues = {"orphaned_worktrees": [], "incomplete_ops": [], "recovered": []}

    # 1. 扫描事件流,找未闭合的 before 事件
    pending_ops = self._find_incomplete_ops()
    for op in pending_ops:
        if op["type"] == "worktree.create.before":
            # 创建到一半崩溃了 → 清理残留目录
            self._cleanup_partial_create(op["task_id"])
            issues["incomplete_ops"].append(op)

    # 2. 对比 index.json 和磁盘实际状态
    for wt_id, wt_info in self._load_index().items():
        wt_path = Path(wt_info["path"])
        if not wt_path.exists():
            # 注册了但目录不存在 → 孤儿记录
            self._unregister_worktree(wt_id)
            issues["orphaned_worktrees"].append(wt_id)
        else:
            issues["recovered"].append(wt_id)

    # 3. 对比磁盘和 index.json
    for dir_path in self.worktrees_dir.iterdir():
        if dir_path.is_dir() and dir_path.name not in self._load_index():
            # 目录存在但没注册 → 孤儿目录
            issues["orphaned_worktrees"].append(dir_path.name)

    return issues

恢复逻辑的核心思路:两个数据源交叉比对

  • index.json 有但磁盘没有 → 注册表残留,清理
  • 磁盘有但 index.json 没有 → 孤儿目录,标记
  • 事件流有 before 但没 after → 半完成操作,回滚

这就是为什么需要事件流——没有 before/after 事件对,你无法区分"正常运行中"和"崩溃到一半"。

双平面架构:控制面与执行面分离

完整代码

"""s12 — Worktree + Task Isolation: 终极隔离"""

import json
import subprocess
import time
from datetime import datetime, timezone
from pathlib import Path


class WorktreeManager:
    """管理 git worktree 生命周期,与 TaskManager 联动。"""

    def __init__(self, project_root: str, task_manager):
        self.project_root = Path(project_root)
        self.worktrees_dir = self.project_root / ".worktrees"
        self.tasks_dir = self.project_root / ".tasks"
        self.task_manager = task_manager

        self.worktrees_dir.mkdir(exist_ok=True)
        self.tasks_dir.mkdir(exist_ok=True)

    # ── 注册表 ──────────────────────────────────────

    def _index_path(self) -> Path:
        return self.worktrees_dir / "index.json"

    def _load_index(self) -> dict:
        path = self._index_path()
        if path.exists():
            return json.loads(path.read_text())["worktrees"]
        return {}

    def _save_index(self, worktrees: dict) -> None:
        self._index_path().write_text(
            json.dumps({"worktrees": worktrees}, indent=2, ensure_ascii=False)
        )

    def _register_worktree(self, task_id: str, branch: str, path: str) -> None:
        index = self._load_index()
        index[f"task-{task_id}"] = {
            "task_id": task_id,
            "branch": branch,
            "path": path,
            "status": "active",
            "created_at": datetime.now(timezone.utc).isoformat(),
        }
        self._save_index(index)

    def _unregister_worktree(self, task_id: str) -> None:
        index = self._load_index()
        key = f"task-{task_id}"
        if key in index:
            del index[key]
        self._save_index(index)

    def _get_worktree_path(self, task_id: str) -> Path | None:
        index = self._load_index()
        info = index.get(f"task-{task_id}")
        return Path(info["path"]) if info else None

    # ── 事件流 ──────────────────────────────────────

    def _events_path(self) -> Path:
        return self.tasks_dir / "events.jsonl"

    def _emit(self, event_type: str, **data) -> None:
        event = {
            "ts": datetime.now(timezone.utc).isoformat(),
            "type": event_type,
            **data,
        }
        with open(self._events_path(), "a") as f:
            f.write(json.dumps(event, ensure_ascii=False) + "\n")

    # ── Worktree 生命周期 ───────────────────────────

    def worktree_create(self, task_id: str, branch: str | None = None) -> str:
        branch = branch or f"task-{task_id}"
        wt_path = self.worktrees_dir / f"task-{task_id}"

        self._emit("worktree.create.before", task_id=task_id, branch=branch)

        try:
            subprocess.run(
                ["git", "worktree", "add", str(wt_path), "-b", branch],
                cwd=self.project_root,
                check=True,
                capture_output=True,
                text=True,
            )
            self._register_worktree(task_id, branch, str(wt_path))
            self.task_manager.update_status(task_id, "in_progress")
            self._emit("worktree.create.after", task_id=task_id, path=str(wt_path))
            return str(wt_path)

        except subprocess.CalledProcessError as e:
            self._emit(
                "worktree.create.failed", task_id=task_id, error=str(e.stderr)
            )
            raise

    def worktree_remove(self, task_id: str, complete_task: bool = True) -> None:
        self._emit(
            "worktree.remove.before", task_id=task_id, complete_task=complete_task
        )

        if complete_task:
            self.task_manager.update_status(task_id, "completed")
            self._emit("task.completed", task_id=task_id)

        wt_path = self._get_worktree_path(task_id)
        if wt_path and wt_path.exists():
            subprocess.run(
                ["git", "worktree", "remove", str(wt_path), "--force"],
                cwd=self.project_root,
                check=True,
                capture_output=True,
                text=True,
            )

        branch = f"task-{task_id}"
        subprocess.run(
            ["git", "branch", "-d", branch],
            cwd=self.project_root,
            capture_output=True,
            text=True,
        )

        self._unregister_worktree(task_id)
        self._emit("worktree.remove.after", task_id=task_id)

    # ── 执行 ────────────────────────────────────────

    def run_in_worktree(self, task_id: str, command: str) -> str:
        wt_path = self._get_worktree_path(task_id)
        if not wt_path or not wt_path.exists():
            raise ValueError(f"No active worktree for task {task_id}")

        result = subprocess.run(
            command,
            shell=True,
            cwd=wt_path,
            capture_output=True,
            text=True,
            timeout=120,
        )
        out = (result.stdout + result.stderr).strip()
        return out[:50000] if out else "(no output)"

    # ── 崩溃恢复 ────────────────────────────────────

    def _find_incomplete_ops(self) -> list:
        events_path = self._events_path()
        if not events_path.exists():
            return []

        before_events = {}
        for line in events_path.read_text().strip().split("\n"):
            if not line:
                continue
            event = json.loads(line)
            etype = event["type"]
            tid = event.get("task_id", "")

            if etype.endswith(".before"):
                before_events[f"{etype}:{tid}"] = event
            elif etype.endswith(".after") or etype.endswith(".failed"):
                base = etype.rsplit(".", 1)[0]
                before_events.pop(f"{base}.before:{tid}", None)

        return list(before_events.values())

    def recover(self) -> dict:
        issues = {"orphaned_worktrees": [], "incomplete_ops": [], "recovered": []}

        pending_ops = self._find_incomplete_ops()
        for op in pending_ops:
            issues["incomplete_ops"].append(op)
            if op["type"] == "worktree.create.before":
                tid = op["task_id"]
                partial_path = self.worktrees_dir / f"task-{tid}"
                if partial_path.exists():
                    subprocess.run(
                        ["git", "worktree", "remove", str(partial_path), "--force"],
                        cwd=self.project_root,
                        capture_output=True,
                        text=True,
                    )
                self._unregister_worktree(tid)

        index = self._load_index()
        for wt_id, wt_info in list(index.items()):
            wt_path = Path(wt_info["path"])
            if not wt_path.exists():
                self._unregister_worktree(wt_info["task_id"])
                issues["orphaned_worktrees"].append(wt_id)
            else:
                issues["recovered"].append(wt_id)

        for dir_path in self.worktrees_dir.iterdir():
            if dir_path.is_dir() and dir_path.name not in index:
                issues["orphaned_worktrees"].append(dir_path.name)

        return issues

    # ── 查询 ────────────────────────────────────────

    def list_active(self) -> list:
        return [
            info
            for info in self._load_index().values()
            if info["status"] == "active"
        ]

代码量约 180 行。加上已有的 TaskManager(~100 行),整个任务+隔离系统不到 300 行。

12 课回顾:从 20 行到完整 Harness

这是终极课。让我们回头看看整个旅程:

课程机制核心概念
s01Agent Loopwhile 循环 + stop_reason
s02Tool Usedispatch map,注册即扩展
s03TodoWrite显式规划,扁平待办清单
s04Subagent隔离上下文,独立消息列表
s05Skill Loading动态 system prompt 注入
s06Context Compact压缩旧消息,保持窗口可用
s07Task System持久化 DAG,依赖解锁
s08Background Tasks守护线程 + 通知队列
s09Agent Teams多 Agent 接力协作
s10Team Protocols握手协议 + 共享黑板
s11Autonomous Loop自治循环 + 终止条件
s12Worktree Isolation双平面架构 + 双状态机

12 个课程,12 个递进的机制。但有一个东西从头到尾没变过:

while True:
    response = client.messages.create(...)
    messages.append({"role": "assistant", "content": response.content})
    if response.stop_reason != "tool_use":
        return
    # 执行工具,收集结果
    messages.append({"role": "user", "content": results})

这个循环,从第 1 课到第 12 课,一行不改。

所有 12 个机制——工具注册、规划、子 Agent、技能、压缩、持久化、后台、团队、协议、自治、隔离——全部都在循环之外叠加。循环只管三件事:调 LLM、判退出、执行工具。

这就是 Harness 工程的核心理念:循环不变,Harness 叠加。

洞见

控制面与执行面分离

这个架构模式在分布式系统中无处不在:

  • Kubernetes:etcd(控制面)管状态,Node(执行面)跑容器
  • 数据库:WAL/事务日志(控制面)管一致性,数据页(执行面)存数据
  • 本课.tasks/(控制面)管调度,.worktrees/(执行面)管执行

分离的好处是独立故障恢复。执行面崩了(worktree 目录损坏),控制面还在(tasks.json 完好),可以重建。控制面更新了(任务完成),执行面可以延迟清理(worktree 保留供调试)。

事件流的可扩展性

events.jsonl 看起来只是个日志文件,但它是整个系统最有扩展价值的组件。

当前用途:崩溃恢复。

未来可以做的事,不改一行核心代码:

  • 监控仪表盘:消费事件流,实时展示任务进度
  • 性能分析:统计每个任务的 worktree 存活时长
  • Webhook 通知:任务完成时推送到 Slack
  • 审计日志:合规场景下追溯每个操作

这就是事件溯源模式的威力:把"发生了什么"完整记录,让"怎么用这些信息"成为后续扩展的事。

双状态机的优雅

Task 和 Worktree 各有自己的生命周期,但通过绑定关系联动。这比单一状态机优雅得多——你不需要一个巨大的状态枚举来覆盖所有组合(task_pending_worktree_absenttask_in_progress_worktree_active……),而是两个小状态机各管各的,联动点只有两个:创建和收尾。

五分钟跑起来

# 进入项目目录
cd learn-claude-code

# 启动终极课
python agents/s12_worktree_task_isolation.py

任务一:创建任务 + 绑定 Worktree

先创建任务,再为每个任务创建隔离的工作空间:

s12 >> Create tasks for backend auth and frontend login page, then list tasks.

实际执行记录:

> task_create: {"id": 6, "subject": "Backend Auth", "status": "pending"}
> task_create: {"id": 7, "subject": "Frontend Login Page", "status": "pending"}

> task_list:
  [x] #1-#5: 前几课的任务(已完成)
  [ ] #6: Backend Auth
  [ ] #7: Frontend Login Page

然后为任务绑定独立的 worktree:

s12 >> Create worktree "auth-refactor" for task 1,
       then bind task 2 to a new worktree "ui-login".
> worktree_create:
  name: auth-refactor
  path: .worktrees/auth-refactor
  branch: wt/auth-refactor         ← 自动创建 git 分支
  task_id: 1
  status: active

> worktree_create:
  name: ui-login
  path: .worktrees/ui-login
  branch: wt/ui-login
  task_id: 2
  status: active

关键观察: 每个 worktree 自动创建独立 git 分支(wt/auth-refactorwt/ui-login),绑定到具体 task_id。两个任务在不同目录操作同一份代码库的副本——物理级隔离,互不干扰。

任务二:在隔离环境中执行命令

在指定 worktree 里运行命令,验证隔离效果:

s12 >> Run "git status --short" in worktree "auth-refactor".
> worktree_run: (no output)    ← 干净的工作区,没有任何修改

工作区是全新的——从 HEAD 检出的独立副本。在这里做的任何修改都不会影响主目录和其他 worktree。

任务三:Worktree 生命周期管理

标记 worktree 为保留状态,查看完整事件流:

s12 >> Keep worktree "ui-login", then list worktrees and inspect events.

实际执行记录:

> worktree_keep: {"name": "ui-login", "status": "kept"}

> worktree_list:
  [active]  auth-refactor → .worktrees/auth-refactor (wt/auth-refactor) task=1
  [kept]    ui-login      → .worktrees/ui-login      (wt/ui-login)     task=2

> worktree_events:
  1. worktree.create.before  — auth-refactor 创建开始
  2. worktree.create.after   — auth-refactor 创建完成
  3. worktree.create.before  — ui-login 创建开始
  4. worktree.create.after   — ui-login 创建完成
  5. worktree.keep           — ui-login 标记为保留 📌

events.jsonl 记录了完整的生命周期——每个 worktree 的创建、保留、删除都有时间戳和上下文。崩溃恢复时可以从事件流重建状态。

任务四:删除 Worktree + 自动完成任务

删除 worktree 并自动标记关联任务为完成:

s12 >> Remove worktree "auth-refactor" with complete_task=true,
       then list tasks/worktrees/events.

实际执行记录:

> worktree_remove: Removed worktree 'auth-refactor'

> task_list:
  [x] #1: Design the data schema owner=frank        ← 自动标记完成!
  [x] #2: Build the backend API layer owner=eve wt=ui-login

> worktree_list:
  [removed] auth-refactor (wt/auth-refactor) task=1
  [kept]    ui-login      (wt/ui-login)      task=2

> worktree_events:
  6. worktree.remove.before  — auth-refactor 删除开始
  7. task.completed          — Task #1 自动标记完成 ✅
  8. worktree.remove.after   — auth-refactor 完全清除 🗑️

一个 worktree_remove 触发了完整的收尾链: 拆除工作目录 → 标记任务完成 → 记录事件。这就是 Worktree 和 Task 双状态机联动的价值——不需要手动维护两边的一致性。

完整变更表(终极版)

组件之前(s11)之后(s12)
工作目录所有任务共享一个每个任务独立 worktree
任务状态仅 Task 状态机Task + Worktree 双状态机
文件隔离git worktree 级隔离
元信息管理tasks.jsontasks.json + index.json
生命周期日志隐式/无events.jsonl 显式事件流
崩溃恢复从 tasks.json 恢复tasks.json + index.json + events 交叉恢复
创建流程创建任务创建任务 + 创建 worktree + 绑定
收尾流程标记完成标记完成 + 拆除 worktree + 注销 + 清理分支
新增代码~180 行(WorktreeManager)
累计代码~650 行~830 行

系列总结:从 20 行到 830 行

12 课走完,让我们看看全局:

第一阶段(s01-s03):最小 Agent

  • while 循环 + 工具 + 规划
  • 一个 Agent 能干活了

第二阶段(s04-s06):能力扩展

  • 子 Agent + 技能 + 压缩
  • Agent 能应对复杂任务了

第三阶段(s07-s08):持久化与并发

  • 任务图 + 后台执行
  • Agent 能断点续跑、不阻塞了

第四阶段(s09-s12):多 Agent 系统

  • 团队 + 协议 + 自治 + 隔离
  • Agent 能组队、自治、互不干扰了

从 20 行到 830 行,代码量增加了约 40 倍。但核心循环——那个 while True 加 stop_reason 判断——一行不改

所有 830 行代码,都在做同一件事:让那 20 行循环跑得更好、更安全、更强大。

这就是 Agent Harness 工程的本质。不是发明新的 AI 能力,而是给已有的 AI 能力搭建基础设施——工具、规划、隔离、协作、持久化、恢复。循环是发动机,Harness 是整辆车。

12课全景:从while循环到完整Harness


这是《12课拆解Claude Code架构:从零掌握Agent Harness工程》系列的第 12 课,也是终极课。感谢你一路读到这里。

完整代码和交互式学习平台:github.com/shareAI-lab/learn-claude-code

如果这个系列对你有帮助,欢迎转发给你的技术团队。

系列目录

  • 第1课:用20行Python造出你的第一个AI Agent
  • 第2课:给Agent加工具 —— dispatch map模式详解
  • 第3课:TodoWrite —— 让Agent先想后做:规划系统
  • 第4课:Subagent —— 拆解大任务,上下文隔离
  • 第5课:按需加载领域知识——Skill机制
  • 第6课:无限对话——上下文压缩三层策略
  • 第7课:任务持久化——文件级DAG任务图
  • 第8课:后台执行——异步任务与通知队列
  • 第9课:Agent Teams——多Agent协作:团队与邮箱系统
  • 第10课:团队协议——状态机驱动的协商
  • 第11课:自治Agent——自组织任务认领
  • 第12课:终极隔离——Worktree并行执行(本文)

On this page