HuanCode Docs

第7课:任务持久化——文件级DAG任务图

s03的TodoManager只是内存扁平清单——没有顺序、没有依赖、压缩后就没了。TaskManager用文件级DAG把任务图写入磁盘,支持依赖解锁和三状态流转,成为后续所有机制的协调骨架。

系列导读

这是**《12课拆解Claude Code架构》**系列的第 7 课。

前六课我们造了一个有完整工具链、规划系统、子 Agent、技能加载和上下文压缩的 Agent。从这一课开始,我们进入阶段三:持久化与并发

第 7 课的格言:

"大目标要拆成小任务,排好序,记在磁盘上"

你会发现,第 3 课的 TodoManager 在简单场景下够用,但面对真实的复杂目标——有依赖、有并行、需要跨会话恢复——它完全撑不住。这一课,我们给 Agent 装上一个持久化的任务图(DAG)


扁平清单的三个致命伤

回顾第 3 课的 TodoManager:

self.todos = [
    {"content": "创建数据模型", "status": "completed"},
    {"content": "写注册接口",   "status": "in_progress"},
    {"content": "写登录接口",   "status": "pending"},
    {"content": "写单元测试",   "status": "pending"},
]

它解决了"没有计划"的问题,但留下了三个致命伤:

致命伤一:没有依赖关系。 "写单元测试"应该等"注册接口"和"登录接口"都完成后再做,但扁平清单表达不了这种约束。模型可能在接口还没写完时就跳去写测试。

致命伤二:没有并行语义。 "写注册接口"和"写登录接口"其实互不依赖,可以同时推进(尤其在多 Agent 场景下)。但 TodoManager 的 WIP=1 约束强制了线性执行。

致命伤三:活在内存里。 一旦上下文被压缩(第 6 课),或者会话结束,整个待办列表就消失了。下次启动,Agent 不知道做到哪了。

扁平清单的世界:
  ☐ A
  ▶ B
  ☐ C
  ☐ D
  ☐ E
  → 哪个先做?C和D能并行吗?E要等谁?全靠猜。

任务图的世界:
  A ──→ B ──→ E

  C ──→ D
  → B依赖A,D依赖C,E依赖B和D。一目了然。

扁平清单 vs DAG任务图


任务图架构:文件即数据库

解决方案是把每个任务存成一个独立的 JSON 文件:

.tasks/
├── task_1.json    {"id":1, "subject":"创建数据模型",  "status":"completed",   "blockedBy":[], "owner":null}
├── task_2.json    {"id":2, "subject":"写注册接口",    "status":"in_progress", "blockedBy":[1], "owner":"main"}
├── task_3.json    {"id":3, "subject":"写登录接口",    "status":"in_progress", "blockedBy":[1], "owner":"agent-2"}
├── task_4.json    {"id":4, "subject":"写单元测试",    "status":"pending",     "blockedBy":[2,3], "owner":null}
└── task_5.json    {"id":5, "subject":"部署到测试环境", "status":"pending",     "blockedBy":[4], "owner":null}

整个结构用 ASCII 画出来就是一个 DAG(有向无环图):

task_1 (completed)
  ├──→ task_2 (in_progress, owner=main)
  │        ↘
  └──→ task_3 (in_progress, owner=agent-2)

           task_4 (pending, blocked by 2,3)

           task_5 (pending, blocked by 4)

每个任务文件有五个核心字段:

字段类型含义
idint唯一标识
subjectstring任务描述
statusenumpending / in_progress / completed
blockedByint[]依赖的任务 ID 列表
ownerstring|null哪个 Agent 在做这个任务

任务图回答三个关键问题:

  1. 什么可以做? —— status=pending 且 blockedBy 里的任务全部 completed
  2. 什么被卡住? —— status=pending 且 blockedBy 还有未完成的
  3. 什么做完了? —— status=completed

核心拆解:TaskManager

机制一:文件级持久化

import json
import os

TASK_DIR = ".tasks"

