第7课:任务持久化——文件级DAG任务图
s03的TodoManager只是内存扁平清单——没有顺序、没有依赖、压缩后就没了。TaskManager用文件级DAG把任务图写入磁盘,支持依赖解锁和三状态流转,成为后续所有机制的协调骨架。
系列导读
这是**《12课拆解Claude Code架构》**系列的第 7 课。
前六课我们造了一个有完整工具链、规划系统、子 Agent、技能加载和上下文压缩的 Agent。从这一课开始,我们进入阶段三:持久化与并发。
第 7 课的格言:
"大目标要拆成小任务,排好序,记在磁盘上"
你会发现,第 3 课的 TodoManager 在简单场景下够用,但面对真实的复杂目标——有依赖、有并行、需要跨会话恢复——它完全撑不住。这一课,我们给 Agent 装上一个持久化的任务图(DAG)。
扁平清单的三个致命伤
回顾第 3 课的 TodoManager:
self.todos = [
{"content": "创建数据模型", "status": "completed"},
{"content": "写注册接口", "status": "in_progress"},
{"content": "写登录接口", "status": "pending"},
{"content": "写单元测试", "status": "pending"},
]它解决了"没有计划"的问题,但留下了三个致命伤:
致命伤一:没有依赖关系。 "写单元测试"应该等"注册接口"和"登录接口"都完成后再做,但扁平清单表达不了这种约束。模型可能在接口还没写完时就跳去写测试。
致命伤二:没有并行语义。 "写注册接口"和"写登录接口"其实互不依赖,可以同时推进(尤其在多 Agent 场景下)。但 TodoManager 的 WIP=1 约束强制了线性执行。
致命伤三:活在内存里。 一旦上下文被压缩(第 6 课),或者会话结束,整个待办列表就消失了。下次启动,Agent 不知道做到哪了。
扁平清单的世界:
☐ A
▶ B
☐ C
☐ D
☐ E
→ 哪个先做?C和D能并行吗?E要等谁?全靠猜。
任务图的世界:
A ──→ B ──→ E
↗
C ──→ D
→ B依赖A,D依赖C,E依赖B和D。一目了然。
任务图架构:文件即数据库
解决方案是把每个任务存成一个独立的 JSON 文件:
.tasks/
├── task_1.json {"id":1, "subject":"创建数据模型", "status":"completed", "blockedBy":[], "owner":null}
├── task_2.json {"id":2, "subject":"写注册接口", "status":"in_progress", "blockedBy":[1], "owner":"main"}
├── task_3.json {"id":3, "subject":"写登录接口", "status":"in_progress", "blockedBy":[1], "owner":"agent-2"}
├── task_4.json {"id":4, "subject":"写单元测试", "status":"pending", "blockedBy":[2,3], "owner":null}
└── task_5.json {"id":5, "subject":"部署到测试环境", "status":"pending", "blockedBy":[4], "owner":null}整个结构用 ASCII 画出来就是一个 DAG(有向无环图):
task_1 (completed)
├──→ task_2 (in_progress, owner=main)
│ ↘
└──→ task_3 (in_progress, owner=agent-2)
↘
task_4 (pending, blocked by 2,3)
↘
task_5 (pending, blocked by 4)每个任务文件有五个核心字段:
| 字段 | 类型 | 含义 |
|---|---|---|
id | int | 唯一标识 |
subject | string | 任务描述 |
status | enum | pending / in_progress / completed |
blockedBy | int[] | 依赖的任务 ID 列表 |
owner | string|null | 哪个 Agent 在做这个任务 |
任务图回答三个关键问题:
- 什么可以做? —— status=pending 且 blockedBy 里的任务全部 completed
- 什么被卡住? —— status=pending 且 blockedBy 还有未完成的
- 什么做完了? —— status=completed
核心拆解:TaskManager
机制一:文件级持久化
import json
import os
TASK_DIR = ".tasks"
class TaskManager:
def __init__(self, task_dir: str = TASK_DIR):
self.task_dir = task_dir
os.makedirs(task_dir, exist_ok=True)
self._next_id = self._compute_next_id()
def _task_path(self, task_id: int) -> str:
return os.path.join(self.task_dir, f"task_{task_id}.json")
def _read_task(self, task_id: int) -> dict | None:
path = self._task_path(task_id)
if not os.path.exists(path):
return None
with open(path, "r") as f:
return json.load(f)
def _write_task(self, task: dict):
path = self._task_path(task["id"])
with open(path, "w") as f:
json.dump(task, f, indent=2, ensure_ascii=False)
def _compute_next_id(self) -> int:
existing = self._all_task_ids()
return max(existing, default=0) + 1
def _all_task_ids(self) -> list[int]:
ids = []
for fname in os.listdir(self.task_dir):
if fname.startswith("task_") and fname.endswith(".json"):
ids.append(int(fname[5:-5]))
return sorted(ids)每个操作直接读写磁盘文件。没有内存缓存,没有数据库连接。文件系统就是数据库。
机制二:创建任务(task_create)
def create(self, subject: str, blocked_by: list[int] | None = None) -> dict:
"""创建一个新任务,可选指定依赖。"""
task = {
"id": self._next_id,
"subject": subject,
"status": "pending",
"blockedBy": blocked_by or [],
"owner": None,
}
# 验证依赖的任务确实存在
for dep_id in task["blockedBy"]:
if self._read_task(dep_id) is None:
return {"error": f"Dependency task {dep_id} does not exist"}
self._write_task(task)
self._next_id += 1
return task创建时只做一件关键校验:blockedBy 引用的任务必须存在。防止模型创建出指向不存在任务的依赖边,导致任务永远被卡住。
机制三:状态变更 + 依赖解锁(task_update)
def update(self, task_id: int, status: str, owner: str | None = None) -> dict:
"""更新任务状态。当任务完成时自动解锁后续任务。"""
task = self._read_task(task_id)
if task is None:
return {"error": f"Task {task_id} not found"}
# 不能把被阻塞的任务直接设为 in_progress
if status == "in_progress" and task["blockedBy"]:
unfinished = [
dep_id for dep_id in task["blockedBy"]
if self._read_task(dep_id)["status"] != "completed"
]
if unfinished:
return {"error": f"Task {task_id} is blocked by unfinished tasks: {unfinished}"}
task["status"] = status
task["owner"] = owner
self._write_task(task)
# ★ 核心:完成时自动清除下游任务的依赖
if status == "completed":
self._clear_dependency(task_id)
return task
def _clear_dependency(self, completed_id: int):
"""从所有下游任务的 blockedBy 中移除已完成的任务 ID。"""
for tid in self._all_task_ids():
downstream = self._read_task(tid)
if completed_id in downstream["blockedBy"]:
downstream["blockedBy"].remove(completed_id)
self._write_task(downstream)_clear_dependency 是整个系统的精华。 当 task_2 完成时,它扫描所有下游任务,从它们的 blockedBy 列表中移除 2。如果 task_4 的 blockedBy 原本是 [2, 3],现在变成 [3]——还被 task_3 卡着。等 task_3 也完成,blockedBy 变成 [],task_4 就自动变成可执行状态。
这是一个被动解锁机制——不需要调度器,不需要事件系统,只需要在完成时扫一遍文件。简单、可靠、易调试。
机制四:列表和查询(task_list / task_get)
def list_tasks(self, status_filter: str | None = None) -> list[dict]:
"""列出所有任务,可按状态过滤。"""
tasks = []
for tid in self._all_task_ids():
task = self._read_task(tid)
if status_filter is None or task["status"] == status_filter:
tasks.append(task)
return tasks
def get_task(self, task_id: int) -> dict:
"""获取单个任务的详情。"""
task = self._read_task(task_id)
if task is None:
return {"error": f"Task {task_id} not found"}
return task
def get_actionable(self) -> list[dict]:
"""获取所有可以立即执行的任务(pending + blockedBy 为空)。"""
return [
t for t in self.list_tasks()
if t["status"] == "pending" and not t["blockedBy"]
]
def render(self) -> str:
"""渲染任务图全景,供模型快速了解全局状态。"""
tasks = self.list_tasks()
if not tasks:
return "(no tasks)"
icons = {"pending": "☐", "in_progress": "▶", "completed": "✓"}
lines = []
for t in tasks:
icon = icons.get(t["status"], "?")
blocked = f" [blocked by {t['blockedBy']}]" if t["blockedBy"] else ""
owner = f" ({t['owner']})" if t["owner"] else ""
lines.append(f" {icon} #{t['id']} {t['subject']}{blocked}{owner}")
return "\n".join(lines)get_actionable() 是给调度层用的——后面第 8 课的后台执行和第 9 课的多 Agent 团队,都靠这个方法获取"下一步该做什么"。
四个工具:注册到 dispatch map
和之前的模式完全一致——新工具只需注册,循环不碰:
task_manager = TaskManager()
TASK_TOOLS = [
{
"name": "task_create",
"description": "Create a new task. Optionally specify blockedBy (list of task IDs) for dependencies.",
"input_schema": {
"type": "object",
"properties": {
"subject": {"type": "string", "description": "What the task is about"},
"blocked_by": {
"type": "array", "items": {"type": "integer"},
"description": "IDs of tasks that must complete before this one",
},
},
"required": ["subject"],
},
},
{
"name": "task_update",
"description": "Update a task's status. Set to in_progress when starting, completed when done.",
"input_schema": {
"type": "object",
"properties": {
"task_id": {"type": "integer"},
"status": {"type": "string", "enum": ["pending", "in_progress", "completed"]},
"owner": {"type": "string", "description": "Which agent owns this task"},
},
"required": ["task_id", "status"],
},
},
{
"name": "task_list",
"description": "List all tasks. Optionally filter by status.",
"input_schema": {
"type": "object",
"properties": {
"status_filter": {
"type": "string",
"enum": ["pending", "in_progress", "completed"],
},
},
},
},
{
"name": "task_get",
"description": "Get details of a specific task by ID.",
"input_schema": {
"type": "object",
"properties": {"task_id": {"type": "integer"}},
"required": ["task_id"],
},
},
]
# 注册到 dispatch map
TOOL_HANDLERS = {
"bash": run_bash,
"read": read_file,
"write": write_file,
"edit": edit_file,
"todo": lambda items: todo_manager.update(items),
"task": lambda prompt: run_subagent(prompt),
"task_create": lambda **kw: json.dumps(task_manager.create(**kw), ensure_ascii=False),
"task_update": lambda **kw: json.dumps(task_manager.update(**kw), ensure_ascii=False),
"task_list": lambda **kw: json.dumps(task_manager.list_tasks(**kw), ensure_ascii=False),
"task_get": lambda **kw: json.dumps(task_manager.get_task(**kw), ensure_ascii=False),
}从 6 个工具变成 10 个。Agent Loop 一行不改。
实际运行:任务图怎么转
给 Agent 一个复合任务:
"搭建一个博客系统:数据模型、CRUD 接口、单元测试、部署脚本"
Agent 的执行链路:
第1轮: [task_create] "创建数据模型" → task_1
第2轮: [task_create] "写 CRUD 接口" blocked_by=[1] → task_2
第3轮: [task_create] "写单元测试" blocked_by=[2] → task_3
第4轮: [task_create] "写部署脚本" blocked_by=[] → task_4
第5轮: [task_create] "集成验证" blocked_by=[3,4] → task_5
任务图建好了:
✓ #1 创建数据模型
▶ #2 写 CRUD 接口 [blocked by [1]]
☐ #3 写单元测试 [blocked by [2]]
☐ #4 写部署脚本 ← 和 2,3 并行!
☐ #5 集成验证 [blocked by [3,4]]
第6轮: [task_update] #1 → completed
→ _clear_dependency: task_2 的 blockedBy 从 [1] 变成 []
第7轮: [task_update] #2 → in_progress
第8轮: [bash] 创建 CRUD 接口文件...
第9轮: [task_update] #4 → in_progress ← 和 #2 并行推进!
第10轮: [bash] 创建部署脚本...
第11轮: [task_update] #2 → completed
→ _clear_dependency: task_3 的 blockedBy 从 [2] 变成 []
第12轮: [task_update] #4 → completed
→ _clear_dependency: task_5 的 blockedBy 从 [3,4] 变成 [3]
第13轮: [task_update] #3 → in_progress
第14轮: [bash] 写测试...
第15轮: [task_update] #3 → completed
→ _clear_dependency: task_5 的 blockedBy 从 [3] 变成 []
第16轮: [task_update] #5 → in_progress
第17轮: [bash] 运行集成验证...
第18轮: [task_update] #5 → completed注意第 9 轮:task_4(部署脚本)不依赖 task_2(CRUD 接口),所以 Agent 可以交叉推进。这在扁平清单模式下不可能——WIP=1 约束了线性执行。

