Harness实战:并行调度——拓扑排序让Subagent飞起来
串行的Subagent太慢?用拓扑排序把任务分层,同层并行执行,有依赖的自动等待。模型负责声明"做什么、什么依赖什么",代码负责"怎么做最快"。
写在前面
上一篇我们给 Agent 加了 Subagent——把子任务放到独立上下文执行,只返回摘要。
但有个明显的瓶颈:子任务是串行的。
模型一次调用一个 task,等它跑完拿到结果,再调用下一个。如果有三个互不相关的调研任务,每个跑 30 秒,串行要 90 秒。并行呢?30 秒。
问题:串行的浪费
想象一个任务:"分析这个项目的前端、后端和数据库技术栈,然后汇总写一份报告。"
串行执行:
task("分析前端技术栈") → 30s
task("分析后端技术栈") → 25s
task("分析数据库技术栈") → 20s
task("汇总写报告") → 15s(需要前三个结果)
───────────────────────────────
总耗时: 90s前三个任务互不依赖——它们各自读不同的文件、跑不同的命令。没有理由等一个完了再开始下一个。
而第四个任务确实依赖前三个的结果——你不能在拿到调研数据之前就开始写报告。
这是一个典型的 DAG(有向无环图)调度问题。
解决方案:拓扑排序 + 并行调度
核心思路:
- 模型一次提交所有任务,标明依赖关系
- 拓扑排序把任务分成若干层
- 同层任务用线程池并行执行
- 下一层任务自动获得上一层的结果作为上下文
Layer 0 (并行): [分析前端] [分析后端] [分析数据库]
│ │ │
└───────────┼────────────┘
▼
Layer 1 (串行): [汇总写报告]3 个并行 + 1 个串行 = 30 + 15 = 45 秒。省了一半。
实现
在 Subagent 基础上,增加两样东西。
1. parallel_tasks 工具定义
PARALLEL_TASKS_TOOL = {
"name": "parallel_tasks",
"description": (
"Run multiple subtasks with dependency-aware parallel scheduling. "
"Each task has an id, prompt, and optional depends_on list. "
"Tasks with no dependencies run in parallel. "
"Tasks with dependencies wait until all dependencies complete, "
"then receive their results as context. "
"Returns a map of {task_id: result}."
),
"input_schema": {
"type": "object",
"properties": {
"tasks": {
"type": "array",
"items": {
"type": "object",
"properties": {
"id": {"type": "string"},
"prompt": {"type": "string"},
"depends_on": {
"type": "array",
"items": {"type": "string"},
},
},
"required": ["id", "prompt"],
},
}
},
"required": ["tasks"],
},
}每个任务有三个字段:
| 字段 | 必填 | 说明 |
|---|---|---|
id | 是 | 唯一标识,其他任务通过它声明依赖 |
prompt | 是 | 子 Agent 的任务描述 |
depends_on | 否 | 依赖的任务 ID 列表 |
模型在一次 tool_use 里把所有任务连同依赖关系一起提交,调度器负责执行顺序。
2. 拓扑排序器
from collections import defaultdict
def topological_layers(tasks: list[dict]) -> list[list[dict]]:
"""将任务按依赖关系分层。同层任务无互相依赖,可以并行。"""
task_map = {t["id"]: t for t in tasks}
in_degree = {t["id"]: 0 for t in tasks}
dependents = defaultdict(list)
for t in tasks:
for dep in t.get("depends_on", []):
if dep not in task_map:
raise ValueError(
f"Task '{t['id']}' depends on unknown task '{dep}'"
)
in_degree[t["id"]] += 1
dependents[dep].append(t["id"])
layers = []
remaining = dict(in_degree)
while remaining:
layer = [tid for tid, deg in remaining.items() if deg == 0]
if not layer:
raise ValueError(
f"Circular dependency: {list(remaining.keys())}"
)
layers.append([task_map[tid] for tid in layer])
for tid in layer:
del remaining[tid]
for dependent in dependents[tid]:
if dependent in remaining:
remaining[dependent] -= 1
return layers这是 Kahn 算法的变体,按层输出:
- 计算每个节点的入度(被多少个任务依赖)
- 入度为 0 的节点组成第一层(没有前置依赖)
- 移除这些节点,更新剩余节点的入度
- 重复,直到所有节点都被分到某一层
- 如果某轮没有入度为 0 的节点 → 循环依赖,报错
3. 并行调度器
from concurrent.futures import ThreadPoolExecutor, as_completed
def run_parallel_tasks(tasks: list[dict]) -> str:
layers = topological_layers(tasks)
results = {} # task_id → result string
for i, layer in enumerate(layers):
with ThreadPoolExecutor(max_workers=len(layer)) as pool:
futures = {}
for task in layer:
# ★ 注入依赖任务的结果作为上下文
prompt = task["prompt"]
deps = task.get("depends_on", [])
if deps:
dep_context = "\n".join(
f"[Result of '{d}']: {results[d]}"
for d in deps
)
prompt = (
f"{prompt}\n\n"
f"Context from prerequisite tasks:\n"
f"{dep_context}"
)
futures[pool.submit(run_subagent, prompt)] = task["id"]
for future in as_completed(futures):
task_id = futures[future]
results[task_id] = future.result()
output_parts = [
f"[{tid}]: {result}" for tid, result in results.items()
]
return "\n\n".join(output_parts)三个关键设计:
依赖注入——有 depends_on 的任务,prompt 自动追加前置任务的结果。子 Agent 看到的不是"写一份报告",而是"写一份报告,以下是前置调研结果:……"。
每层一个线程池——同层任务真正并行。as_completed 让先完成的先收集,不等最慢的。
结果累积——results 字典跨层传递,下层任务可以引用上层任何任务的输出。
4. 注册到 dispatch map
TOOLS = BASE_TOOLS + [TASK_TOOL, PARALLEL_TASKS_TOOL]
TOOL_HANDLERS = {
**BASE_HANDLERS,
"task": lambda **kw: run_subagent(kw["prompt"]),
"parallel_tasks": lambda **kw: run_parallel_tasks(kw["tasks"]),
}还是 dispatch 模式——加新能力只加注册,agent_loop 一行不改。
运行效果
❯ /plan 分析这个项目的前端、后端和数据库技术栈,汇总写一份技术报告
⏳ Generating plan...
📋 Plan:
1. 分析前端技术栈
2. 分析后端技术栈
3. 分析数据库技术栈
4. 汇总三个分析结果,写技术报告
Execute? (y/n) y
⚡ Executing...
> parallel_tasks:
⚡ Parallel scheduler: 4 tasks, 2 layers
Layer 0: ['frontend', 'backend', 'database'] (parallel)
🔀 Subagent started: 分析前端技术栈...
🔀 Subagent started: 分析后端技术栈...
🔀 Subagent started: 分析数据库技术栈...
> bash: cat frontend/package.json | head -30
> bash: cat backend/pyproject.toml
> bash: cat docker-compose.yml | grep -A5 postgres
🔀 Subagent done (6 messages)
✓ database (1/4)
🔀 Subagent done (8 messages)
✓ backend (2/4)
🔀 Subagent done (10 messages)
✓ frontend (3/4)
Layer 0 done in 12.3s
Layer 1: ['report'] (parallel)
🔀 Subagent started: 汇总写技术报告...
🔀 Subagent done (4 messages)
✓ report (4/4)
Layer 1 done in 8.1s4 个任务分成 2 层:前 3 个并行(12.3s),第 4 个等前置完成后执行(8.1s),总耗时 ~20s。串行大约需要 40-50s。
核心机制解析
为什么用拓扑排序而不是"全部并行"?
因为有些任务确实依赖其他任务的输出。你不能在拿到调研结果之前就开始写报告。
拓扑排序给了最大化并行度的保证:如果两个任务没有直接或间接依赖,它们一定在同一层或更早的层,一定会并行。
循环依赖怎么办?
if not layer:
raise ValueError(f"Circular dependency: {list(remaining.keys())}")某轮里没有入度为 0 的节点 = 剩余节点形成了循环。直接报错,让模型重新规划。
为什么用 ThreadPoolExecutor 而不是 asyncio?
因为 run_subagent 内部的 client.messages.create 是同步阻塞调用。线程池让多个阻塞调用真正并行。
如果用 asyncio,需要把整个 API 调用链改成异步,复杂度翻倍但核心逻辑不变。教学代码选择最简单的正确方案。
子 Agent 怎么知道前置任务的结果?
if deps:
dep_context = "\n".join(
f"[Result of '{d}']: {results[d]}" for d in deps
)
prompt = f"{prompt}\n\nContext from prerequisite tasks:\n{dep_context}"直接拼到 prompt 里。子 Agent 不知道也不需要知道这些结果来自另一个 Agent——它只看到一段带上下文的任务描述。这就是 Subagent 隔离的好处:每个子 Agent 只需要关心自己的事。
task vs parallel_tasks:什么时候用哪个?
| 场景 | 工具 | 原因 |
|---|---|---|
| 单个独立子任务 | task | 简单直接 |
| 多个互不依赖的子任务 | parallel_tasks | 并行省时间 |
| 有依赖链的多个子任务 | parallel_tasks | 自动排序 + 最大化并行 |
| 需要前一步结果决定下一步做什么 | task x N | 父 Agent 需要中间结果来决策 |
parallel_tasks 适合提前知道所有任务和依赖关系的场景。如果需要根据中间结果动态决定下一步,还是用 task 让父 Agent 逐步决策。
两种并行风格的对比
Claude Code 的 Agent 工具支持在一个响应中发出多个 tool_use,运行时自然并行。但它没有显式的依赖声明——并行粒度由模型自己决定。
| 对比项 | 隐式并行(Claude Code 风格) | 显式 DAG(本文方案) |
|---|---|---|
| 并行声明 | 模型在同一响应中发多个 tool_use | 一个 tool_use 里声明完整任务图 |
| 依赖处理 | 模型自行决定先后顺序 | 调度器保证拓扑序 |
| 灵活性 | 高,可以临场决策 | 中,需要提前规划好任务图 |
| 适用场景 | 2-3 个简单并行任务 | 多任务、复杂依赖链 |
隐式并行更灵活,显式 DAG 更可靠。它们不矛盾——你完全可以让父 Agent 在一个响应里同时调用 task 和 parallel_tasks。
小结
- parallel_tasks 工具 — 一次提交多个任务和依赖关系
- 拓扑排序 — Kahn 算法按层分组,同层无依赖
- ThreadPoolExecutor — 同层任务真正并行
- 依赖注入 — 前置任务的结果自动拼到后续任务的 prompt
- 循环检测 — 发现循环依赖立即报错
本质是把调度逻辑从模型脑子里移到了代码里。模型负责说"做什么、什么依赖什么",代码负责"怎么做最快"。各司其职。
提示词的配套改动
光有工具不够,提示词也得跟上。改了两处。
1. System Prompt:告诉模型有并行能力
之前的 system prompt:
SYSTEM = "You are a coding agent at {WORKDIR}. Use tools to solve tasks. Act, don't explain."加了并行策略指引:
SYSTEM = f"""You are a coding agent at {WORKDIR}. Use tools to solve tasks. Act, don't explain.
When a task can be broken into subtasks, use `parallel_tasks` to run them concurrently.
Put ALL tasks (including dependent ones) in a SINGLE `parallel_tasks` call — the scheduler handles ordering.
Each task needs an id and prompt. Use depends_on to declare dependencies between tasks.
Do NOT split tasks across multiple `parallel_tasks` calls — dependent tasks would lose access to earlier results.
Only use sequential `task` calls when the next step depends on seeing the previous result to decide what to do."""两个关键指令:
- "Put ALL tasks in a SINGLE call" — 必须把所有任务(包括有依赖的)放在一次
parallel_tasks调用里。调度器会自动处理执行顺序,模型不需要自己分批。 - 最后一句 — 告诉模型什么时候不该用
parallel_tasks。不加这个限制,模型可能把所有任务都塞进去,包括需要根据中间结果临时决策的场景。
2. Plan 阶段:要求标注依赖关系
之前的 plan 提示词:
"Analyze the task and outline a step-by-step plan. Do NOT execute anything yet."改成:
"Analyze the task and outline a step-by-step plan. Do NOT execute anything yet. "
"For each step, note which other steps it depends on (if any). "
"Mark steps that can run in parallel."这样模型在规划阶段就会输出结构化的计划:
1. 分析前端技术栈(无依赖)
2. 分析后端技术栈(无依赖)
3. 分析数据库技术栈(无依赖)
4. 汇总写报告(依赖 1, 2, 3)
步骤 1-3 可并行执行。有了这个结构,执行阶段模型自然知道怎么填 depends_on 字段。
提示词设计原则
工具定义里的 description 告诉模型"这个工具能做什么",但不够。模型还需要知道:
- 什么时候该用它(有多个独立子任务时)
- 什么时候不该用它(需要看中间结果决策时)
- 怎么准备数据(在规划阶段就标注依赖)
这三层信息分别放在 system prompt、plan 提示词、和执行提示词里。
踩坑:模型把任务拆成多次调用
测试时遇到一个真实的 bug,值得单独记录。
给模型一个 4 层依赖的代码审计任务:
Step 1: 统计文件信息(无依赖)
Step 2: 提取 import 列表(无依赖)
Step 3a: 分析复杂度(依赖 Step 1)
Step 3b: 分析依赖用途(依赖 Step 2)
Step 4: 交叉对比(依赖 Step 3a + 3b)
Step 5: 生成报告(依赖 Step 4)模型输出了一个很好的计划。但执行阶段崩了:
> parallel_tasks: [step1_file_info, step2_imports] ← 第一次调用,只提交了 Layer 0
✓ step1_file_info
✓ step2_imports
> parallel_tasks: [step3a_complexity, step3b_deps] ← 第二次调用,提交 Layer 1
step3a_complexity depends_on: [step1_file_info]
ValueError: Task 'step3a_complexity' depends on unknown task 'step1_file_info'模型按层拆分,先调一次 parallel_tasks 跑 Layer 0,等结果出来再调一次跑 Layer 1。看起来很"聪明",但其实完全错了——第二次调用是一个独立的 run_parallel_tasks,它的 results = {} 是空的,根本找不到 step1_file_info 的结果。
根因:模型把自己当成了调度器。它认为"我应该先跑没有依赖的,拿到结果再跑有依赖的"。但这正是 topological_layers 调度器要做的事——模型不需要操心执行顺序。
修复:在 system prompt 里加了两句硬性约束:
Put ALL tasks (including dependent ones) in a SINGLE `parallel_tasks` call
— the scheduler handles ordering.
Do NOT split tasks across multiple `parallel_tasks` calls
— dependent tasks would lose access to earlier results.修复后,模型把所有 6 个任务放在一次调用里,调度器自动分成 4 层并行执行,完美运行。
教训:当你给模型一个带调度能力的工具时,必须明确告诉它不要自己调度。模型天然倾向于"替你想",如果不加约束,它会试图在 tool_use 层面手动编排执行顺序,反而破坏了代码层面已经实现好的调度逻辑。
这也是 prompt engineering 的一个通用原则:工具越智能,prompt 越要约束模型的"聪明"。