class TaskManager:
    def __init__(self, task_dir: str = TASK_DIR):
        self.task_dir = task_dir
        os.makedirs(task_dir, exist_ok=True)
        self._next_id = self._compute_next_id()

    def _task_path(self, task_id: int) -> str:
        return os.path.join(self.task_dir, f"task_{task_id}.json")

    def _read_task(self, task_id: int) -> dict | None:
        path = self._task_path(task_id)
        if not os.path.exists(path):
            return None
        with open(path, "r") as f:
            return json.load(f)

    def _write_task(self, task: dict):
        path = self._task_path(task["id"])
        with open(path, "w") as f:
            json.dump(task, f, indent=2, ensure_ascii=False)

    def _compute_next_id(self) -> int:
        existing = self._all_task_ids()
        return max(existing, default=0) + 1

    def _all_task_ids(self) -> list[int]:
        ids = []
        for fname in os.listdir(self.task_dir):
            if fname.startswith("task_") and fname.endswith(".json"):
                ids.append(int(fname[5:-5]))
        return sorted(ids)

每个操作直接读写磁盘文件。没有内存缓存,没有数据库连接。文件系统就是数据库。

机制二:创建任务(task_create)

def create(self, subject: str, blocked_by: list[int] | None = None) -> dict:
    """创建一个新任务,可选指定依赖。"""
    task = {
        "id": self._next_id,
        "subject": subject,
        "status": "pending",
        "blockedBy": blocked_by or [],
        "owner": None,
    }
    # 验证依赖的任务确实存在
    for dep_id in task["blockedBy"]:
        if self._read_task(dep_id) is None:
            return {"error": f"Dependency task {dep_id} does not exist"}

    self._write_task(task)
    self._next_id += 1
    return task

创建时只做一件关键校验:blockedBy 引用的任务必须存在。防止模型创建出指向不存在任务的依赖边,导致任务永远被卡住。

机制三:状态变更 + 依赖解锁(task_update)

def update(self, task_id: int, status: str, owner: str | None = None) -> dict:
    """更新任务状态。当任务完成时自动解锁后续任务。"""
    task = self._read_task(task_id)
    if task is None:
        return {"error": f"Task {task_id} not found"}

    # 不能把被阻塞的任务直接设为 in_progress
    if status == "in_progress" and task["blockedBy"]:
        unfinished = [
            dep_id for dep_id in task["blockedBy"]
            if self._read_task(dep_id)["status"] != "completed"
        ]
        if unfinished:
            return {"error": f"Task {task_id} is blocked by unfinished tasks: {unfinished}"}

    task["status"] = status
    task["owner"] = owner
    self._write_task(task)

    # ★ 核心:完成时自动清除下游任务的依赖
    if status == "completed":
        self._clear_dependency(task_id)

    return task

def _clear_dependency(self, completed_id: int):
    """从所有下游任务的 blockedBy 中移除已完成的任务 ID。"""
    for tid in self._all_task_ids():
        downstream = self._read_task(tid)
        if completed_id in downstream["blockedBy"]:
            downstream["blockedBy"].remove(completed_id)
            self._write_task(downstream)

_clear_dependency 是整个系统的精华。 当 task_2 完成时,它扫描所有下游任务,从它们的 blockedBy 列表中移除 2。如果 task_4 的 blockedBy 原本是 [2, 3],现在变成 [3]——还被 task_3 卡着。等 task_3 也完成,blockedBy 变成 [],task_4 就自动变成可执行状态。

这是一个被动解锁机制——不需要调度器,不需要事件系统,只需要在完成时扫一遍文件。简单、可靠、易调试。

机制四:列表和查询(task_list / task_get)

def list_tasks(self, status_filter: str | None = None) -> list[dict]:
    """列出所有任务,可按状态过滤。"""
    tasks = []
    for tid in self._all_task_ids():
        task = self._read_task(tid)
        if status_filter is None or task["status"] == status_filter:
            tasks.append(task)
    return tasks

def get_task(self, task_id: int) -> dict:
    """获取单个任务的详情。"""
    task = self._read_task(task_id)
    if task is None:
        return {"error": f"Task {task_id} not found"}
    return task

def get_actionable(self) -> list[dict]:
    """获取所有可以立即执行的任务(pending + blockedBy 为空)。"""
    return [
        t for t in self.list_tasks()
        if t["status"] == "pending" and not t["blockedBy"]
    ]

def render(self) -> str:
    """渲染任务图全景,供模型快速了解全局状态。"""
    tasks = self.list_tasks()
    if not tasks:
        return "(no tasks)"
    icons = {"pending": "☐", "in_progress": "▶", "completed": "✓"}
    lines = []
    for t in tasks:
        icon = icons.get(t["status"], "?")
        blocked = f" [blocked by {t['blockedBy']}]" if t["blockedBy"] else ""
        owner = f" ({t['owner']})" if t["owner"] else ""
        lines.append(f"  {icon} #{t['id']} {t['subject']}{blocked}{owner}")
    return "\n".join(lines)

