第8课:后台执行——异步任务与通知队列
慢操作丢后台,Agent继续想下一步。用线程安全的通知队列让Agent不再干等npm install跑完。
系列导读
这是**《12课拆解Claude Code架构》系列的第 8 课,也是阶段三(执行优化)的收官课**。
前七课我们造了一个功能完整的 Agent 系统:第 1 课建了 Agent Loop,第 2 课加了工具注册,第 3 课引入规划,第 4 课用 Subagent 隔离上下文,第 5 课加载技能,第 6 课压缩上下文,第 7 课实现任务持久化。
但有一个问题始终悬而未决——慢操作阻塞循环。
第 8 课的格言:
"慢操作丢后台,Agent 继续想下一步"
这一课,我们给 Agent 装上 BackgroundManager,让耗时命令在守护线程里跑,主循环不必干等。
阻塞等待:被绑住手脚的 Agent
用户说了一句很自然的话:"装一下依赖,顺便帮我建个 .env 配置文件"
两个完全独立的任务,理想状态应该并行。但阻塞式循环下:
用户: "装依赖顺便建个配置文件"
↓
模型思考: 先装依赖
↓
工具调用: bash → npm install ← 阻塞 120 秒
↓
(模型干等…… 什么也做不了)
↓
工具结果: added 847 packages
↓
模型思考: 现在建配置文件
↓
工具调用: bash → echo "DB_HOST=..." > .env
↓
完成。总耗时: 122 秒120 秒干等。这不是个例,开发中大量操作都是慢的:
| 操作 | 典型耗时 |
|---|---|
npm install | 30-180 秒 |
pytest 全量测试 | 10-300 秒 |
docker build | 60-600 秒 |
pip install 大包 | 20-120 秒 |
每一次慢操作,都是 Agent 被迫空转的时间。

