HuanCode Docs

第8课:后台执行——异步任务与通知队列

慢操作丢后台,Agent继续想下一步。用线程安全的通知队列让Agent不再干等npm install跑完。

系列导读

这是**《12课拆解Claude Code架构》系列的第 8 课,也是阶段三(执行优化)的收官课**。

前七课我们造了一个功能完整的 Agent 系统:第 1 课建了 Agent Loop,第 2 课加了工具注册,第 3 课引入规划,第 4 课用 Subagent 隔离上下文,第 5 课加载技能,第 6 课压缩上下文,第 7 课实现任务持久化。

但有一个问题始终悬而未决——慢操作阻塞循环。

第 8 课的格言:

"慢操作丢后台,Agent 继续想下一步"

这一课,我们给 Agent 装上 BackgroundManager,让耗时命令在守护线程里跑,主循环不必干等。


阻塞等待:被绑住手脚的 Agent

用户说了一句很自然的话:"装一下依赖,顺便帮我建个 .env 配置文件"

两个完全独立的任务,理想状态应该并行。但阻塞式循环下:

用户: "装依赖顺便建个配置文件"

模型思考: 先装依赖

工具调用: bash → npm install         ← 阻塞 120 秒

(模型干等…… 什么也做不了)

工具结果: added 847 packages

模型思考: 现在建配置文件

工具调用: bash → echo "DB_HOST=..." > .env

完成。总耗时: 122 秒

120 秒干等。这不是个例,开发中大量操作都是慢的:

操作典型耗时
npm install30-180 秒
pytest 全量测试10-300 秒
docker build60-600 秒
pip install 大包20-120 秒

每一次慢操作,都是 Agent 被迫空转的时间。

阻塞等待:Agent被慢操作绑住

核心架构:主线程 + 守护线程

思路:慢操作扔后台线程,主循环继续推进。子进程完成后结果进队列,下一轮排空。

主线程(Agent Loop)               后台线程(守护)
─────────────────────              ─────────────────
│ LLM 调用                         │
│   ↓                              │
│ 工具: run_in_background          │
│   ↓                              │
│ run() → 启动守护线程 ──────────→ │ _execute()
│   ↓ 立即返回 task_id             │   ↓
│                                  │ subprocess.run("npm install")
│ 工具: bash → 建 .env             │   (跑了 120 秒)
│   ↓                              │   ↓
│ LLM 调用                         │ 结果 → notification_queue
│   ↓                              │
│ drain_notifications() ←──────────┘
│   ↓
│ 注入 <background-results> 标签
│   ↓
│ LLM 看到后台任务完成

关键:循环保持单线程,只有子进程 I/O 被并行化。 线程间通信只通过一个队列。

四步拆解:从阻塞到非阻塞

第一步:BackgroundManager 数据结构

import threading
import queue
import subprocess
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class BackgroundTask:
    task_id: str
    command: str
    status: str = "running"        # running | completed | failed
    result: Optional[str] = None

class BackgroundManager:
    def __init__(self):
        self.tasks: dict[str, BackgroundTask] = {}
        self.notification_queue: queue.Queue = queue.Queue()
        self.lock = threading.Lock()

三个核心字段:tasks 字典存任务状态,notification_queue 是线程安全的通知队列,lock 保护字典并发读写。

为什么用 queue.Queue?因为它自带线程安全,生产者(后台线程)和消费者(主线程)之间不需要手动加锁。

第二步:run() 启动后台任务

def run(self, command: str) -> str:
    task_id = f"bg_{len(self.tasks)}_{int(time.time())}"

    task = BackgroundTask(task_id=task_id, command=command)
    with self.lock:
        self.tasks[task_id] = task

    thread = threading.Thread(
        target=self._execute,
        args=(task_id, command),
        daemon=True,
    )
    thread.start()

    return task_id  # 立即返回,不等子进程

关键点:daemon=True 保证主进程退出时自动清理;立即返回 task_id,调用方不阻塞。

第三步:_execute() 在后台跑子进程

def _execute(self, task_id: str, command: str):
    try:
        r = subprocess.run(
            command, shell=True,
            capture_output=True, text=True,
            timeout=600,
        )
        output = (r.stdout + r.stderr).strip()
        result = output[:50000] if output else "(no output)"
        status = "completed"
    except subprocess.TimeoutExpired:
        result = "Error: Background task timed out (600s)"
        status = "failed"
    except Exception as e:
        result = f"Error: {e}"
        status = "failed"

    # 更新任务状态
    with self.lock:
        self.tasks[task_id].status = status
        self.tasks[task_id].result = result

    # 通知主线程
    self.notification_queue.put({
        "task_id": task_id,
        "command": command,
        "status": status,
        "result": result,
    })

