HuanCode Docs

Harness实战:并行调度——拓扑排序让Subagent飞起来

串行的Subagent太慢?用拓扑排序把任务分层,同层并行执行,有依赖的自动等待。模型负责声明"做什么、什么依赖什么",代码负责"怎么做最快"。

写在前面

上一篇我们给 Agent 加了 Subagent——把子任务放到独立上下文执行,只返回摘要。

但有个明显的瓶颈:子任务是串行的。

模型一次调用一个 task,等它跑完拿到结果,再调用下一个。如果有三个互不相关的调研任务,每个跑 30 秒,串行要 90 秒。并行呢?30 秒。


问题:串行的浪费

想象一个任务:"分析这个项目的前端、后端和数据库技术栈,然后汇总写一份报告。"

串行执行:

task("分析前端技术栈")     → 30s
task("分析后端技术栈")     → 25s
task("分析数据库技术栈")   → 20s
task("汇总写报告")         → 15s(需要前三个结果)
───────────────────────────────
总耗时: 90s

前三个任务互不依赖——它们各自读不同的文件、跑不同的命令。没有理由等一个完了再开始下一个。

而第四个任务确实依赖前三个的结果——你不能在拿到调研数据之前就开始写报告。

这是一个典型的 DAG(有向无环图)调度问题。


解决方案:拓扑排序 + 并行调度

核心思路:

  1. 模型一次提交所有任务,标明依赖关系
  2. 拓扑排序把任务分成若干层
  3. 同层任务用线程池并行执行
  4. 下一层任务自动获得上一层的结果作为上下文
Layer 0 (并行):  [分析前端]  [分析后端]  [分析数据库]
                      │           │            │
                      └───────────┼────────────┘

Layer 1 (串行):          [汇总写报告]

3 个并行 + 1 个串行 = 30 + 15 = 45 秒。省了一半。


实现

在 Subagent 基础上,增加两样东西。

1. parallel_tasks 工具定义

PARALLEL_TASKS_TOOL = {
    "name": "parallel_tasks",
    "description": (
        "Run multiple subtasks with dependency-aware parallel scheduling. "
        "Each task has an id, prompt, and optional depends_on list. "
        "Tasks with no dependencies run in parallel. "
        "Tasks with dependencies wait until all dependencies complete, "
        "then receive their results as context. "
        "Returns a map of {task_id: result}."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "tasks": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "id": {"type": "string"},
                        "prompt": {"type": "string"},
                        "depends_on": {
                            "type": "array",
                            "items": {"type": "string"},
                        },
                    },
                    "required": ["id", "prompt"],
                },
            }
        },
        "required": ["tasks"],
    },
}

每个任务有三个字段:

字段必填说明
id唯一标识,其他任务通过它声明依赖
prompt子 Agent 的任务描述
depends_on依赖的任务 ID 列表

模型在一次 tool_use 里把所有任务连同依赖关系一起提交,调度器负责执行顺序。

2. 拓扑排序器

from collections import defaultdict

def topological_layers(tasks: list[dict]) -> list[list[dict]]:
    """将任务按依赖关系分层。同层任务无互相依赖,可以并行。"""
    task_map = {t["id"]: t for t in tasks}
    in_degree = {t["id"]: 0 for t in tasks}
    dependents = defaultdict(list)

    for t in tasks:
        for dep in t.get("depends_on", []):
            if dep not in task_map:
                raise ValueError(
                    f"Task '{t['id']}' depends on unknown task '{dep}'"
                )
            in_degree[t["id"]] += 1
            dependents[dep].append(t["id"])

    layers = []
    remaining = dict(in_degree)

    while remaining:
        layer = [tid for tid, deg in remaining.items() if deg == 0]
        if not layer:
            raise ValueError(
                f"Circular dependency: {list(remaining.keys())}"
            )
        layers.append([task_map[tid] for tid in layer])
        for tid in layer:
            del remaining[tid]
            for dependent in dependents[tid]:
                if dependent in remaining:
                    remaining[dependent] -= 1

    return layers