洞见:两个关键设计决策
为什么用文件而不是数据库
SQLite 更快,PostgreSQL 更强——但这里刻意选择了文件系统。原因有三:
一、零依赖。 不需要安装任何数据库。任何有文件系统的环境都能跑。
二、可读性。 cat .tasks/task_3.json 就能看到任务状态。出了问题直接用文本编辑器改。调试成本几乎为零。
三、Git 友好。 .tasks/ 目录可以提交到 Git。任务图的变更历史自动被版本控制追踪。你能看到"谁在什么时候把 task_3 从 pending 改成 completed"。
在 Claude Code 的实际实现中,任务文件存储在工作目录的 .tasks/ 下。这不是偶然——它让任务图和代码库绑定在一起,而不是飘在某个全局数据库里。
这个 DAG 是后续所有机制的协调骨架
从这一课开始,任务图成为整个 Agent 系统的中央协调结构:
| 后续课程 | 如何使用任务图 |
|---|---|
| 第 8 课:后台执行 | 后台 Agent 从任务图取 actionable 任务,完成后自动解锁下游 |
| 第 9 课:多 Agent 团队 | 不同 Agent 通过 owner 字段认领任务,避免重复 |
| 第 10-11 课:协作机制 | Agent 之间通过任务图的状态变化传递进度信号 |
| 第 12 课:Worktree 隔离 | 每个 worktree 绑定一个任务 ID,隔离执行互不干扰 |
没有这个任务图,后面的课程全部无法实现。 它是从"单 Agent 串行"到"多 Agent 并发"的转折点。
五分钟跑起来
# 进入项目目录
cd learn-claude-code
# 启动第七课
python agents/s07_task_system.py任务一:创建带依赖的任务链
先让 Agent 创建三个任务,并设置顺序依赖:
s07 >> Create 3 tasks: "Setup project", "Write code", "Write tests".
Make them depend on each other in order.实际执行记录:
> task_create: {"id": 1, "subject": "Setup project", "status": "pending", "blockedBy": []}
> task_create: {"id": 2, "subject": "Write code", "status": "pending", "blockedBy": []}
> task_create: {"id": 3, "subject": "Write tests", "status": "pending", "blockedBy": []}
> task_update: {"id": 2, "blockedBy": [1]} ← 设置依赖:Write code 等 Setup project
> task_update: {"id": 3, "blockedBy": [2]} ← 设置依赖:Write tests 等 Write code注意 Agent 的策略:先创建全部任务,再逐个设置依赖关系。最终依赖链:Setup project → Write code → Write tests。
任务二:查看依赖图全景
退出后重新启动,验证持久化效果——任务还在磁盘上:
s07 >> List all tasks and show the dependency graph实际执行记录:
> task_list:
[ ] #1: Setup project
[ ] #2: Write code (blocked by: [1])
[ ] #3: Write tests (blocked by: [2])
> task_get: #1 → blockedBy: []
> task_get: #2 → blockedBy: [1]
> task_get: #3 → blockedBy: [2]Agent 用 task_list 获取全景,再逐个 task_get 获取详情,然后渲染了一张依赖图:
#1 Setup project (🟢 ready to start)
└──▶ #2 Write code (🔒 blocked by #1)
└──▶ #3 Write tests (🔒 blocked by #2)关键验证: 重启后任务图完整恢复。这是扁平清单做不到的。
任务三:完成任务,观察自动解锁
完成 task_1,观察 task_2 从阻塞变为可执行:
s07 >> Complete task 1 and then list tasks to see task 2 unblocked实际执行记录:
> task_list:
[ ] #1: Setup project
[ ] #2: Write code (blocked by: [1])
[ ] #3: Write tests (blocked by: [2])
> task_update: {"id": 1, "status": "completed"} ← 完成 task_1
→ _clear_dependency 触发:task_2 的 blockedBy 从 [1] 变成 []
> task_list:
[x] #1: Setup project
[ ] #2: Write code ← 解锁了!blockedBy 已清空
[ ] #3: Write tests (blocked by: [2])_clear_dependency 精确生效——task_1 完成后,task_2 自动解锁,task_3 仍被 task_2 阻塞。
任务四:创建带并行分支的 DAG
更复杂的场景——有并行分支和汇聚节点:
s07 >> Create a task board for refactoring: parse -> transform -> emit -> test,
where transform and emit can run in parallel after parse实际执行记录:
> task_create: {"id": 4, "subject": "Parse"}
> task_create: {"id": 5, "subject": "Transform"}
> task_create: {"id": 6, "subject": "Emit"}
> task_create: {"id": 7, "subject": "Test"}
> task_update: {"id": 5, "blockedBy": [4]} ← Transform 等 Parse
> task_update: {"id": 6, "blockedBy": [4]} ← Emit 也等 Parse(并行分支)
> task_update: {"id": 7, "blockedBy": [5, 6]} ← Test 等 Transform 和 Emit 都完成
> task_list:
[x] #1: Setup project
[ ] #2: Write code
[ ] #3: Write tests (blocked by: [2])
[ ] #4: Parse
[ ] #5: Transform (blocked by: [4])
[ ] #6: Emit (blocked by: [4])
[ ] #7: Test (blocked by: [5, 6])Agent 渲染出来的 DAG:
┌──────────────┐
│ #4: Parse │
└──────┬───────┘
│
┌───────┴───────┐
▼ ▼
┌──────────────┐ ┌──────────────┐
│ #5: Transform│ │ #6: Emit │ ← 并行分支
└──────┬───────┘ └──────┬───────┘
│ │
└───────┬───────┘
▼
┌──────────────┐
│ #7: Test │ ← 汇聚节点
└──────────────┘这就是 DAG 相比扁平清单的核心优势——Transform 和 Emit 可以并行,Test 必须等两者都完成。blockedBy: [5, 6] 天然表达了这种汇聚语义。