跑在守护线程里,做两件事:跑子进程(subprocess.run() 只阻塞后台线程,不影响主线程),然后把结果推进 notification_queue。超时设为 600 秒——后台跑,给充足时间。

第四步:drain_notifications() 排空队列

def drain_notifications(self) -> list[dict]:
    notifications = []
    while True:
        try:
            n = self.notification_queue.get_nowait()
            notifications.append(n)
        except queue.Empty:
            break
    return notifications

get_nowait() 是非阻塞的:有消息就取,没消息立刻返回 Empty。一次性取完所有已完成的后台任务,绝不阻塞主循环。

集成到 Agent Loop

改动极小——只在每次 LLM 调用前加一步排空通知:

bg_manager = BackgroundManager()

def agent_loop(messages: list):
    while True:
        # ——— 新增:排空后台通知 ———
        notifications = bg_manager.drain_notifications()
        if notifications:
            bg_results = format_background_results(notifications)
            messages.append({"role": "user", "content": bg_results})

        # 原有逻辑不变
        response = client.messages.create(
            model=MODEL, system=SYSTEM,
            messages=messages, tools=TOOLS, max_tokens=8000,
        )
        messages.append({"role": "assistant", "content": response.content})
        if response.stop_reason != "tool_use":
            return

        results = []
        for block in response.content:
            if block.type == "tool_use":
                output = dispatch_tool(block.name, block.input)
                results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": output,
                })
        messages.append({"role": "user", "content": results})

通知以 <background-results> 标签注入,让模型明确区分这是异步事件,不是用户输入或工具结果:

def format_background_results(notifications: list[dict]) -> str:
    parts = ["<background-results>"]
    for n in notifications:
        parts.append(f"[{n['task_id']}] {n['command']}\n"
                     f"Status: {n['status']}\nResult: {n['result'][:2000]}")
    parts.append("</background-results>")
    return "\n".join(parts)

工具定义:run_in_background

{"name": "run_in_background",
 "description": "Run a shell command in the background. Returns immediately "
                "with a task ID. Use for long-running commands like npm install, "
                "pytest, docker build. Results delivered as background notifications.",
 "input_schema": {"type": "object",
                  "properties": {"command": {"type": "string"}},
                  "required": ["command"]}}

模型看到描述后,自己判断快命令走 bash、慢命令走 run_in_background不需要硬编码规则。

实际运行:并行执行的效果

同样的指令,装了 BackgroundManager 之后:

模型: npm install 很慢 → run_in_background(立即返回 task_id)
模型: .env 很快 → bash → echo "DB_HOST=..." > .env → 完成
模型: ".env 已创建。npm install 在后台运行中。"
      …… 继续做其他事 ……
下一轮: drain_notifications() → [bg_0] npm install completed
模型: "依赖也安装完了,added 847 packages。"

总感知耗时:2 秒。 npm install 的 120 秒完全在后台消化。

后台执行:Agent不再干等

三个设计洞见

洞见一:为什么循环保持单线程

为什么不把整个循环改成异步?因为 messages 是严格有序的对话历史,多线程追加会打乱顺序。单线程循环保证上下文确定,不需要对 messages 加锁。

唯一被并行化的是子进程 I/O 等待——子进程跑在独立进程空间,通过 Queue 通信,不共享可变状态。这是最安全的并发粒度。

洞见二:通知注入的时机

为什么在每次 LLM 调用前排空,而不是任务一完成就注入?因为 LLM 是批处理的——你不能在模型 "正在思考" 时插入新消息。唯一合理的注入点是:上一轮工具执行完了,下一轮 LLM 调用还没开始。

这意味着结果可能延迟一轮。完全可以接受——模型会自然地报告 "后台任务完成了"。

洞见三:daemon=True 的意义

守护线程在主进程退出时自动终止。用户按 Ctrl+C,所有后台任务自动清理——没有僵尸进程、没有资源泄漏、不需要清理逻辑。

五分钟跑起来

# 进入项目目录
cd learn-claude-code

# 启动第八课
python agents/s08_background_tasks.py

任务一:后台执行 + 前台并行工作

把耗时命令丢后台,同时在前台继续做事:

s08 >> Run "sleep 5 && echo done" in the background, then create a file while it runs

实际执行记录:

> background_run:
  Background task 223025f0 started: sleep 5 && echo done

> write_file: Wrote 65 bytes          ← sleep 还在跑,Agent 已经在写文件了

> check_background:
  [running] sleep 5 && echo done      ← 轮询发现还在跑

> bash: (no output)                   ← 继续做其他事

  → 后台任务完成,输出: done

关键观察: Agent 不等 sleep 5 跑完就去创建文件了。background_run 把命令扔进守护线程,主循环继续转。

任务二:多个后台任务并发

同时启动 3 个不同耗时的后台任务:

s08 >> Start 3 background tasks: "sleep 2", "sleep 4", "sleep 6". Check their status.

实际执行记录:

> background_run: Background task fa70445d started: sleep 2
> background_run: Background task ec97d9ed started: sleep 4
> background_run: Background task eafcece7 started: sleep 6

> check_background:                   ← 第一次检查
  fa70445d: [completed] sleep 2       ← 2秒的已完成
  ec97d9ed: [running] sleep 4         ← 4秒的还在跑
  eafcece7: [running] sleep 6         ← 6秒的还在跑

> check_background: ec97d9ed [completed]  ← 4秒的完成了
> check_background: eafcece7 [completed]  ← 6秒的也完成了

Agent 自动轮询状态,等所有任务完成后汇总:

| Task ID    | Command   | Duration | Status       |
|------------|-----------|----------|--------------|
| fa70445d   | sleep 2   | 2s       | ✅ Completed |
| ec97d9ed   | sleep 4   | 4s       | ✅ Completed |
| eafcece7   | sleep 6   | 6s       | ✅ Completed |

总 sleep 时间 12 秒,实际只用了 ~6 秒——三个任务真正并发执行。

任务三:后台跑测试,前台继续工作

最实用的场景——测试跑着,Agent 不闲着:

s08 >> Run pytest in the background and keep working on other things

实际执行记录:

> background_run:
  Background task da186af7 started: pytest 2>&1

Agent 立即回复"Pytest is running in the background. I'm free to keep working!"并主动询问下一步要做什么。测试在后台跑,前台随时可以接新指令——这就是后台执行的核心价值。

总结:你刚造了什么

组件之前(s07)之后(s08)
命令执行仅阻塞式阻塞 + 后台线程
后台通知每轮排空的通知队列
并发模型守护线程(子进程 I/O)
循环改动仅增加 drain + 注入
新增代码~60 行(BackgroundManager)

核心原则:最小化并发。 循环保持单线程,只把 I/O 等待扔后台。一个 Queue,没有共享可变状态——最小复杂度增量,换最大效率提升。

变更总结:从阻塞到后台执行

下一课预告

第 8 课让一个 Agent 能并行处理任务。但如果需要多个 Agent 协作——一个写代码、一个跑测试、一个审查代码?

第 9 课:Agent Teams —— 从一个 Agent 到一个团队。

# 预告:s09 的团队协作
team = TeamManager()
team.add_agent("coder", system="你是一个代码编写专家")
team.add_agent("tester", system="你是一个测试工程师")
team.add_agent("reviewer", system="你是一个代码审查专家")

team.run("实现一个用户注册功能,包含测试和代码审查")

一个 Agent 做完,下一个接棒。BackgroundManager 在团队场景下继续复用。


这是《12课拆解Claude Code架构:从零掌握Agent Harness工程》系列的第 8 课。关注Claw开发者,不错过后续更新。

完整代码和交互式学习平台:github.com/shareAI-lab/learn-claude-code

如果这篇文章对你有帮助,欢迎转发给你的技术团队。

系列目录

  • 第1课:用20行Python造出你的第一个AI Agent
  • 第2课:给Agent加工具 —— dispatch map模式详解
  • 第3课:TodoWrite —— 让Agent先想后做:规划系统
  • 第4课:Subagent —— 拆解大任务,上下文隔离
  • 第5课:按需加载领域知识——Skill机制
  • 第6课:无限对话——上下文压缩三层策略
  • 第7课:任务持久化——文件级DAG任务图
  • 第8课:后台执行——异步任务与通知队列(本文)
  • 第9课:Agent Teams——多Agent协作:团队与邮箱系统
  • 第10课:团队协议——状态机驱动的协商
  • 第11课:自治Agent——自组织任务认领
  • 第12课:终极隔离——Worktree并行执行

On this page