这是 Kahn 算法的变体,按层输出:

  1. 计算每个节点的入度(被多少个任务依赖)
  2. 入度为 0 的节点组成第一层(没有前置依赖)
  3. 移除这些节点,更新剩余节点的入度
  4. 重复,直到所有节点都被分到某一层
  5. 如果某轮没有入度为 0 的节点 → 循环依赖,报错

3. 并行调度器

from concurrent.futures import ThreadPoolExecutor, as_completed

def run_parallel_tasks(tasks: list[dict]) -> str:
    layers = topological_layers(tasks)
    results = {}  # task_id → result string

    for i, layer in enumerate(layers):
        with ThreadPoolExecutor(max_workers=len(layer)) as pool:
            futures = {}
            for task in layer:
                # ★ 注入依赖任务的结果作为上下文
                prompt = task["prompt"]
                deps = task.get("depends_on", [])
                if deps:
                    dep_context = "\n".join(
                        f"[Result of '{d}']: {results[d]}"
                        for d in deps
                    )
                    prompt = (
                        f"{prompt}\n\n"
                        f"Context from prerequisite tasks:\n"
                        f"{dep_context}"
                    )

                futures[pool.submit(run_subagent, prompt)] = task["id"]

            for future in as_completed(futures):
                task_id = futures[future]
                results[task_id] = future.result()

    output_parts = [
        f"[{tid}]: {result}" for tid, result in results.items()
    ]
    return "\n\n".join(output_parts)

三个关键设计:

依赖注入——有 depends_on 的任务,prompt 自动追加前置任务的结果。子 Agent 看到的不是"写一份报告",而是"写一份报告,以下是前置调研结果:……"。

每层一个线程池——同层任务真正并行。as_completed 让先完成的先收集,不等最慢的。

结果累积——results 字典跨层传递,下层任务可以引用上层任何任务的输出。

4. 注册到 dispatch map

TOOLS = BASE_TOOLS + [TASK_TOOL, PARALLEL_TASKS_TOOL]

TOOL_HANDLERS = {
    **BASE_HANDLERS,
    "task": lambda **kw: run_subagent(kw["prompt"]),
    "parallel_tasks": lambda **kw: run_parallel_tasks(kw["tasks"]),
}

还是 dispatch 模式——加新能力只加注册,agent_loop 一行不改。


运行效果

❯ /plan 分析这个项目的前端、后端和数据库技术栈,汇总写一份技术报告

⏳ Generating plan...

📋 Plan:
1. 分析前端技术栈
2. 分析后端技术栈
3. 分析数据库技术栈
4. 汇总三个分析结果,写技术报告

 Execute? (y/n) y

⚡ Executing...

> parallel_tasks:

⚡ Parallel scheduler: 4 tasks, 2 layers
  Layer 0: ['frontend', 'backend', 'database'] (parallel)

  🔀 Subagent started: 分析前端技术栈...
  🔀 Subagent started: 分析后端技术栈...
  🔀 Subagent started: 分析数据库技术栈...
    > bash: cat frontend/package.json | head -30
    > bash: cat backend/pyproject.toml
    > bash: cat docker-compose.yml | grep -A5 postgres
  🔀 Subagent done (6 messages)
  ✓ database (1/4)
  🔀 Subagent done (8 messages)
  ✓ backend (2/4)
  🔀 Subagent done (10 messages)
  ✓ frontend (3/4)
  Layer 0 done in 12.3s

  Layer 1: ['report'] (parallel)

  🔀 Subagent started: 汇总写技术报告...
  🔀 Subagent done (4 messages)
  ✓ report (4/4)
  Layer 1 done in 8.1s

4 个任务分成 2 层:前 3 个并行(12.3s),第 4 个等前置完成后执行(8.1s),总耗时 ~20s。串行大约需要 40-50s。