get_actionable() 是给调度层用的——后面第 8 课的后台执行和第 9 课的多 Agent 团队,都靠这个方法获取"下一步该做什么"。


四个工具:注册到 dispatch map

和之前的模式完全一致——新工具只需注册,循环不碰:

task_manager = TaskManager()

TASK_TOOLS = [
    {
        "name": "task_create",
        "description": "Create a new task. Optionally specify blockedBy (list of task IDs) for dependencies.",
        "input_schema": {
            "type": "object",
            "properties": {
                "subject": {"type": "string", "description": "What the task is about"},
                "blocked_by": {
                    "type": "array", "items": {"type": "integer"},
                    "description": "IDs of tasks that must complete before this one",
                },
            },
            "required": ["subject"],
        },
    },
    {
        "name": "task_update",
        "description": "Update a task's status. Set to in_progress when starting, completed when done.",
        "input_schema": {
            "type": "object",
            "properties": {
                "task_id": {"type": "integer"},
                "status": {"type": "string", "enum": ["pending", "in_progress", "completed"]},
                "owner": {"type": "string", "description": "Which agent owns this task"},
            },
            "required": ["task_id", "status"],
        },
    },
    {
        "name": "task_list",
        "description": "List all tasks. Optionally filter by status.",
        "input_schema": {
            "type": "object",
            "properties": {
                "status_filter": {
                    "type": "string",
                    "enum": ["pending", "in_progress", "completed"],
                },
            },
        },
    },
    {
        "name": "task_get",
        "description": "Get details of a specific task by ID.",
        "input_schema": {
            "type": "object",
            "properties": {"task_id": {"type": "integer"}},
            "required": ["task_id"],
        },
    },
]

# 注册到 dispatch map
TOOL_HANDLERS = {
    "bash":         run_bash,
    "read":         read_file,
    "write":        write_file,
    "edit":         edit_file,
    "todo":         lambda items: todo_manager.update(items),
    "task":         lambda prompt: run_subagent(prompt),
    "task_create":  lambda **kw: json.dumps(task_manager.create(**kw), ensure_ascii=False),
    "task_update":  lambda **kw: json.dumps(task_manager.update(**kw), ensure_ascii=False),
    "task_list":    lambda **kw: json.dumps(task_manager.list_tasks(**kw), ensure_ascii=False),
    "task_get":     lambda **kw: json.dumps(task_manager.get_task(**kw), ensure_ascii=False),
}

从 6 个工具变成 10 个。Agent Loop 一行不改


实际运行:任务图怎么转

给 Agent 一个复合任务:

"搭建一个博客系统:数据模型、CRUD 接口、单元测试、部署脚本"

Agent 的执行链路:

第1轮: [task_create] "创建数据模型"                     → task_1
第2轮: [task_create] "写 CRUD 接口" blocked_by=[1]      → task_2
第3轮: [task_create] "写单元测试"   blocked_by=[2]      → task_3
第4轮: [task_create] "写部署脚本"   blocked_by=[]       → task_4
第5轮: [task_create] "集成验证"     blocked_by=[3,4]    → task_5

任务图建好了:
  ✓ #1 创建数据模型
  ▶ #2 写 CRUD 接口 [blocked by [1]]
  ☐ #3 写单元测试   [blocked by [2]]
  ☐ #4 写部署脚本                     ← 和 2,3 并行!
  ☐ #5 集成验证     [blocked by [3,4]]

第6轮:  [task_update] #1 → completed
        → _clear_dependency: task_2 的 blockedBy 从 [1] 变成 []
第7轮:  [task_update] #2 → in_progress
第8轮:  [bash] 创建 CRUD 接口文件...
第9轮:  [task_update] #4 → in_progress   ← 和 #2 并行推进!
第10轮: [bash] 创建部署脚本...
第11轮: [task_update] #2 → completed
        → _clear_dependency: task_3 的 blockedBy 从 [2] 变成 []
第12轮: [task_update] #4 → completed
        → _clear_dependency: task_5 的 blockedBy 从 [3,4] 变成 [3]
第13轮: [task_update] #3 → in_progress
第14轮: [bash] 写测试...
第15轮: [task_update] #3 → completed
        → _clear_dependency: task_5 的 blockedBy 从 [3] 变成 []
