跳到主要内容

异步编程与并发模型

从前端转 Python 后端做 Agent 开发,异步编程是绕不过去的坎。这篇文章帮你从零搞懂 asyncio 的核心概念,理解并发模型的全貌,最后落到 Agent 场景的实战用法上。

从一个真实场景说起

假设你正在开发一个 AI Agent 系统,用户提了一个问题,Agent 需要同时做三件事:

  1. 调用 LLM 生成回答
  2. 查询知识库获取相关文档
  3. 调用搜索引擎补充信息

如果用同步写法,三个请求串行执行,假设每个耗时 2 秒,总共就是 6 秒。用户等到花儿都谢了。

但如果三个请求并发执行,理论上总耗时只需要 2 秒多一点。这就是异步编程在 Agent 场景中最直观的价值。

作为前端同学,你对 Promiseasync/await 已经很熟悉了。Python 的 asyncio 在概念上非常相似,但有一些独特的设计需要理解。我们一步步来。

并发、并行、异步:先搞清楚概念

很多人把这几个词混着用,但在面试和实际工程中,它们的区别很重要。

三者对比

概念定义类比
并发 (Concurrency)一个线程交替处理多个任务一个人在炒菜和煮饭之间来回切换
并行 (Parallelism)多个线程同时执行多个任务两个人同时一个炒菜一个煮饭
异步 (Asynchronous)发起操作后不等结果,先干别的点了外卖不守着等,先去写代码

Python 中的体现

Python 并发模型全景图
================================================================

┌─────────────────────────────┐
│ Python 进程 │
│ │
│ ┌───────────┐ ┌──────────┐ │
│ │ 主线程 │ │ 子线程 │ │ <-- threading (并发)
│ │ │ │ │ │
│ │ ┌───────┐ │ │ │ │
│ │ │事件循环│ │ │ │ │ <-- asyncio (并发)
│ │ │(协程) │ │ │ │ │
│ │ └───────┘ │ │ │ │
│ └───────────┘ └──────────┘ │
│ │
│ ┌───────────┐ ┌──────────┐ │
│ │ 子进程1 │ │ 子进程2 │ │ <-- multiprocessing (并行)
│ └───────────┘ └──────────┘ │
└─────────────────────────────┘

对于 Agent 开发,asyncio 是你的主战场。 原因很简单:Agent 的瓶颈在 I/O(网络请求、API 调用),不在 CPU 计算。asyncio 恰好擅长处理 I/O 密集型任务。

asyncio 基础:async/await

如果你来自前端,async/await 对你来说几乎是零学习成本。Python 的语法几乎一模一样。

第一个异步函数

import asyncio

async def greet(name: str, delay: float) -> str:
print(f"开始处理: {name}")
await asyncio.sleep(delay) # 模拟异步 I/O 操作
print(f"完成处理: {name}")
return f"Hello, {name}!"

async def main():
# 串行执行 - 耗时 1 + 2 = 3 秒
result1 = await greet("Alice", 1)
result2 = await greet("Bob", 2)
print(result1, result2)

asyncio.run(main())

输出:

开始处理: Alice
完成处理: Alice
开始处理: Bob
完成处理: Bob
Hello, Alice! Hello, Bob!

看到了吗?两个任务是串行的。await 就是"等这个完成再往下走"的意思,和 JavaScript 一样。

并发执行:asyncio.gather

async def main():
# 并发执行 - 耗时 max(1, 2) = 2 秒
result1, result2 = await asyncio.gather(
greet("Alice", 1),
greet("Bob", 2),
)
print(result1, result2)

输出:

开始处理: Alice
开始处理: Bob
完成处理: Alice
完成处理: Bob
Hello, Alice! Hello, Bob!

gather 会同时启动所有协程,等它们全部完成后返回结果列表。这就是并发。

事件循环:asyncio 的心脏

事件循环是 asyncio 的核心。理解它的工作原理,很多诡异的 bug 你就能自己排查了。

事件循环工作原理

事件循环 (Event Loop) 工作流程
================================================================