核心机制解析

为什么用拓扑排序而不是"全部并行"?

因为有些任务确实依赖其他任务的输出。你不能在拿到调研结果之前就开始写报告。

拓扑排序给了最大化并行度的保证:如果两个任务没有直接或间接依赖,它们一定在同一层或更早的层,一定会并行。

循环依赖怎么办?

if not layer:
    raise ValueError(f"Circular dependency: {list(remaining.keys())}")

某轮里没有入度为 0 的节点 = 剩余节点形成了循环。直接报错,让模型重新规划。

为什么用 ThreadPoolExecutor 而不是 asyncio?

因为 run_subagent 内部的 client.messages.create 是同步阻塞调用。线程池让多个阻塞调用真正并行。

如果用 asyncio,需要把整个 API 调用链改成异步,复杂度翻倍但核心逻辑不变。教学代码选择最简单的正确方案。

子 Agent 怎么知道前置任务的结果?

if deps:
    dep_context = "\n".join(
        f"[Result of '{d}']: {results[d]}" for d in deps
    )
    prompt = f"{prompt}\n\nContext from prerequisite tasks:\n{dep_context}"

直接拼到 prompt 里。子 Agent 不知道也不需要知道这些结果来自另一个 Agent——它只看到一段带上下文的任务描述。这就是 Subagent 隔离的好处:每个子 Agent 只需要关心自己的事。


task vs parallel_tasks:什么时候用哪个?

场景工具原因
单个独立子任务task简单直接
多个互不依赖的子任务parallel_tasks并行省时间
有依赖链的多个子任务parallel_tasks自动排序 + 最大化并行
需要前一步结果决定下一步做什么task x N父 Agent 需要中间结果来决策

parallel_tasks 适合提前知道所有任务和依赖关系的场景。如果需要根据中间结果动态决定下一步,还是用 task 让父 Agent 逐步决策。


两种并行风格的对比

Claude Code 的 Agent 工具支持在一个响应中发出多个 tool_use,运行时自然并行。但它没有显式的依赖声明——并行粒度由模型自己决定。

对比项隐式并行(Claude Code 风格)显式 DAG(本文方案)
并行声明模型在同一响应中发多个 tool_use一个 tool_use 里声明完整任务图
依赖处理模型自行决定先后顺序调度器保证拓扑序
灵活性高,可以临场决策中,需要提前规划好任务图
适用场景2-3 个简单并行任务多任务、复杂依赖链

隐式并行更灵活,显式 DAG 更可靠。它们不矛盾——你完全可以让父 Agent 在一个响应里同时调用 taskparallel_tasks


小结

  1. parallel_tasks 工具 — 一次提交多个任务和依赖关系
  2. 拓扑排序 — Kahn 算法按层分组,同层无依赖
  3. ThreadPoolExecutor — 同层任务真正并行
  4. 依赖注入 — 前置任务的结果自动拼到后续任务的 prompt
  5. 循环检测 — 发现循环依赖立即报错

本质是把调度逻辑从模型脑子里移到了代码里。模型负责说"做什么、什么依赖什么",代码负责"怎么做最快"。各司其职。


提示词的配套改动

光有工具不够,提示词也得跟上。改了两处。

1. System Prompt:告诉模型有并行能力

之前的 system prompt:

SYSTEM = "You are a coding agent at {WORKDIR}. Use tools to solve tasks. Act, don't explain."

加了并行策略指引:

SYSTEM = f"""You are a coding agent at {WORKDIR}. Use tools to solve tasks. Act, don't explain.

When a task can be broken into subtasks, use `parallel_tasks` to run them concurrently.
Put ALL tasks (including dependent ones) in a SINGLE `parallel_tasks` call — the scheduler handles ordering.
Each task needs an id and prompt. Use depends_on to declare dependencies between tasks.
Do NOT split tasks across multiple `parallel_tasks` calls — dependent tasks would lose access to earlier results.
Only use sequential `task` calls when the next step depends on seeing the previous result to decide what to do."""