第16轮: [task_update] #5 → in_progress
第17轮: [bash] 运行集成验证...
第18轮: [task_update] #5 → completed

注意第 9 轮:task_4(部署脚本)不依赖 task_2(CRUD 接口),所以 Agent 可以交叉推进。这在扁平清单模式下不可能——WIP=1 约束了线性执行。

任务图的依赖解锁流程


洞见:两个关键设计决策

为什么用文件而不是数据库

SQLite 更快,PostgreSQL 更强——但这里刻意选择了文件系统。原因有三:

一、零依赖。 不需要安装任何数据库。任何有文件系统的环境都能跑。

二、可读性。 cat .tasks/task_3.json 就能看到任务状态。出了问题直接用文本编辑器改。调试成本几乎为零。

三、Git 友好。 .tasks/ 目录可以提交到 Git。任务图的变更历史自动被版本控制追踪。你能看到"谁在什么时候把 task_3 从 pending 改成 completed"。

在 Claude Code 的实际实现中,任务文件存储在工作目录的 .tasks/ 下。这不是偶然——它让任务图和代码库绑定在一起,而不是飘在某个全局数据库里。

这个 DAG 是后续所有机制的协调骨架

从这一课开始,任务图成为整个 Agent 系统的中央协调结构

后续课程如何使用任务图
第 8 课:后台执行后台 Agent 从任务图取 actionable 任务,完成后自动解锁下游
第 9 课:多 Agent 团队不同 Agent 通过 owner 字段认领任务,避免重复
第 10-11 课:协作机制Agent 之间通过任务图的状态变化传递进度信号
第 12 课:Worktree 隔离每个 worktree 绑定一个任务 ID,隔离执行互不干扰

没有这个任务图,后面的课程全部无法实现。 它是从"单 Agent 串行"到"多 Agent 并发"的转折点。


五分钟跑起来

# 进入项目目录
cd learn-claude-code

# 启动第七课
python agents/s07_task_system.py

任务一:创建带依赖的任务链

先让 Agent 创建三个任务,并设置顺序依赖:

s07 >> Create 3 tasks: "Setup project", "Write code", "Write tests".
       Make them depend on each other in order.

实际执行记录:

> task_create: {"id": 1, "subject": "Setup project", "status": "pending", "blockedBy": []}
> task_create: {"id": 2, "subject": "Write code", "status": "pending", "blockedBy": []}
> task_create: {"id": 3, "subject": "Write tests", "status": "pending", "blockedBy": []}
> task_update: {"id": 2, "blockedBy": [1]}    ← 设置依赖:Write code 等 Setup project
> task_update: {"id": 3, "blockedBy": [2]}    ← 设置依赖:Write tests 等 Write code

注意 Agent 的策略:先创建全部任务,再逐个设置依赖关系。最终依赖链:Setup project → Write code → Write tests

任务二:查看依赖图全景

退出后重新启动,验证持久化效果——任务还在磁盘上:

s07 >> List all tasks and show the dependency graph

实际执行记录:

> task_list:
  [ ] #1: Setup project
  [ ] #2: Write code (blocked by: [1])
  [ ] #3: Write tests (blocked by: [2])

> task_get: #1 → blockedBy: []
> task_get: #2 → blockedBy: [1]
> task_get: #3 → blockedBy: [2]

Agent 用 task_list 获取全景,再逐个 task_get 获取详情,然后渲染了一张依赖图:

#1 Setup project       (🟢 ready to start)
 └──▶ #2 Write code    (🔒 blocked by #1)
       └──▶ #3 Write tests   (🔒 blocked by #2)

关键验证: 重启后任务图完整恢复。这是扁平清单做不到的。

任务三:完成任务,观察自动解锁

完成 task_1,观察 task_2 从阻塞变为可执行:

s07 >> Complete task 1 and then list tasks to see task 2 unblocked

实际执行记录:

> task_list:
  [ ] #1: Setup project
  [ ] #2: Write code (blocked by: [1])
  [ ] #3: Write tests (blocked by: [2])

> task_update: {"id": 1, "status": "completed"}    ← 完成 task_1
  → _clear_dependency 触发:task_2 的 blockedBy 从 [1] 变成 []

> task_list:
  [x] #1: Setup project
  [ ] #2: Write code                    ← 解锁了!blockedBy 已清空
  [ ] #3: Write tests (blocked by: [2])