┌──────────────────────────────────────────────────┐
│ 事件循环 │
│ │
│ ┌─────────┐ │
│ │ 就绪队列 │ <-- 协程在这里排队等待执行 │
│ └────┬────┘ │
│ │ │
│ v │
│ ┌─────────┐ ┌─────────────────┐ │
│ │ 取出一个 │────>│ 执行直到 await │ │
│ │ 协程 │ │ │ │
│ └─────────┘ └───────┬─────────┘ │
│ │ │
│ ┌────────────┼────────────┐ │
│ v v v │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ I/O 完成 │ │ 定时器 │ │ 网络回调 │ │
│ │ 回调 │ │ 到期 │ │ │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ v v v │
│ ┌─────────────────────────────────────┐ │
│ │ 放回就绪队列,等待下次调度 │ │
│ └─────────────────────────────────────┘ │
└──────────────────────────────────────────────────┘

关键点:

  • 事件循环是单线程的,同一时刻只执行一个协程
  • 当协程遇到 await(比如网络请求)时,事件循环把它挂起,去执行别的协程
  • I/O 完成后,回调把协程放回就绪队列
  • 这就是为什么 asyncio 是并发而不是并行 -- 它在同一个线程里来回切换

手动操作事件循环

import asyncio

async def fetch_data(url: str) -> dict:
print(f"请求 {url}")
await asyncio.sleep(1) # 模拟网络请求
return {"url": url, "status": 200}

async def main():
# gather 并发执行多个协程
results = await asyncio.gather(
fetch_data("https://api.example.com/users"),
fetch_data("https://api.example.com/orders"),
fetch_data("https://api.example.com/products"),
)
for r in results:
print(r)

# asyncio.run() 做了三件事:
# 1. 创建新的事件循环
# 2. 运行 main() 协程
# 3. 关闭事件循环
asyncio.run(main())

协程的本质

协程 vs 线程 vs 进程

特性协程线程进程
创建开销极低(几 KB)中等(几 MB)高(几十 MB)
切换开销极低中等
调度方式协作式(主动让出)抢占式(OS 调度)抢占式(OS 调度)
数据共享需要同步需要同步需要 IPC
适用场景I/O 密集I/O 密集 + 少量 CPUCPU 密集

协程是协作式的

这一点非常重要。线程的调度是操作系统强制的(抢占式),但协程的切换是你主动让出控制权的(协作式)。

async def cpu_bound():
"""这个协程不会主动让出控制权,会阻塞整个事件循环!"""
total = 0
for i in range(10**7): # CPU 密集计算
total += i
return total

async def io_bound():
"""这个协程会频繁让出控制权"""
await asyncio.sleep(0.1)
return "done"

如果 cpu_bound() 正在执行,在它完成之前,io_bound() 永远不会被执行。这跟 JavaScript 里一段同步代码阻塞主线程是一回事。

TaskGroup:现代的并发编排方式

Python 3.11 引入的 asyncio.TaskGroupgather 的升级版,更安全、更直观。

gather vs TaskGroup

import asyncio

async def risky_task(task_id: int) -> str:
if task_id == 2:
raise ValueError(f"Task {task_id} 失败了!")
await asyncio.sleep(1)
return f"Task {task_id} 完成"

async def demo_gather():
"""gather: 一个任务失败,其他任务继续运行,异常被静默吞掉"""
results = await asyncio.gather(
risky_task(1),
risky_task(2), # 这个会失败
risky_task(3),
return_exceptions=True, # 不加这个会抛异常
)
print("gather 结果:", results)
# [Task 1 完成, ValueError('Task 2 失败了!'), Task 3 完成]

async def demo_task_group():
"""TaskGroup: 一个任务失败,整个组都会被取消,更符合直觉"""
try:
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(risky_task(1))
task2 = tg.create_task(risky_task(2)) # 失败
task3 = tg.create_task(risky_task(3))
# 到这里时,所有任务都已完成
except* ValueError as eg:
print(f"TaskGroup 捕获到 {len(eg.exceptions)} 个异常")
for exc in eg.exceptions:
print(f" - {exc}")