核心架构:主线程 + 守护线程
思路:慢操作扔后台线程,主循环继续推进。子进程完成后结果进队列,下一轮排空。
主线程(Agent Loop) 后台线程(守护)
───────────────────── ─────────────────
│ LLM 调用 │
│ ↓ │
│ 工具: run_in_background │
│ ↓ │
│ run() → 启动守护线程 ──────────→ │ _execute()
│ ↓ 立即返回 task_id │ ↓
│ │ subprocess.run("npm install")
│ 工具: bash → 建 .env │ (跑了 120 秒)
│ ↓ │ ↓
│ LLM 调用 │ 结果 → notification_queue
│ ↓ │
│ drain_notifications() ←──────────┘
│ ↓
│ 注入 <background-results> 标签
│ ↓
│ LLM 看到后台任务完成关键:循环保持单线程,只有子进程 I/O 被并行化。 线程间通信只通过一个队列。
四步拆解:从阻塞到非阻塞
第一步:BackgroundManager 数据结构
import threading
import queue
import subprocess
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class BackgroundTask:
task_id: str
command: str
status: str = "running" # running | completed | failed
result: Optional[str] = None
class BackgroundManager:
def __init__(self):
self.tasks: dict[str, BackgroundTask] = {}
self.notification_queue: queue.Queue = queue.Queue()
self.lock = threading.Lock()三个核心字段:tasks 字典存任务状态,notification_queue 是线程安全的通知队列,lock 保护字典并发读写。
为什么用 queue.Queue?因为它自带线程安全,生产者(后台线程)和消费者(主线程)之间不需要手动加锁。
第二步:run() 启动后台任务
def run(self, command: str) -> str:
task_id = f"bg_{len(self.tasks)}_{int(time.time())}"
task = BackgroundTask(task_id=task_id, command=command)
with self.lock:
self.tasks[task_id] = task
thread = threading.Thread(
target=self._execute,
args=(task_id, command),
daemon=True,
)
thread.start()
return task_id # 立即返回,不等子进程关键点:daemon=True 保证主进程退出时自动清理;立即返回 task_id,调用方不阻塞。
第三步:_execute() 在后台跑子进程
def _execute(self, task_id: str, command: str):
try:
r = subprocess.run(
command, shell=True,
capture_output=True, text=True,
timeout=600,
)
output = (r.stdout + r.stderr).strip()
result = output[:50000] if output else "(no output)"
status = "completed"
except subprocess.TimeoutExpired:
result = "Error: Background task timed out (600s)"
status = "failed"
except Exception as e:
result = f"Error: {e}"
status = "failed"
# 更新任务状态
with self.lock:
self.tasks[task_id].status = status
self.tasks[task_id].result = result
# 通知主线程
self.notification_queue.put({
"task_id": task_id,
"command": command,
"status": status,
"result": result,
})跑在守护线程里,做两件事:跑子进程(subprocess.run() 只阻塞后台线程,不影响主线程),然后把结果推进 notification_queue。超时设为 600 秒——后台跑,给充足时间。
第四步:drain_notifications() 排空队列
def drain_notifications(self) -> list[dict]:
notifications = []
while True:
try:
n = self.notification_queue.get_nowait()
notifications.append(n)
except queue.Empty:
break
return notificationsget_nowait() 是非阻塞的:有消息就取,没消息立刻返回 Empty。一次性取完所有已完成的后台任务,绝不阻塞主循环。
集成到 Agent Loop
改动极小——只在每次 LLM 调用前加一步排空通知:
bg_manager = BackgroundManager()
def agent_loop(messages: list):
while True:
# ——— 新增:排空后台通知 ———
notifications = bg_manager.drain_notifications()
if notifications:
bg_results = format_background_results(notifications)
messages.append({"role": "user", "content": bg_results})
# 原有逻辑不变
response = client.messages.create(
model=MODEL, system=SYSTEM,
messages=messages, tools=TOOLS, max_tokens=8000,
)
messages.append({"role": "assistant", "content": response.content})
if response.stop_reason != "tool_use":
return
results = []
for block in response.content:
if block.type == "tool_use":
output = dispatch_tool(block.name, block.input)
results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
})
messages.append({"role": "user", "content": results})通知以 <background-results> 标签注入,让模型明确区分这是异步事件,不是用户输入或工具结果:
def format_background_results(notifications: list[dict]) -> str:
parts = ["<background-results>"]
for n in notifications:
parts.append(f"[{n['task_id']}] {n['command']}\n"
f"Status: {n['status']}\nResult: {n['result'][:2000]}")
parts.append("</background-results>")
return "\n".join(parts)工具定义:run_in_background
{"name": "run_in_background",
"description": "Run a shell command in the background. Returns immediately "
"with a task ID. Use for long-running commands like npm install, "
"pytest, docker build. Results delivered as background notifications.",
"input_schema": {"type": "object",
"properties": {"command": {"type": "string"}},
"required": ["command"]}}模型看到描述后,自己判断快命令走 bash、慢命令走 run_in_background。不需要硬编码规则。
实际运行:并行执行的效果
同样的指令,装了 BackgroundManager 之后:
模型: npm install 很慢 → run_in_background(立即返回 task_id)
模型: .env 很快 → bash → echo "DB_HOST=..." > .env → 完成
模型: ".env 已创建。npm install 在后台运行中。"
…… 继续做其他事 ……
下一轮: drain_notifications() → [bg_0] npm install completed
模型: "依赖也安装完了,added 847 packages。"总感知耗时:2 秒。 npm install 的 120 秒完全在后台消化。

