HuanCode Docs

Harness实战:后台任务——Fire-and-Forget,Agent不等了

并行任务解决了"多个子任务同时跑"的问题,但父Agent还是得等所有结果回来才能继续。后台任务让Agent提交长耗时工作后立即返回,继续和用户对话,随时用check_task查看进度。

写在前面

上一篇我们给 Agent 加了并行调度——拓扑排序让无依赖的子任务同时跑,有依赖的自动等待。

但并行调度有一个特性:它是同步阻塞的。

parallel_tasks 调用后,父 Agent 挂起,等所有层都执行完,拿到全部结果,才回到 agent_loop 的下一轮。用户在这段时间什么也做不了。

有些任务,你确实需要等结果——"分析三个模块然后汇总"。但有些任务,你根本不需要等——"跑一下测试套件"、"在后台做个大文件分析"。


问题:不需要等的任务,也被迫等

想象一个场景:

用户: 帮我写一个 fizzbuzz.py(1到100),然后跑一下确认输出正确

Agent 的执行:
1. 写 fizzbuzz.py                     → 2s
2. task("跑 python fizzbuzz.py 验证")  → 15s  ← 阻塞等待
3. 告诉用户结果                        → 1s

第 2 步跑脚本要十几秒。这段时间里,Agent 和用户都干等着。

但其实 Agent 完全可以说:"代码已写好,在后台跑着验证,你可以继续问我其他事。跑完我告诉你结果。"

这就是后台任务的场景——Fire-and-Forget


同步 vs 后台:一张图说清楚

同步 task / parallel_tasks:

  用户请求 → Agent → [子任务执行中...60s...] → 拿到结果 → 回复用户
                     ^^^^^^^^^^^^^^^^^^^^^^^^
                     这段时间 Agent 挂起,用户等待

后台 background_task:

  用户请求 → Agent → 提交后台任务 → 立即回复"已提交,task_id: abc123"

                          └→ [子任务在后台执行中...60s...]

  用户继续对话 → Agent 继续工作                      │
                          │                         │
  用户: "跑完了吗?"       → check_task("abc123") ←──┘
                          → "done, 全部通过"

核心区别:父 Agent 不等,继续对话。


实现

parallel_tasks 的基础上,增加三样东西。

1. 后台任务注册表

import threading
import uuid
from concurrent.futures import ThreadPoolExecutor, Future

# 全局线程池:后台任务共享,避免每次创建/销毁
bg_pool = ThreadPoolExecutor(max_workers=4)

# {task_id: Future} — 注册表
bg_tasks: dict[str, Future] = {}
bg_lock = threading.Lock()

三个组件:

组件作用
bg_pool全局线程池,最多 4 个后台任务并发
bg_taskstask_id → Future 的映射,查状态用
bg_lock线程锁,保护注册表的并发读写

为什么用全局线程池而不是每次 ThreadPoolExecutor?因为后台任务的生命周期不确定——提交时不知道什么时候查结果。全局池让 Future 一直活着,直到被查询或程序退出。

2. background_task 工具

BACKGROUND_TASK_TOOL = {
    "name": "background_task",
    "description": (
        "Launch a subtask in the background. Returns immediately with a task_id. "
        "The task keeps running while you continue the conversation. "
        "Use for long-running work: tests, builds, large analysis. "
        "Check status later with check_task."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "prompt": {
                "type": "string",
                "description": "Task description for the background subagent",
            }
        },
        "required": ["prompt"],
    },
}

handler 实现:

def run_background_task(prompt: str) -> str:
    """提交后台任务,立即返回 task_id。"""
    task_id = uuid.uuid4().hex[:8]

    future = bg_pool.submit(run_subagent, prompt)

    with bg_lock:
        bg_tasks[task_id] = future

    # 完成时在终端提醒用户
    def on_done(fut: Future, tid: str = task_id):
        status = "error" if fut.exception() else "done"
        print(f"\n  📬 Background task [{tid}] {status}!")

    future.add_done_callback(on_done)

    return json.dumps({"task_id": task_id, "status": "submitted"})

关键设计:

立即返回bg_pool.submit() 是非阻塞的,把任务丢进线程池就走。模型拿到 {"task_id": "abc123", "status": "submitted"},可以立即对用户说"已提交"。

完成回调future.add_done_callback(on_done) 注册回调,任务完成后在终端打印通知。这样即使用户没主动问,也能在终端看到"📬 后台任务完成了"。

8 位 task_iduuid4().hex[:8] 生成短 ID,方便模型和用户引用。

3. check_task 工具

CHECK_TASK_TOOL = {
    "name": "check_task",
    "description": (
        "Check the status of a background task. "
        "Returns {status: 'running'} if still in progress, "
        "or {status: 'done', result: '...'} when finished, "
        "or {status: 'error', error: '...'} if it failed."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "task_id": {
                "type": "string",
                "description": "The task_id returned by background_task",
            }
        },
        "required": ["task_id"],
    },
}