asyncio.run(demo_gather())
asyncio.run(demo_task_group())

选择指南:

  • 需要所有结果且希望任务互相独立 -- 用 gather
  • 需要"要么全成功,要么全部取消"的语义 -- 用 TaskGroup
  • Agent 场景中多数用 gather,因为你通常希望即使某个工具调用失败,其他结果还能用

Semaphore:并发限流

Agent 场景经常需要调用外部 API,而 API 通常有速率限制。Semaphore 就是干这个的。

import asyncio
from typing import Any

async def call_llm(prompt: str, semaphore: asyncio.Semaphore) -> str:
"""模拟 LLM API 调用,通过 Semaphore 限制并发数"""
async with semaphore:
print(f"调用 LLM: {prompt[:30]}...")
await asyncio.sleep(2) # 模拟 API 延迟
return f"回答: {prompt[:20]}..."

async def main():
# 最多同时 3 个请求
semaphore = asyncio.Semaphore(3)

prompts = [f"问题 {i}" for i in range(10)]

# 并发执行,但同时最多 3 个
results = await asyncio.gather(
*[call_llm(p, semaphore) for p in prompts]
)

for r in results:
print(r)

asyncio.run(main())

Semaphore 工作原理

Semaphore(3) 限流示意
================================================================

请求数: 1 2 3 4 5 6 7 8 9 10
| | | | | | | | | |
v v v v v v v v v v
┌──────────────────────────────┐
│ Semaphore(3) │
│ │
│ 信号量计数: │
│ 请求1: 3->2 [放行] │
│ 请求2: 2->1 [放行] │
│ 请求3: 1->0 [放行] │
│ 请求4: 0 [阻塞等待] │
│ 请求5: 0 [阻塞等待] │
│ ... │
│ 请求1完成: 0->1 [请求4放行] │
│ 请求2完成: 1->2 [请求5放行] │
│ ... │
└──────────────────────────────┘

异步上下文管理器

在 Agent 开发中,管理连接池、数据库会话等资源时,异步上下文管理器非常有用。

import asyncio
from types import TracebackType
from typing import Any, Self

class AsyncLLMClient:
"""异步 LLM 客户端,管理连接生命周期"""

def __init__(self, api_key: str, max_concurrent: int = 5):
self.api_key = api_key
self.semaphore = asyncio.Semaphore(max_concurrent)
self._session = None

async def __aenter__(self) -> Self:
print("初始化异步 HTTP 会话...")
# 在实际代码中,这里会创建 aiohttp.ClientSession
self._session = "mock_session"
return self

async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
print("关闭异步 HTTP 会话...")
self._session = None

async def generate(self, prompt: str) -> str:
async with self.semaphore:
await asyncio.sleep(1) # 模拟 API 调用
return f"回答: {prompt}"

async def main():
async with AsyncLLMClient(api_key="sk-xxx", max_concurrent=3) as client:
tasks = [client.generate(f"问题 {i}") for i in range(5)]
results = await asyncio.gather(*tasks)
for r in results:
print(r)

asyncio.run(main())

对比 JavaScript 的对应概念

# Python: async context manager
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.json()
// JavaScript: 用 using 或手动管理
const controller = new AbortController();
try {
const response = await fetch(url, { signal: controller.signal });
const data = await response.json();
} finally {
controller.abort();
}

异步迭代器:流式处理 Agent 输出

Agent 经常需要流式处理 LLM 的输出(SSE),异步迭代器正好派上用场。

import asyncio
from collections.abc import AsyncIterator

async def stream_llm_response(prompt: str) -> AsyncIterator[str]:
"""模拟流式 LLM 输出"""
words = f"这是对'{prompt}'的流式回答".split()
for word in words:
await asyncio.sleep(0.1) # 模拟流式延迟
yield word

async def process_stream():
async for chunk in stream_llm_response("什么是 Agent"):
print(chunk, end="", flush=True)
print() # 换行