三个设计洞见
洞见一:为什么循环保持单线程
为什么不把整个循环改成异步?因为 messages 是严格有序的对话历史,多线程追加会打乱顺序。单线程循环保证上下文确定,不需要对 messages 加锁。
唯一被并行化的是子进程 I/O 等待——子进程跑在独立进程空间,通过 Queue 通信,不共享可变状态。这是最安全的并发粒度。
洞见二:通知注入的时机
为什么在每次 LLM 调用前排空,而不是任务一完成就注入?因为 LLM 是批处理的——你不能在模型 "正在思考" 时插入新消息。唯一合理的注入点是:上一轮工具执行完了,下一轮 LLM 调用还没开始。
这意味着结果可能延迟一轮。完全可以接受——模型会自然地报告 "后台任务完成了"。
洞见三:daemon=True 的意义
守护线程在主进程退出时自动终止。用户按 Ctrl+C,所有后台任务自动清理——没有僵尸进程、没有资源泄漏、不需要清理逻辑。
五分钟跑起来
# 进入项目目录
cd learn-claude-code
# 启动第八课
python agents/s08_background_tasks.py任务一:后台执行 + 前台并行工作
把耗时命令丢后台,同时在前台继续做事:
s08 >> Run "sleep 5 && echo done" in the background, then create a file while it runs实际执行记录:
> background_run:
Background task 223025f0 started: sleep 5 && echo done
> write_file: Wrote 65 bytes ← sleep 还在跑,Agent 已经在写文件了
> check_background:
[running] sleep 5 && echo done ← 轮询发现还在跑
> bash: (no output) ← 继续做其他事
→ 后台任务完成,输出: done关键观察: Agent 不等 sleep 5 跑完就去创建文件了。background_run 把命令扔进守护线程,主循环继续转。
任务二:多个后台任务并发
同时启动 3 个不同耗时的后台任务:
s08 >> Start 3 background tasks: "sleep 2", "sleep 4", "sleep 6". Check their status.实际执行记录:
> background_run: Background task fa70445d started: sleep 2
> background_run: Background task ec97d9ed started: sleep 4
> background_run: Background task eafcece7 started: sleep 6
> check_background: ← 第一次检查
fa70445d: [completed] sleep 2 ← 2秒的已完成
ec97d9ed: [running] sleep 4 ← 4秒的还在跑
eafcece7: [running] sleep 6 ← 6秒的还在跑
> check_background: ec97d9ed [completed] ← 4秒的完成了
> check_background: eafcece7 [completed] ← 6秒的也完成了Agent 自动轮询状态,等所有任务完成后汇总:
| Task ID | Command | Duration | Status |
|------------|-----------|----------|--------------|
| fa70445d | sleep 2 | 2s | ✅ Completed |
| ec97d9ed | sleep 4 | 4s | ✅ Completed |
| eafcece7 | sleep 6 | 6s | ✅ Completed |总 sleep 时间 12 秒,实际只用了 ~6 秒——三个任务真正并发执行。
任务三:后台跑测试,前台继续工作
最实用的场景——测试跑着,Agent 不闲着:
s08 >> Run pytest in the background and keep working on other things实际执行记录:
> background_run:
Background task da186af7 started: pytest 2>&1Agent 立即回复"Pytest is running in the background. I'm free to keep working!"并主动询问下一步要做什么。测试在后台跑,前台随时可以接新指令——这就是后台执行的核心价值。
总结:你刚造了什么
| 组件 | 之前(s07) | 之后(s08) |
|---|---|---|
| 命令执行 | 仅阻塞式 | 阻塞 + 后台线程 |
| 后台通知 | 无 | 每轮排空的通知队列 |
| 并发模型 | 无 | 守护线程(子进程 I/O) |
| 循环改动 | — | 仅增加 drain + 注入 |
| 新增代码 | — | ~60 行(BackgroundManager) |
核心原则:最小化并发。 循环保持单线程,只把 I/O 等待扔后台。一个 Queue,没有共享可变状态——最小复杂度增量,换最大效率提升。

下一课预告
第 8 课让一个 Agent 能并行处理任务。但如果需要多个 Agent 协作——一个写代码、一个跑测试、一个审查代码?
第 9 课:Agent Teams —— 从一个 Agent 到一个团队。
# 预告:s09 的团队协作
team = TeamManager()
team.add_agent("coder", system="你是一个代码编写专家")
team.add_agent("tester", system="你是一个测试工程师")
team.add_agent("reviewer", system="你是一个代码审查专家")
team.run("实现一个用户注册功能,包含测试和代码审查")一个 Agent 做完,下一个接棒。BackgroundManager 在团队场景下继续复用。
这是《12课拆解Claude Code架构:从零掌握Agent Harness工程》系列的第 8 课。关注Claw开发者,不错过后续更新。
完整代码和交互式学习平台:github.com/shareAI-lab/learn-claude-code
如果这篇文章对你有帮助,欢迎转发给你的技术团队。
系列目录
- 第1课:用20行Python造出你的第一个AI Agent
- 第2课:给Agent加工具 —— dispatch map模式详解
- 第3课:TodoWrite —— 让Agent先想后做:规划系统
- 第4课:Subagent —— 拆解大任务,上下文隔离
- 第5课:按需加载领域知识——Skill机制
- 第6课:无限对话——上下文压缩三层策略
- 第7课:任务持久化——文件级DAG任务图
- 第8课:后台执行——异步任务与通知队列(本文)
- 第9课:Agent Teams——多Agent协作:团队与邮箱系统
- 第10课:团队协议——状态机驱动的协商
- 第11课:自治Agent——自组织任务认领
- 第12课:终极隔离——Worktree并行执行