两个关键指令:

  • "Put ALL tasks in a SINGLE call" — 必须把所有任务(包括有依赖的)放在一次 parallel_tasks 调用里。调度器会自动处理执行顺序,模型不需要自己分批。
  • 最后一句 — 告诉模型什么时候不该用 parallel_tasks。不加这个限制,模型可能把所有任务都塞进去,包括需要根据中间结果临时决策的场景。

2. Plan 阶段:要求标注依赖关系

之前的 plan 提示词:

"Analyze the task and outline a step-by-step plan. Do NOT execute anything yet."

改成:

"Analyze the task and outline a step-by-step plan. Do NOT execute anything yet. "
"For each step, note which other steps it depends on (if any). "
"Mark steps that can run in parallel."

这样模型在规划阶段就会输出结构化的计划:

1. 分析前端技术栈(无依赖)
2. 分析后端技术栈(无依赖)
3. 分析数据库技术栈(无依赖)
4. 汇总写报告(依赖 1, 2, 3)

步骤 1-3 可并行执行。

有了这个结构,执行阶段模型自然知道怎么填 depends_on 字段。

提示词设计原则

工具定义里的 description 告诉模型"这个工具能做什么",但不够。模型还需要知道:

  • 什么时候该用它(有多个独立子任务时)
  • 什么时候不该用它(需要看中间结果决策时)
  • 怎么准备数据(在规划阶段就标注依赖)

这三层信息分别放在 system prompt、plan 提示词、和执行提示词里。


踩坑:模型把任务拆成多次调用

测试时遇到一个真实的 bug,值得单独记录。

给模型一个 4 层依赖的代码审计任务:

Step 1: 统计文件信息(无依赖)
Step 2: 提取 import 列表(无依赖)
Step 3a: 分析复杂度(依赖 Step 1)
Step 3b: 分析依赖用途(依赖 Step 2)
Step 4: 交叉对比(依赖 Step 3a + 3b)
Step 5: 生成报告(依赖 Step 4)

模型输出了一个很好的计划。但执行阶段崩了:

> parallel_tasks: [step1_file_info, step2_imports]     ← 第一次调用,只提交了 Layer 0

✓ step1_file_info
✓ step2_imports

> parallel_tasks: [step3a_complexity, step3b_deps]     ← 第二次调用,提交 Layer 1
  step3a_complexity depends_on: [step1_file_info]

ValueError: Task 'step3a_complexity' depends on unknown task 'step1_file_info'

模型按层拆分,先调一次 parallel_tasks 跑 Layer 0,等结果出来再调一次跑 Layer 1。看起来很"聪明",但其实完全错了——第二次调用是一个独立的 run_parallel_tasks,它的 results = {} 是空的,根本找不到 step1_file_info 的结果。

根因:模型把自己当成了调度器。它认为"我应该先跑没有依赖的,拿到结果再跑有依赖的"。但这正是 topological_layers 调度器要做的事——模型不需要操心执行顺序。

修复:在 system prompt 里加了两句硬性约束:

Put ALL tasks (including dependent ones) in a SINGLE `parallel_tasks` call
— the scheduler handles ordering.
Do NOT split tasks across multiple `parallel_tasks` calls
— dependent tasks would lose access to earlier results.

修复后,模型把所有 6 个任务放在一次调用里,调度器自动分成 4 层并行执行,完美运行。

教训:当你给模型一个带调度能力的工具时,必须明确告诉它不要自己调度。模型天然倾向于"替你想",如果不加约束,它会试图在 tool_use 层面手动编排执行顺序,反而破坏了代码层面已经实现好的调度逻辑。

这也是 prompt engineering 的一个通用原则:工具越智能,prompt 越要约束模型的"聪明"

On this page