变更总结
| 组件 | s03/s06(之前) | s07(本课) |
|---|---|---|
| 任务存储 | 内存列表 | 磁盘 JSON 文件(.tasks/) |
| 依赖关系 | 无 | blockedBy[] 边 |
| 状态模型 | pending/in_progress/completed | 相同三状态,但加了依赖约束 |
| 并行语义 | WIP=1 强制串行 | 无依赖的任务可并行 |
| 持久化 | 内存丢失即消失 | 文件级持久化,跨会话存活 |
| 任务归属 | 无 | owner 字段标记执行者 |
| 依赖解锁 | 无 | _clear_dependency 自动解锁下游 |
| 工具数 | 6 | 10(+task_create/update/list/get) |
| Agent Loop | 不变 | 不变 |
核心变更:从扁平清单到带依赖的任务图,从内存到磁盘,从单人串行到多人可并行。Agent Loop 从第 1 课到第 7 课,依然一行没改。
下一课预告
第 7 课让 Agent 有了持久化的任务图,但所有任务还是在前台串行执行——Agent 做一个任务时,用户只能等着。
第 8 课:Background Tasks —— 后台执行。 把耗时任务丢到后台,Agent 继续处理下一个,后台完成后自动更新任务图。
# 预告:s08 的后台执行
def run_in_background(task_id: int):
"""在独立线程中执行任务,完成后更新任务图。"""
task = task_manager.get_task(task_id)
thread = threading.Thread(
target=_execute_and_complete,
args=(task_id, task["subject"]),
)
thread.start()
return f"Task #{task_id} is now running in background."前台规划,后台执行。Agent 第一次有了"同时做多件事"的能力。
这是《12课拆解Claude Code架构:从零掌握Agent Harness工程》系列的第 7 课。关注Claw开发者,不错过后续更新。
完整代码和交互式学习平台:github.com/shareAI-lab/learn-claude-code
如果这篇文章对你有帮助,欢迎转发给你的技术团队。
系列目录
- 第1课:用20行Python造出你的第一个AI Agent
- 第2课:给Agent加工具 —— dispatch map模式详解
- 第3课:TodoWrite —— 让Agent先想后做:规划系统
- 第4课:Subagent —— 拆解大任务,上下文隔离
- 第5课:按需加载领域知识——Skill机制
- 第6课:无限对话——上下文压缩三层策略
- 第7课:任务持久化——文件级DAG任务图(本文)
- 第8课:后台执行——异步任务与通知队列
- 第9课:Agent Teams——多Agent协作:团队与邮箱系统
- 第10课:团队协议——状态机驱动的协商
- 第11课:自治Agent——自组织任务认领
- 第12课:终极隔离——Worktree并行执行