_clear_dependency 精确生效——task_1 完成后,task_2 自动解锁,task_3 仍被 task_2 阻塞。

任务四:创建带并行分支的 DAG

更复杂的场景——有并行分支和汇聚节点:

s07 >> Create a task board for refactoring: parse -> transform -> emit -> test,
       where transform and emit can run in parallel after parse

实际执行记录:

> task_create: {"id": 4, "subject": "Parse"}
> task_create: {"id": 5, "subject": "Transform"}
> task_create: {"id": 6, "subject": "Emit"}
> task_create: {"id": 7, "subject": "Test"}
> task_update: {"id": 5, "blockedBy": [4]}        ← Transform 等 Parse
> task_update: {"id": 6, "blockedBy": [4]}        ← Emit 也等 Parse(并行分支)
> task_update: {"id": 7, "blockedBy": [5, 6]}     ← Test 等 Transform 和 Emit 都完成

> task_list:
  [x] #1: Setup project
  [ ] #2: Write code
  [ ] #3: Write tests (blocked by: [2])
  [ ] #4: Parse
  [ ] #5: Transform (blocked by: [4])
  [ ] #6: Emit (blocked by: [4])
  [ ] #7: Test (blocked by: [5, 6])

Agent 渲染出来的 DAG:

        ┌──────────────┐
        │  #4: Parse    │
        └──────┬───────┘

       ┌───────┴───────┐
       ▼               ▼
┌──────────────┐ ┌──────────────┐
│ #5: Transform│ │  #6: Emit    │  ← 并行分支
└──────┬───────┘ └──────┬───────┘
       │               │
       └───────┬───────┘

        ┌──────────────┐
        │  #7: Test     │  ← 汇聚节点
        └──────────────┘

这就是 DAG 相比扁平清单的核心优势——Transform 和 Emit 可以并行,Test 必须等两者都完成。blockedBy: [5, 6] 天然表达了这种汇聚语义。

磁盘上的任务文件


变更总结

组件s03/s06(之前)s07(本课)
任务存储内存列表磁盘 JSON 文件(.tasks/)
依赖关系blockedBy[]
状态模型pending/in_progress/completed相同三状态,但加了依赖约束
并行语义WIP=1 强制串行无依赖的任务可并行
持久化内存丢失即消失文件级持久化,跨会话存活
任务归属owner 字段标记执行者
依赖解锁_clear_dependency 自动解锁下游
工具数610(+task_create/update/list/get)
Agent Loop不变不变

核心变更:从扁平清单到带依赖的任务图,从内存到磁盘,从单人串行到多人可并行。Agent Loop 从第 1 课到第 7 课,依然一行没改。


下一课预告

第 7 课让 Agent 有了持久化的任务图,但所有任务还是在前台串行执行——Agent 做一个任务时,用户只能等着。

第 8 课:Background Tasks —— 后台执行。 把耗时任务丢到后台,Agent 继续处理下一个,后台完成后自动更新任务图。

# 预告:s08 的后台执行
def run_in_background(task_id: int):
    """在独立线程中执行任务,完成后更新任务图。"""
    task = task_manager.get_task(task_id)
    thread = threading.Thread(
        target=_execute_and_complete,
        args=(task_id, task["subject"]),
    )
    thread.start()
    return f"Task #{task_id} is now running in background."

前台规划,后台执行。Agent 第一次有了"同时做多件事"的能力。


这是《12课拆解Claude Code架构:从零掌握Agent Harness工程》系列的第 7 课。关注Claw开发者,不错过后续更新。

完整代码和交互式学习平台:github.com/shareAI-lab/learn-claude-code

如果这篇文章对你有帮助,欢迎转发给你的技术团队。

系列目录

  • 第1课:用20行Python造出你的第一个AI Agent
  • 第2课:给Agent加工具 —— dispatch map模式详解
  • 第3课:TodoWrite —— 让Agent先想后做:规划系统
  • 第4课:Subagent —— 拆解大任务,上下文隔离
  • 第5课:按需加载领域知识——Skill机制
  • 第6课:无限对话——上下文压缩三层策略
  • 第7课:任务持久化——文件级DAG任务图(本文)
  • 第8课:后台执行——异步任务与通知队列
  • 第9课:Agent Teams——多Agent协作:团队与邮箱系统
  • 第10课:团队协议——状态机驱动的协商
  • 第11课:自治Agent——自组织任务认领
  • 第12课:终极隔离——Worktree并行执行

On this page