asyncio.run(process_stream())

异步生成器 vs 普通生成器

# 普通生成器 - 适合 CPU 密集或同步 I/O
def sync_generator():
for i in range(10):
yield i # 不会挂起

# 异步生成器 - 适合异步 I/O
async def async_generator():
for i in range(10):
await asyncio.sleep(0.1) # 会挂起
yield i

Agent 场景实战:并发编排

案例:并发工具执行

一个典型的 Agent 可能有多个工具需要并发调用:

import asyncio
import time
from dataclasses import dataclass
from typing import Any

@dataclass
class ToolResult:
tool_name: str
success: bool
data: Any = None
error: str | None = None

async def search_web(query: str) -> ToolResult:
"""模拟搜索引擎调用"""
await asyncio.sleep(1.5)
return ToolResult(
tool_name="search_web",
success=True,
data={"results": [f"搜索结果: {query}"]},
)

async def query_knowledge_base(query: str) -> ToolResult:
"""模拟知识库查询"""
await asyncio.sleep(1.0)
return ToolResult(
tool_name="knowledge_base",
success=True,
data={"docs": [f"文档片段: {query}"]},
)

async def call_llm_api(prompt: str) -> ToolResult:
"""模拟 LLM API 调用"""
await asyncio.sleep(2.0)
return ToolResult(
tool_name="llm_api",
success=True,
data={"response": f"LLM 回答: {prompt[:20]}..."},
)

async def run_agent_tools(query: str) -> list[ToolResult]:
"""并发执行所有工具调用"""
semaphore = asyncio.Semaphore(5) # 限制最多 5 个并发

async def limited_call(coro):
async with semaphore:
return await coro

start = time.time()

results = await asyncio.gather(
limited_call(search_web(query)),
limited_call(query_knowledge_base(query)),
limited_call(call_llm_api(query)),
)

elapsed = time.time() - start
print(f"并发执行耗时: {elapsed:.2f}s (最长单个任务: 2.0s)")
return list(results)

async def main():
results = await run_agent_tools("Python 异步编程")
for r in results:
print(f" [{r.tool_name}] {'成功' if r.success else '失败'}: {r.data}")

asyncio.run(main())

案例:多 Agent 协作

import asyncio
from dataclasses import dataclass

@dataclass
class AgentMessage:
sender: str
content: str
round_num: int

async def researcher_agent(query: str, round_num: int) -> AgentMessage:
"""研究员 Agent:负责收集信息"""
await asyncio.sleep(1.5)
return AgentMessage(
sender="researcher",
content=f"关于'{query}'的调研结果...",
round_num=round_num,
)

async def writer_agent(research: str, round_num: int) -> AgentMessage:
"""写作 Agent:负责整理成文"""
await asyncio.sleep(2.0)
return AgentMessage(
sender="writer",
content=f"基于调研整理的文章: {research[:30]}...",
round_num=round_num,
)

async def reviewer_agent(draft: str, round_num: int) -> AgentMessage:
"""审核 Agent:负责质量检查"""
await asyncio.sleep(1.0)
return AgentMessage(
sender="reviewer",
content=f"审核意见: 内容质量良好,建议补充数据",
round_num=round_num,
)

async def run_multi_agent_pipeline(query: str) -> list[AgentMessage]:
"""多 Agent 流水线:researcher -> writer -> reviewer"""
messages = []

# 第一轮:并发调研
print("[Round 1] 并发调研...")
research_tasks = [
researcher_agent(query, 1) for _ in range(3)
]
research_results = await asyncio.gather(*research_tasks)
messages.extend(research_results)

# 第二轮:基于调研结果并发写作
print("[Round 2] 并发写作...")
combined_research = " ".join(r.content for r in research_results)
writing_tasks = [
writer_agent(combined_research, 2) for _ in range(2)
]
writing_results = await asyncio.gather(*writing_tasks)
messages.extend(writing_results)