handler 实现:

def run_check_task(task_id: str) -> str:
    """查询后台任务状态。"""
    with bg_lock:
        future = bg_tasks.get(task_id)

    if future is None:
        return json.dumps({
            "status": "not_found",
            "error": f"No task with id '{task_id}'"
        })

    if not future.done():
        return json.dumps({"status": "running"})

    exc = future.exception()
    if exc:
        return json.dumps({"status": "error", "error": str(exc)})

    return json.dumps({"status": "done", "result": future.result()})

三种状态:

状态含义模型行为
running还在跑告诉用户"还在执行",或者继续干别的
done完成,带 result把结果告诉用户
error失败,带错误信息报告错误,决定是否重试

4. 注册到 dispatch map

TOOLS = BASE_TOOLS + [
    TASK_TOOL,
    PARALLEL_TASKS_TOOL,
    BACKGROUND_TASK_TOOL,  # 新增
    CHECK_TASK_TOOL,       # 新增
]

TOOL_HANDLERS = {
    **BASE_HANDLERS,
    "task": lambda **kw: run_subagent(kw["prompt"]),
    "parallel_tasks": lambda **kw: run_parallel_tasks(kw["tasks"]),
    "background_task": lambda **kw: run_background_task(kw["prompt"]),  # 新增
    "check_task": lambda **kw: run_check_task(kw["task_id"]),          # 新增
}

还是 dispatch 模式——加新能力只加注册,agent_loop 一行不改。


运行效果

❯ 帮我写一个 fizzbuzz.py(1到100),写完后在后台跑一下确认输出正确,
  同时帮我写一个 README.md 说明用法

> write_file: fizzbuzz.py
> background_task: {"prompt": "Run python fizzbuzz.py and verify output"}
  🚀 Background task [e4a91c3f] submitted

> write_file: README.md

fizzbuzz.py 和 README.md 都已写好。
测试在后台跑着 (task_id: e4a91c3f),你可以继续问我别的。

❯ 帮我看一下 pyproject.toml 里的 Python 版本要求对不对

> read_file: pyproject.toml

pyproject.toml 里写的 requires-python = ">=3.10",和代码里用的
match/case 语法一致,没问题。

  📬 Background task [e4a91c3f] done!

❯ fizzbuzz 跑得怎么样?

> check_task: {"task_id": "e4a91c3f"}

运行正确。输出 100 行:Fizz 27次、Buzz 14次、FizzBuzz 6次,
其余为数字,总计 100 行,无异常。

注意时间线:Agent 提交验证后立刻回复了用户,然后处理了另一个完全不相关的问题(检查 pyproject.toml)。验证在后台默默跑着,跑完后终端弹出通知,用户下一轮对话时 Agent 通过 check_task 获取结果。


优雅退出

后台任务的一个问题:用户输入 q 退出时,可能还有任务在跑。直接退出会丢失结果。

if __name__ == "__main__":
    # ... 主循环 ...

    # 优雅退出:等待后台任务完成
    with bg_lock:
        pending = {tid: f for tid, f in bg_tasks.items() if not f.done()}
    if pending:
        print(f"\n⏳ Waiting for {len(pending)} background task(s)...")
        for tid, future in pending.items():
            try:
                future.result(timeout=300)
                print(f"  ✓ {tid} done")
            except Exception as e:
                print(f"  ✗ {tid} error: {e}")
    bg_pool.shutdown(wait=False)

退出时检查注册表,有未完成的就等(最多 5 分钟),然后关闭线程池。


核心机制解析

为什么不直接用 parallel_tasks 的线程池?

parallel_tasks 的线程池是局部的——with ThreadPoolExecutor(...) as pool,出了 with 块就销毁。后台任务的 Future 需要一直活着,直到被查询。所以用全局线程池。

模型怎么知道什么时候该查结果?

两种方式:

  1. 用户主动问 — "跑完了吗?" → 模型调 check_task
  2. 模型自己判断 — 模型记住了 task_id,在后续对话中认为需要这个结果时主动 check

实际测试中,模型表现不错——它会在 background_task 返回后告诉用户 task_id,并在后续轮次中主动检查。

线程安全问题

bg_tasks 字典可能被多个线程同时读写:

  • 主线程:run_background_task 写入、run_check_task 读取
  • 后台线程:on_done 回调可能与主线程同时访问

所以需要 bg_lock。Python 的 GIL 保护了简单的字典操作,但显式加锁更安全——尤其是未来如果用多进程或异步运行时。

Future 的生命周期

submit()          → Future 创建,状态: running
on_done()        → Future 完成,终端通知
check_task()      → 读取 Future 结果
程序退出           → bg_pool.shutdown() 清理

Future 一直留在 bg_tasks 字典里,不会被垃圾回收。对于教学 demo 这不是问题。生产环境需要加 TTL 或定期清理。


task vs parallel_tasks vs background_task

现在 Agent 有三种任务委派方式,适用场景不同:

工具阻塞?返回值适用场景
task子任务结果需要结果才能继续的单个子任务
parallel_tasks所有结果多个子任务,需要全部结果后汇总
background_tasktask_id长耗时工作,不阻塞当前对话

一个类比:

  • task = 打电话。你等对方说完才挂。
  • parallel_tasks = 群发电话,同时打给三个人,都说完了你才挂。
  • background_task = 发消息。发完就干别的,想看回复时再看。

提示词的配套改动

光有工具不够,提示词也得跟上。改了三处。

1. System Prompt:告诉模型有后台能力

之前(并行篇)的 system prompt 只有两种策略:

SYSTEM = f"""You are a coding agent at {WORKDIR}. Use tools to solve tasks. Act, don't explain.

When a task can be broken into subtasks, use `parallel_tasks` to run them concurrently.
Put ALL tasks (including dependent ones) in a SINGLE `parallel_tasks` call — the scheduler handles ordering.
..."""

现在改成策略菜单,列出所有四种委派方式:

SYSTEM = f"""You are a coding agent at {WORKDIR}. Use tools to solve tasks. Act, don't explain.

You have several task delegation strategies:
- `task`: Run a subtask synchronously. Blocks until done, returns result.
- `parallel_tasks`: Run multiple subtasks concurrently with dependency ordering. Blocks until ALL complete.
- `background_task`: Fire-and-forget. Returns a task_id immediately. The task runs while you continue the conversation.
- `check_task`: Poll a background task by task_id. Returns running/done/error.

For background_task: launch it, tell the user the task_id, and keep working. Check later with check_task."""

两个关键设计:

  • 策略菜单格式 — 把四种工具放在一个列表里对比,模型能更准确地选择。之前是零散的指令,现在是结构化的参考表。
  • "tell the user the task_id, and keep working" — 这句是后台任务的行为锚点。不加这句,模型提交后台任务后可能会干等结果(它习惯了同步模式),而不是继续对话。

2. Plan 阶段:标注适合后台执行的步骤

之前(并行篇)的 plan 提示词:

PLAN_PROMPT = (
    "Analyze the task and outline a step-by-step plan. Do NOT execute anything yet. "
    "For each step, note which other steps it depends on (if any). "
    "Mark steps that can run in parallel."
)

加了一句:

PLAN_PROMPT = (
    "Analyze the task and outline a step-by-step plan. Do NOT execute anything yet. "
    "For each step, note which other steps it depends on (if any). "
    "Mark steps that can run in parallel. "
    "Identify long-running steps that could run in the background."
)

这样模型在规划阶段就会区分:

1. 写 fizzbuzz.py(同步,需要确认写入成功)
2. 在后台跑 python fizzbuzz.py 验证输出(后台,耗时长,不阻塞)
3. 写 README.md(同步,可以和 2 同时进行)

有了这个标注,执行阶段模型自然知道第 2 步该用 background_task

3. 执行阶段:告诉模型怎么用后台工具

之前(并行篇)的执行提示词:

EXECUTE_PROMPT = (
    "Now execute it step by step. "
    "Use `parallel_tasks` to run multiple subtasks concurrently with dependency ordering. "
    "Only use sequential `task` calls when the next step depends on seeing the previous result."
)

现在加上后台策略:

EXECUTE_PROMPT = (
    "Now execute it step by step. "
    "Use `parallel_tasks` to run multiple subtasks concurrently with dependency ordering. "
    "Use `background_task` for long-running work that doesn't block progress. "
    "Use `check_task` to retrieve background results when needed. "
    "Only use sequential `task` calls when the next step depends on seeing the previous result to decide what to do."
)

三层提示词各管一件事:

提示词告诉模型什么
System Prompt有哪些工具,各自的语义是什么
Plan 提示词规划时标注哪些步骤适合后台
执行提示词执行时用哪个工具对应哪种步骤

为什么要在三个地方分别提?

一个常见的误区是把所有指引都塞进 system prompt。但模型在不同阶段关注的东西不同:

  • System Prompt 是全局参考——"你有什么能力"
  • Plan 提示词 影响规划输出——"规划时注意什么"
  • 执行提示词 影响工具选择——"执行时怎么选"

在对的时机给对的信息,比一次倒完更有效。


小结

  1. 全局线程池ThreadPoolExecutor(max_workers=4) 让后台任务共享,生命周期跨越整个会话
  2. Future 注册表{task_id: Future} 存储状态,check_task 按 ID 查询
  3. 完成回调add_done_callback 在终端推送通知
  4. 优雅退出 — 程序结束前等待未完成的后台任务

增量代码约 30 行。本质是把 run_subagent 从"同步调用"变成"提交到线程池 + 返回 Future"——底层执行逻辑完全不变,只是套了一层异步壳。

这也是 Claude Code 里 run_in_background 参数的核心思路:同一个 Agent 执行机制,同步和异步只差在"等不等 Future"。

On this page