Harness实战:后台任务——Fire-and-Forget,Agent不等了
并行任务解决了"多个子任务同时跑"的问题,但父Agent还是得等所有结果回来才能继续。后台任务让Agent提交长耗时工作后立即返回,继续和用户对话,随时用check_task查看进度。
写在前面
上一篇我们给 Agent 加了并行调度——拓扑排序让无依赖的子任务同时跑,有依赖的自动等待。
但并行调度有一个特性:它是同步阻塞的。
parallel_tasks 调用后,父 Agent 挂起,等所有层都执行完,拿到全部结果,才回到 agent_loop 的下一轮。用户在这段时间什么也做不了。
有些任务,你确实需要等结果——"分析三个模块然后汇总"。但有些任务,你根本不需要等——"跑一下测试套件"、"在后台做个大文件分析"。
问题:不需要等的任务,也被迫等
想象一个场景:
用户: 帮我写一个 fizzbuzz.py(1到100),然后跑一下确认输出正确
Agent 的执行:
1. 写 fizzbuzz.py → 2s
2. task("跑 python fizzbuzz.py 验证") → 15s ← 阻塞等待
3. 告诉用户结果 → 1s第 2 步跑脚本要十几秒。这段时间里,Agent 和用户都干等着。
但其实 Agent 完全可以说:"代码已写好,在后台跑着验证,你可以继续问我其他事。跑完我告诉你结果。"
这就是后台任务的场景——Fire-and-Forget。
同步 vs 后台:一张图说清楚
同步 task / parallel_tasks:
用户请求 → Agent → [子任务执行中...60s...] → 拿到结果 → 回复用户
^^^^^^^^^^^^^^^^^^^^^^^^
这段时间 Agent 挂起,用户等待
后台 background_task:
用户请求 → Agent → 提交后台任务 → 立即回复"已提交,task_id: abc123"
│
└→ [子任务在后台执行中...60s...]
│
用户继续对话 → Agent 继续工作 │
│ │
用户: "跑完了吗?" → check_task("abc123") ←──┘
→ "done, 全部通过"核心区别:父 Agent 不等,继续对话。
实现
在 parallel_tasks 的基础上,增加三样东西。
1. 后台任务注册表
import threading
import uuid
from concurrent.futures import ThreadPoolExecutor, Future
# 全局线程池:后台任务共享,避免每次创建/销毁
bg_pool = ThreadPoolExecutor(max_workers=4)
# {task_id: Future} — 注册表
bg_tasks: dict[str, Future] = {}
bg_lock = threading.Lock()三个组件:
| 组件 | 作用 |
|---|---|
bg_pool | 全局线程池,最多 4 个后台任务并发 |
bg_tasks | task_id → Future 的映射,查状态用 |
bg_lock | 线程锁,保护注册表的并发读写 |
为什么用全局线程池而不是每次 ThreadPoolExecutor?因为后台任务的生命周期不确定——提交时不知道什么时候查结果。全局池让 Future 一直活着,直到被查询或程序退出。
2. background_task 工具
BACKGROUND_TASK_TOOL = {
"name": "background_task",
"description": (
"Launch a subtask in the background. Returns immediately with a task_id. "
"The task keeps running while you continue the conversation. "
"Use for long-running work: tests, builds, large analysis. "
"Check status later with check_task."
),
"input_schema": {
"type": "object",
"properties": {
"prompt": {
"type": "string",
"description": "Task description for the background subagent",
}
},
"required": ["prompt"],
},
}handler 实现:
def run_background_task(prompt: str) -> str:
"""提交后台任务,立即返回 task_id。"""
task_id = uuid.uuid4().hex[:8]
future = bg_pool.submit(run_subagent, prompt)
with bg_lock:
bg_tasks[task_id] = future
# 完成时在终端提醒用户
def on_done(fut: Future, tid: str = task_id):
status = "error" if fut.exception() else "done"
print(f"\n 📬 Background task [{tid}] {status}!")
future.add_done_callback(on_done)
return json.dumps({"task_id": task_id, "status": "submitted"})关键设计:
立即返回 — bg_pool.submit() 是非阻塞的,把任务丢进线程池就走。模型拿到 {"task_id": "abc123", "status": "submitted"},可以立即对用户说"已提交"。
完成回调 — future.add_done_callback(on_done) 注册回调,任务完成后在终端打印通知。这样即使用户没主动问,也能在终端看到"📬 后台任务完成了"。
8 位 task_id — uuid4().hex[:8] 生成短 ID,方便模型和用户引用。
3. check_task 工具
CHECK_TASK_TOOL = {
"name": "check_task",
"description": (
"Check the status of a background task. "
"Returns {status: 'running'} if still in progress, "
"or {status: 'done', result: '...'} when finished, "
"or {status: 'error', error: '...'} if it failed."
),
"input_schema": {
"type": "object",
"properties": {
"task_id": {
"type": "string",
"description": "The task_id returned by background_task",
}
},
"required": ["task_id"],
},
}handler 实现:
def run_check_task(task_id: str) -> str:
"""查询后台任务状态。"""
with bg_lock:
future = bg_tasks.get(task_id)
if future is None:
return json.dumps({
"status": "not_found",
"error": f"No task with id '{task_id}'"
})
if not future.done():
return json.dumps({"status": "running"})
exc = future.exception()
if exc:
return json.dumps({"status": "error", "error": str(exc)})
return json.dumps({"status": "done", "result": future.result()})三种状态:
| 状态 | 含义 | 模型行为 |
|---|---|---|
running | 还在跑 | 告诉用户"还在执行",或者继续干别的 |
done | 完成,带 result | 把结果告诉用户 |
error | 失败,带错误信息 | 报告错误,决定是否重试 |
4. 注册到 dispatch map
TOOLS = BASE_TOOLS + [
TASK_TOOL,
PARALLEL_TASKS_TOOL,
BACKGROUND_TASK_TOOL, # 新增
CHECK_TASK_TOOL, # 新增
]
TOOL_HANDLERS = {
**BASE_HANDLERS,
"task": lambda **kw: run_subagent(kw["prompt"]),
"parallel_tasks": lambda **kw: run_parallel_tasks(kw["tasks"]),
"background_task": lambda **kw: run_background_task(kw["prompt"]), # 新增
"check_task": lambda **kw: run_check_task(kw["task_id"]), # 新增
}还是 dispatch 模式——加新能力只加注册,agent_loop 一行不改。
运行效果
❯ 帮我写一个 fizzbuzz.py(1到100),写完后在后台跑一下确认输出正确,
同时帮我写一个 README.md 说明用法
> write_file: fizzbuzz.py
> background_task: {"prompt": "Run python fizzbuzz.py and verify output"}
🚀 Background task [e4a91c3f] submitted
> write_file: README.md
fizzbuzz.py 和 README.md 都已写好。
测试在后台跑着 (task_id: e4a91c3f),你可以继续问我别的。
❯ 帮我看一下 pyproject.toml 里的 Python 版本要求对不对
> read_file: pyproject.toml
pyproject.toml 里写的 requires-python = ">=3.10",和代码里用的
match/case 语法一致,没问题。
📬 Background task [e4a91c3f] done!
❯ fizzbuzz 跑得怎么样?
> check_task: {"task_id": "e4a91c3f"}
运行正确。输出 100 行:Fizz 27次、Buzz 14次、FizzBuzz 6次,
其余为数字,总计 100 行,无异常。注意时间线:Agent 提交验证后立刻回复了用户,然后处理了另一个完全不相关的问题(检查 pyproject.toml)。验证在后台默默跑着,跑完后终端弹出通知,用户下一轮对话时 Agent 通过 check_task 获取结果。
优雅退出
后台任务的一个问题:用户输入 q 退出时,可能还有任务在跑。直接退出会丢失结果。
if __name__ == "__main__":
# ... 主循环 ...
# 优雅退出:等待后台任务完成
with bg_lock:
pending = {tid: f for tid, f in bg_tasks.items() if not f.done()}
if pending:
print(f"\n⏳ Waiting for {len(pending)} background task(s)...")
for tid, future in pending.items():
try:
future.result(timeout=300)
print(f" ✓ {tid} done")
except Exception as e:
print(f" ✗ {tid} error: {e}")
bg_pool.shutdown(wait=False)退出时检查注册表,有未完成的就等(最多 5 分钟),然后关闭线程池。
核心机制解析
为什么不直接用 parallel_tasks 的线程池?
parallel_tasks 的线程池是局部的——with ThreadPoolExecutor(...) as pool,出了 with 块就销毁。后台任务的 Future 需要一直活着,直到被查询。所以用全局线程池。
模型怎么知道什么时候该查结果?
两种方式:
- 用户主动问 — "跑完了吗?" → 模型调
check_task - 模型自己判断 — 模型记住了 task_id,在后续对话中认为需要这个结果时主动 check
实际测试中,模型表现不错——它会在 background_task 返回后告诉用户 task_id,并在后续轮次中主动检查。
线程安全问题
bg_tasks 字典可能被多个线程同时读写:
- 主线程:
run_background_task写入、run_check_task读取 - 后台线程:
on_done回调可能与主线程同时访问
所以需要 bg_lock。Python 的 GIL 保护了简单的字典操作,但显式加锁更安全——尤其是未来如果用多进程或异步运行时。
Future 的生命周期
submit() → Future 创建,状态: running
on_done() → Future 完成,终端通知
check_task() → 读取 Future 结果
程序退出 → bg_pool.shutdown() 清理Future 一直留在 bg_tasks 字典里,不会被垃圾回收。对于教学 demo 这不是问题。生产环境需要加 TTL 或定期清理。
task vs parallel_tasks vs background_task
现在 Agent 有三种任务委派方式,适用场景不同:
| 工具 | 阻塞? | 返回值 | 适用场景 |
|---|---|---|---|
task | 是 | 子任务结果 | 需要结果才能继续的单个子任务 |
parallel_tasks | 是 | 所有结果 | 多个子任务,需要全部结果后汇总 |
background_task | 否 | task_id | 长耗时工作,不阻塞当前对话 |
一个类比:
task= 打电话。你等对方说完才挂。parallel_tasks= 群发电话,同时打给三个人,都说完了你才挂。background_task= 发消息。发完就干别的,想看回复时再看。
提示词的配套改动
光有工具不够,提示词也得跟上。改了三处。
1. System Prompt:告诉模型有后台能力
之前(并行篇)的 system prompt 只有两种策略:
SYSTEM = f"""You are a coding agent at {WORKDIR}. Use tools to solve tasks. Act, don't explain.
When a task can be broken into subtasks, use `parallel_tasks` to run them concurrently.
Put ALL tasks (including dependent ones) in a SINGLE `parallel_tasks` call — the scheduler handles ordering.
..."""现在改成策略菜单,列出所有四种委派方式:
SYSTEM = f"""You are a coding agent at {WORKDIR}. Use tools to solve tasks. Act, don't explain.
You have several task delegation strategies:
- `task`: Run a subtask synchronously. Blocks until done, returns result.
- `parallel_tasks`: Run multiple subtasks concurrently with dependency ordering. Blocks until ALL complete.
- `background_task`: Fire-and-forget. Returns a task_id immediately. The task runs while you continue the conversation.
- `check_task`: Poll a background task by task_id. Returns running/done/error.
For background_task: launch it, tell the user the task_id, and keep working. Check later with check_task."""两个关键设计:
- 策略菜单格式 — 把四种工具放在一个列表里对比,模型能更准确地选择。之前是零散的指令,现在是结构化的参考表。
- "tell the user the task_id, and keep working" — 这句是后台任务的行为锚点。不加这句,模型提交后台任务后可能会干等结果(它习惯了同步模式),而不是继续对话。
2. Plan 阶段:标注适合后台执行的步骤
之前(并行篇)的 plan 提示词:
PLAN_PROMPT = (
"Analyze the task and outline a step-by-step plan. Do NOT execute anything yet. "
"For each step, note which other steps it depends on (if any). "
"Mark steps that can run in parallel."
)加了一句:
PLAN_PROMPT = (
"Analyze the task and outline a step-by-step plan. Do NOT execute anything yet. "
"For each step, note which other steps it depends on (if any). "
"Mark steps that can run in parallel. "
"Identify long-running steps that could run in the background."
)这样模型在规划阶段就会区分:
1. 写 fizzbuzz.py(同步,需要确认写入成功)
2. 在后台跑 python fizzbuzz.py 验证输出(后台,耗时长,不阻塞)
3. 写 README.md(同步,可以和 2 同时进行)有了这个标注,执行阶段模型自然知道第 2 步该用 background_task。
3. 执行阶段:告诉模型怎么用后台工具
之前(并行篇)的执行提示词:
EXECUTE_PROMPT = (
"Now execute it step by step. "
"Use `parallel_tasks` to run multiple subtasks concurrently with dependency ordering. "
"Only use sequential `task` calls when the next step depends on seeing the previous result."
)现在加上后台策略:
EXECUTE_PROMPT = (
"Now execute it step by step. "
"Use `parallel_tasks` to run multiple subtasks concurrently with dependency ordering. "
"Use `background_task` for long-running work that doesn't block progress. "
"Use `check_task` to retrieve background results when needed. "
"Only use sequential `task` calls when the next step depends on seeing the previous result to decide what to do."
)三层提示词各管一件事:
| 提示词 | 告诉模型什么 |
|---|---|
| System Prompt | 有哪些工具,各自的语义是什么 |
| Plan 提示词 | 规划时标注哪些步骤适合后台 |
| 执行提示词 | 执行时用哪个工具对应哪种步骤 |
为什么要在三个地方分别提?
一个常见的误区是把所有指引都塞进 system prompt。但模型在不同阶段关注的东西不同:
- System Prompt 是全局参考——"你有什么能力"
- Plan 提示词 影响规划输出——"规划时注意什么"
- 执行提示词 影响工具选择——"执行时怎么选"
在对的时机给对的信息,比一次倒完更有效。
小结
- 全局线程池 —
ThreadPoolExecutor(max_workers=4)让后台任务共享,生命周期跨越整个会话 - Future 注册表 —
{task_id: Future}存储状态,check_task按 ID 查询 - 完成回调 —
add_done_callback在终端推送通知 - 优雅退出 — 程序结束前等待未完成的后台任务
增量代码约 30 行。本质是把 run_subagent 从"同步调用"变成"提交到线程池 + 返回 Future"——底层执行逻辑完全不变,只是套了一层异步壳。
这也是 Claude Code 里 run_in_background 参数的核心思路:同一个 Agent 执行机制,同步和异步只差在"等不等 Future"。