# 第三轮:并发审核
print("[Round 3] 并发审核...")
all_drafts = " ".join(w.content for w in writing_results)
review_tasks = [
reviewer_agent(all_drafts, 3)
]
review_results = await asyncio.gather(*review_tasks)
messages.extend(review_results)

return messages

async def main():
messages = await run_multi_agent_pipeline("AI Agent 架构设计")
for msg in messages:
print(f" [Round {msg.round_num}] {msg.sender}: {msg.content[:40]}...")

asyncio.run(main())

案例:带超时和重试的 LLM 调用

import asyncio
from typing import Any

async def call_llm_with_retry(
prompt: str,
max_retries: int = 3,
timeout: float = 10.0,
backoff_factor: float = 2.0,
) -> str:
"""带指数退避重试的 LLM 调用"""
last_error = None

for attempt in range(max_retries):
try:
result = await asyncio.wait_for(
_actual_llm_call(prompt),
timeout=timeout,
)
return result
except asyncio.TimeoutError:
last_error = f"超时 (attempt {attempt + 1})"
wait_time = backoff_factor ** attempt
print(f" 调用超时,{wait_time}s 后重试...")
except Exception as e:
last_error = str(e)
wait_time = backoff_factor ** attempt
print(f" 调用失败: {e}{wait_time}s 后重试...")

await asyncio.sleep(wait_time)

raise RuntimeError(f"LLM 调用失败,已重试 {max_retries} 次: {last_error}")

async def _actual_llm_call(prompt: str) -> str:
"""模拟实际的 LLM 调用"""
import random
await asyncio.sleep(random.uniform(0.5, 3.0))
if random.random() < 0.3:
raise ConnectionError("API 连接失败")
return f"LLM 回答: {prompt[:30]}..."

async def main():
# 并发调用多个 LLM,每个都有独立的重试和超时
tasks = [
call_llm_with_retry(f"问题 {i}")
for i in range(5)
]

results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f" 问题 {i}: 失败 - {result}")
else:
print(f" 问题 {i}: {result}")

asyncio.run(main())

aiohttp:异步 HTTP 请求

在 Agent 开发中,aiohttp 是最常用的异步 HTTP 客户端。这里给出一个完整的对比。

同步 vs 异步 HTTP 性能对比

import asyncio
import time

import aiohttp

# 同步版本(需要 requests 库)
def sync_fetch_all(urls: list[str]) -> list[dict]:
"""同步方式:串行请求"""
import requests
results = []
for url in urls:
resp = requests.get(url, timeout=5)
results.append({"url": url, "status": resp.status_code})
return results

# 异步版本
async def async_fetch_all(urls: list[str]) -> list[dict]:
"""异步方式:并发请求"""
async with aiohttp.ClientSession() as session:
async def fetch_one(url: str) -> dict:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
return {"url": url, "status": resp.status}

tasks = [fetch_one(url) for url in urls]
return await asyncio.gather(*tasks)

# 带连接池限制的版本
async def async_fetch_limited(urls: list[str], max_concurrent: int = 10) -> list[dict]:
"""控制并发数的异步请求"""
semaphore = asyncio.Semaphore(max_concurrent)
connector = aiohttp.TCPConnector(limit=max_concurrent)

async with aiohttp.ClientSession(connector=connector) as session:
async def fetch_one(url: str) -> dict:
async with semaphore:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
return {"url": url, "status": resp.status}

return await asyncio.gather(*[fetch_one(u) for u in urls])

性能基准对比

性能对比: 100 个 HTTP 请求
================================================================

同步串行 (requests):
████████████████████████████████████████████████ 50.0s

异步并发 (aiohttp, 无限制):
███ 3.2s

异步并发 (aiohttp, 限制 10 并发):
████████ 8.1s

异步并发 (aiohttp, 限制 20 并发):
██████ 5.3s

(以上数据为示例值,实际取决于网络和目标服务器)

要点:

  • 异步比同步快 10-50 倍(I/O 密集场景)
  • 无限并发可能导致被封 IP 或 OOM,需要合理设置限制
  • TCPConnector 控制连接池大小,Semaphore 控制逻辑并发数

常见陷阱

做 Agent 开发时,这些坑你大概率会踩到。提前了解,省得调试到怀疑人生。

陷阱 1:在异步代码中调用同步阻塞函数

这是新手最常犯的错误。

# 错误示范 - 会阻塞整个事件循环!
import asyncio
import time

async def bad_example():
time.sleep(3) # 这是同步阻塞,整个事件循环会卡住
print("这行要等 3 秒才执行")

async def good_example():
await asyncio.sleep(3) # 正确:用异步版本
print("这行会并发执行")

# 如果必须调用同步代码,用 run_in_executor
async def also_ok():
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, time.sleep, 3) # 放到线程池执行
print("通过线程池执行同步代码")

经验法则: 任何涉及 I/O 的操作(文件读写、网络请求、数据库查询),都要用对应的异步版本。

陷阱 2:忘记 await

async def get_user():
return {"name": "Alice"}

async def main():
# 错误!返回的是协程对象,不是结果
user = get_user()
print(user) # <coroutine object get_user at 0x...>

# 正确
user = await get_user()
print(user) # {'name': 'Alice'}

Python 不会报错,只是给你一个协程对象。这种 bug 很隐蔽,特别是嵌套较深时。

陷阱 3:在事件循环内创建新事件循环

import asyncio

async def main():
# 错误!已经在一个事件循环里了,不能再创建
# asyncio.run(another_coroutine()) # RuntimeError

# 正确:直接 await
result = await another_coroutine()

async def another_coroutine():
return 42

asyncio.run(main())

陷阱 4:异步上下文管理器中忘记使用 async with

import asyncio
from typing import Any

class MyResource:
async def __aenter__(self):
print("获取资源")
return self

async def __aexit__(self, *args):
print("释放资源")

async def main():
# 错误!普通 with 不会调用 __aenter__
# with MyResource(): ...

# 正确
async with MyResource() as res:
print("使用资源")

陷阱 5:gather 中异常处理不当

import asyncio

async def might_fail():
raise ValueError("boom")

async def main():
# 危险!第一个异常会直接抛出,但其他任务还在后台运行
try:
await asyncio.gather(
might_fail(),
might_fail(),
might_fail(),
)
except ValueError:
print("捕获到异常,但其他任务还在跑...")

# 安全做法:使用 return_exceptions=True
results = await asyncio.gather(
might_fail(),
might_fail(),
might_fail(),
return_exceptions=True,
)
errors = [r for r in results if isinstance(r, Exception)]
print(f"收集到 {len(errors)} 个异常")

# 更安全:使用 TaskGroup(Python 3.11+)
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(might_fail())
tg.create_task(might_fail())
except* ValueError as eg:
print(f"TaskGroup 捕获到异常: {len(eg.exceptions)} 个")

asyncio.run(main())

async/await 与 JavaScript 的对比

作为前端同学,对比学习效率更高:

特性JavaScriptPython asyncio
基本语法async function / awaitasync def / await
并发执行Promise.all([...])asyncio.gather(...)
错误处理try/catchtry/except + except*
事件循环V8 引擎内置需要手动 asyncio.run()
取消机制AbortControllertask.cancel()
超时控制Promise.race + setTimeoutasyncio.wait_for(coro, timeout)
异步迭代for await...ofasync for...in
异步上下文无原生支持(TS 5.2+ usingasync with

小结

对 Agent 开发来说,asyncio 是你的基础设施。核心要点:

  1. async/await 是协作式并发,适合 I/O 密集场景
  2. gather 用于并发执行多个任务,TaskGroup 提供更安全的异常处理
  3. Semaphore 控制并发度,避免 API 限流和资源耗尽
  4. async context manager 管理连接等资源的生命周期
  5. 永远不要在异步代码中调用同步阻塞函数(除非用 run_in_executor)
  6. aiohttp 是 Agent 的 HTTP 基础设施,配合连接池和超时使用

掌握这些,你就能在 Agent 开发中写出高效、可靠的并发代码了。

参考资料