异步编程与并发模型
从前端转 Python 后端做 Agent 开发,异步编程是绕不过去的坎。这篇文章帮你从零搞懂 asyncio 的核心概念,理解并发模型的全貌,最后落到 Agent 场景的实战用法上。
从一个真实场景说起
假设你正在开发一个 AI Agent 系统,用户提了一个问题,Agent 需要同时做三件事:
- 调用 LLM 生成回答
- 查询知识库获取相关文档
- 调用搜索引擎补充信息
如果用同步写法,三个请求串行执行,假设每个耗时 2 秒,总共就是 6 秒。用户等到花儿都谢了。
但如果三个请求并发执行,理论上总耗时只需要 2 秒多一点。这就是异步编程在 Agent 场景中最直观的价值。
作为前端同学,你对 Promise、async/await 已经很熟悉了。Python 的 asyncio 在概念上非常相似,但有一些独特的设计需要理解。我们一步步来。
并发、并行、异步:先搞清楚概念
很多人把这几个词混着用,但在面试和实际工程中,它们的区别很重要。
三者对比
| 概念 | 定义 | 类比 |
|---|---|---|
| 并发 (Concurrency) | 一个线程交替处理多个任务 | 一个人在炒菜和煮饭之间来回切换 |
| 并行 (Parallelism) | 多个线程同时执行多个任务 | 两个人同时一个炒菜一个煮饭 |
| 异步 (Asynchronous) | 发起操作后不等结果,先干别的 | 点了外卖不守着等,先去写代码 |
Python 中的体现
Python 并发模型全景图
================================================================
┌─────────────────────────────┐
│ Python 进程 │
│ │
│ ┌───────────┐ ┌──────────┐ │
│ │ 主线程 │ │ 子线程 │ │ <-- threading (并发)
│ │ │ │ │ │
│ │ ┌───────┐ │ │ │ │
│ │ │事件循环│ │ │ │ │ <-- asyncio (并发)
│ │ │(协程) │ │ │ │ │
│ │ └───────┘ │ │ │ │
│ └───────────┘ └──────────┘ │
│ │
│ ┌───────────┐ ┌──────────┐ │
│ │ 子进程1 │ │ 子进程2 │ │ <-- multiprocessing (并行)
│ └───────────┘ └──────────┘ │
└─────────────────────────────┘
对于 Agent 开发,asyncio 是你的主战场。 原因很简单:Agent 的瓶颈在 I/O(网络请求、API 调用),不在 CPU 计算。asyncio 恰好擅长处理 I/O 密集型任务。
asyncio 基础:async/await
如果你来自前端,async/await 对你来说几乎是零学习成本。Python 的语法几乎一模一样。
第一个异步函数
import asyncio
async def greet(name: str, delay: float) -> str:
print(f"开始处理: {name}")
await asyncio.sleep(delay) # 模拟异步 I/O 操作
print(f"完成处理: {name}")
return f"Hello, {name}!"
async def main():
# 串行执行 - 耗时 1 + 2 = 3 秒
result1 = await greet("Alice", 1)
result2 = await greet("Bob", 2)
print(result1, result2)
asyncio.run(main())
输出:
开始处理: Alice
完成处理: Alice
开始处理: Bob
完成处理: Bob
Hello, Alice! Hello, Bob!
看到了吗?两个任务是串行的。await 就是"等这个完成再往下走"的意思,和 JavaScript 一样。
并发执行:asyncio.gather
async def main():
# 并发执行 - 耗时 max(1, 2) = 2 秒
result1, result2 = await asyncio.gather(
greet("Alice", 1),
greet("Bob", 2),
)
print(result1, result2)
输出:
开始处理: Alice
开始处理: Bob
完成处理: Alice
完成处理: Bob
Hello, Alice! Hello, Bob!
gather 会同时启动所有协程,等它们全部完成后返回结果列表。这就是并发。
事件循环:asyncio 的心脏
事件循环是 asyncio 的核心。理解它的工作原理,很多诡异的 bug 你就能自己排查了。
事件循环工作原理
事件循环 (Event Loop) 工作流程
================================================================
┌──────────────────────────────────────────────────┐
│ 事件循环 │
│ │
│ ┌─────────┐ │
│ │ 就绪队列 │ <-- 协程在这里排队等待执行 │
│ └────┬────┘ │
│ │ │
│ v │
│ ┌─────────┐ ┌─────────────────┐ │
│ │ 取出一个 │────>│ 执行直到 await │ │
│ │ 协程 │ │ │ │
│ └─────────┘ └───────┬─────────┘ │
│ │ │
│ ┌────────────┼────────────┐ │
│ v v v │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ I/O 完成 │ │ 定时器 │ │ 网络回调 │ │
│ │ 回调 │ │ 到期 │ │ │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ v v v │
│ ┌─────────────────────────────────────┐ │
│ │ 放回就绪队列,等待下次调度 │ │
│ └─────────────────────────────────────┘ │
└──────────────────────────────────────────────────┘
关键点:
- 事件循环是单线程的,同一时刻只执行一个协程
- 当协程遇到
await(比如网络请求)时,事件循环把它挂起,去执行别的协程 - I/O 完成后,回调把协程放回就绪队列
- 这就是为什么 asyncio 是并发而不是并行 -- 它在同一个线程里来回切换
手动操作事件循环
import asyncio
async def fetch_data(url: str) -> dict:
print(f"请求 {url}")
await asyncio.sleep(1) # 模拟网络请求
return {"url": url, "status": 200}
async def main():
# gather 并发执行多个协程
results = await asyncio.gather(
fetch_data("https://api.example.com/users"),
fetch_data("https://api.example.com/orders"),
fetch_data("https://api.example.com/products"),
)
for r in results:
print(r)
# asyncio.run() 做了三件事:
# 1. 创建新的事件循环
# 2. 运行 main() 协程
# 3. 关闭事件循环
asyncio.run(main())
协程的本质
协程 vs 线程 vs 进程
| 特性 | 协程 | 线程 | 进程 |
|---|---|---|---|
| 创建开销 | 极低(几 KB) | 中等(几 MB) | 高(几十 MB) |
| 切换开销 | 极低 | 中等 | 高 |
| 调度方式 | 协作式(主动让出) | 抢占式(OS 调度) | 抢占式(OS 调度) |
| 数据共享 | 需要同步 | 需要同步 | 需要 IPC |
| 适用场景 | I/O 密集 | I/O 密集 + 少量 CPU | CPU 密集 |
协程是协作式的
这一点非常重要。线程的调度是操作系统强制的(抢占式),但协程的切换是你主动让出控制权的(协作式)。
async def cpu_bound():
"""这个协程不会主动让出控制权,会阻塞整个事件循环!"""
total = 0
for i in range(10**7): # CPU 密集计算
total += i
return total
async def io_bound():
"""这个协程会频繁让出控制权"""
await asyncio.sleep(0.1)
return "done"
如果 cpu_bound() 正在执行,在它完成之前,io_bound() 永远不会被执行。这跟 JavaScript 里一段同步代码阻塞主线程是一回事。
TaskGroup:现代的并发编排方式
Python 3.11 引入的 asyncio.TaskGroup 是 gather 的升级版,更安全、更直观。
gather vs TaskGroup
import asyncio
async def risky_task(task_id: int) -> str:
if task_id == 2:
raise ValueError(f"Task {task_id} 失败了!")
await asyncio.sleep(1)
return f"Task {task_id} 完成"
async def demo_gather():
"""gather: 一个任务失败,其他任务继续运行,异常被静默吞掉"""
results = await asyncio.gather(
risky_task(1),
risky_task(2), # 这个会失败
risky_task(3),
return_exceptions=True, # 不加这个会抛异常
)
print("gather 结果:", results)
# [Task 1 完成, ValueError('Task 2 失败了!'), Task 3 完成]
async def demo_task_group():
"""TaskGroup: 一个任务失败,整个组都会被取消,更符合直觉"""
try:
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(risky_task(1))
task2 = tg.create_task(risky_task(2)) # 失败
task3 = tg.create_task(risky_task(3))
# 到这里时,所有任务都已完成
except* ValueError as eg:
print(f"TaskGroup 捕获到 {len(eg.exceptions)} 个异常")
for exc in eg.exceptions:
print(f" - {exc}")
asyncio.run(demo_gather())
asyncio.run(demo_task_group())
选择指南:
- 需要所有结果且希望任务互相独立 -- 用
gather - 需要"要么全成功,要么全部取消"的语义 -- 用
TaskGroup - Agent 场景中多数用
gather,因为你通常希望即使某个工具调用失败,其他结果还能用
Semaphore:并发限流
Agent 场景经常需要调用外部 API,而 API 通常有速率限制。Semaphore 就是干这个的。
import asyncio
from typing import Any
async def call_llm(prompt: str, semaphore: asyncio.Semaphore) -> str:
"""模拟 LLM API 调用,通过 Semaphore 限制并发数"""
async with semaphore:
print(f"调用 LLM: {prompt[:30]}...")
await asyncio.sleep(2) # 模拟 API 延迟
return f"回答: {prompt[:20]}..."
async def main():
# 最多同时 3 个请求
semaphore = asyncio.Semaphore(3)
prompts = [f"问题 {i}" for i in range(10)]
# 并发执行,但同时最多 3 个
results = await asyncio.gather(
*[call_llm(p, semaphore) for p in prompts]
)
for r in results:
print(r)
asyncio.run(main())
Semaphore 工作原理
Semaphore(3) 限流示意
================================================================
请求数: 1 2 3 4 5 6 7 8 9 10
| | | | | | | | | |
v v v v v v v v v v
┌──────────────────────────────┐
│ Semaphore(3) │
│ │
│ 信号量计数: │
│ 请求1: 3->2 [放行] │
│ 请求2: 2->1 [放行] │
│ 请求3: 1->0 [放行] │
│ 请求4: 0 [阻塞等待] │
│ 请求5: 0 [阻塞等待] │
│ ... │
│ 请求1完成: 0->1 [请求4放行] │
│ 请求2完成: 1->2 [请求5放行] │
│ ... │
└──────────────────────────────┘
异步上下文管理器
在 Agent 开发中,管理连接池、数据库会话等资源时,异步上下文管理器非常有用。
import asyncio
from types import TracebackType
from typing import Any, Self
class AsyncLLMClient:
"""异步 LLM 客户端,管理连接生命周期"""
def __init__(self, api_key: str, max_concurrent: int = 5):
self.api_key = api_key
self.semaphore = asyncio.Semaphore(max_concurrent)
self._session = None
async def __aenter__(self) -> Self:
print("初始化异步 HTTP 会话...")
# 在实际代码中,这里会创建 aiohttp.ClientSession
self._session = "mock_session"
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
print("关闭异步 HTTP 会话...")
self._session = None
async def generate(self, prompt: str) -> str:
async with self.semaphore:
await asyncio.sleep(1) # 模拟 API 调用
return f"回答: {prompt}"
async def main():
async with AsyncLLMClient(api_key="sk-xxx", max_concurrent=3) as client:
tasks = [client.generate(f"问题 {i}") for i in range(5)]
results = await asyncio.gather(*tasks)
for r in results:
print(r)
asyncio.run(main())
对比 JavaScript 的对应概念
# Python: async context manager
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.json()
// JavaScript: 用 using 或手动管理
const controller = new AbortController();
try {
const response = await fetch(url, { signal: controller.signal });
const data = await response.json();
} finally {
controller.abort();
}
异步迭代器:流式处理 Agent 输出
Agent 经常需要流式处理 LLM 的输出(SSE),异步迭代器正好派上用场。
import asyncio
from collections.abc import AsyncIterator
async def stream_llm_response(prompt: str) -> AsyncIterator[str]:
"""模拟流式 LLM 输出"""
words = f"这是对'{prompt}'的流式回答".split()
for word in words:
await asyncio.sleep(0.1) # 模拟流式延迟
yield word
async def process_stream():
async for chunk in stream_llm_response("什么是 Agent"):
print(chunk, end="", flush=True)
print() # 换行
asyncio.run(process_stream())
异步生成器 vs 普通生成器
# 普通生成器 - 适合 CPU 密集或同步 I/O
def sync_generator():
for i in range(10):
yield i # 不会挂起
# 异步生成器 - 适合异步 I/O
async def async_generator():
for i in range(10):
await asyncio.sleep(0.1) # 会挂起
yield i
Agent 场景实战:并发编排
案例:并发工具执行
一个典型的 Agent 可能有多个工具需要并发调用:
import asyncio
import time
from dataclasses import dataclass
from typing import Any
@dataclass
class ToolResult:
tool_name: str
success: bool
data: Any = None
error: str | None = None
async def search_web(query: str) -> ToolResult:
"""模拟搜索引擎调用"""
await asyncio.sleep(1.5)
return ToolResult(
tool_name="search_web",
success=True,
data={"results": [f"搜索结果: {query}"]},
)
async def query_knowledge_base(query: str) -> ToolResult:
"""模拟知识库查询"""
await asyncio.sleep(1.0)
return ToolResult(
tool_name="knowledge_base",
success=True,
data={"docs": [f"文档片段: {query}"]},
)
async def call_llm_api(prompt: str) -> ToolResult:
"""模拟 LLM API 调用"""
await asyncio.sleep(2.0)
return ToolResult(
tool_name="llm_api",
success=True,
data={"response": f"LLM 回答: {prompt[:20]}..."},
)
async def run_agent_tools(query: str) -> list[ToolResult]:
"""并发执行所有工具调用"""
semaphore = asyncio.Semaphore(5) # 限制最多 5 个并发
async def limited_call(coro):
async with semaphore:
return await coro
start = time.time()
results = await asyncio.gather(
limited_call(search_web(query)),
limited_call(query_knowledge_base(query)),
limited_call(call_llm_api(query)),
)
elapsed = time.time() - start
print(f"并发执行耗时: {elapsed:.2f}s (最长单个任务: 2.0s)")
return list(results)
async def main():
results = await run_agent_tools("Python 异步编程")
for r in results:
print(f" [{r.tool_name}] {'成功' if r.success else '失败'}: {r.data}")
asyncio.run(main())
案例:多 Agent 协作
import asyncio
from dataclasses import dataclass
@dataclass
class AgentMessage:
sender: str
content: str
round_num: int
async def researcher_agent(query: str, round_num: int) -> AgentMessage:
"""研究员 Agent:负责收集信息"""
await asyncio.sleep(1.5)
return AgentMessage(
sender="researcher",
content=f"关于'{query}'的调研结果...",
round_num=round_num,
)
async def writer_agent(research: str, round_num: int) -> AgentMessage:
"""写作 Agent:负责整理成文"""
await asyncio.sleep(2.0)
return AgentMessage(
sender="writer",
content=f"基于调研整理的文章: {research[:30]}...",
round_num=round_num,
)
async def reviewer_agent(draft: str, round_num: int) -> AgentMessage:
"""审核 Agent:负责质量检查"""
await asyncio.sleep(1.0)
return AgentMessage(
sender="reviewer",
content=f"审核意见: 内容质量良好,建议补充数据",
round_num=round_num,
)
async def run_multi_agent_pipeline(query: str) -> list[AgentMessage]:
"""多 Agent 流水线:researcher -> writer -> reviewer"""
messages = []
# 第一轮:并发调研
print("[Round 1] 并发调研...")
research_tasks = [
researcher_agent(query, 1) for _ in range(3)
]
research_results = await asyncio.gather(*research_tasks)
messages.extend(research_results)
# 第二轮:基于调研结果并发写作
print("[Round 2] 并发写作...")
combined_research = " ".join(r.content for r in research_results)
writing_tasks = [
writer_agent(combined_research, 2) for _ in range(2)
]
writing_results = await asyncio.gather(*writing_tasks)
messages.extend(writing_results)
# 第三轮:并发审核
print("[Round 3] 并发审核...")
all_drafts = " ".join(w.content for w in writing_results)
review_tasks = [
reviewer_agent(all_drafts, 3)
]
review_results = await asyncio.gather(*review_tasks)
messages.extend(review_results)
return messages
async def main():
messages = await run_multi_agent_pipeline("AI Agent 架构设计")
for msg in messages:
print(f" [Round {msg.round_num}] {msg.sender}: {msg.content[:40]}...")
asyncio.run(main())
案例:带超时和重试的 LLM 调用
import asyncio
from typing import Any
async def call_llm_with_retry(
prompt: str,
max_retries: int = 3,
timeout: float = 10.0,
backoff_factor: float = 2.0,
) -> str:
"""带指数退避重试的 LLM 调用"""
last_error = None
for attempt in range(max_retries):
try:
result = await asyncio.wait_for(
_actual_llm_call(prompt),
timeout=timeout,
)
return result
except asyncio.TimeoutError:
last_error = f"超时 (attempt {attempt + 1})"
wait_time = backoff_factor ** attempt
print(f" 调用超时,{wait_time}s 后重试...")
except Exception as e:
last_error = str(e)
wait_time = backoff_factor ** attempt
print(f" 调用失败: {e},{wait_time}s 后重试...")
await asyncio.sleep(wait_time)
raise RuntimeError(f"LLM 调用失败,已重试 {max_retries} 次: {last_error}")
async def _actual_llm_call(prompt: str) -> str:
"""模拟实际的 LLM 调用"""
import random
await asyncio.sleep(random.uniform(0.5, 3.0))
if random.random() < 0.3:
raise ConnectionError("API 连接失败")
return f"LLM 回答: {prompt[:30]}..."
async def main():
# 并发调用多个 LLM,每个都有独立的重试和超时
tasks = [
call_llm_with_retry(f"问题 {i}")
for i in range(5)
]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f" 问题 {i}: 失败 - {result}")
else:
print(f" 问题 {i}: {result}")
asyncio.run(main())
aiohttp:异步 HTTP 请求
在 Agent 开发中,aiohttp 是最常用的异步 HTTP 客户端。这里给出一个完整的对比。
同步 vs 异步 HTTP 性能对比
import asyncio
import time
import aiohttp
# 同步版本(需要 requests 库)
def sync_fetch_all(urls: list[str]) -> list[dict]:
"""同步方式:串行请求"""
import requests
results = []
for url in urls:
resp = requests.get(url, timeout=5)
results.append({"url": url, "status": resp.status_code})
return results
# 异步版本
async def async_fetch_all(urls: list[str]) -> list[dict]:
"""异步方式:并发请求"""
async with aiohttp.ClientSession() as session:
async def fetch_one(url: str) -> dict:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
return {"url": url, "status": resp.status}
tasks = [fetch_one(url) for url in urls]
return await asyncio.gather(*tasks)
# 带连接池限制的版本
async def async_fetch_limited(urls: list[str], max_concurrent: int = 10) -> list[dict]:
"""控制并发数的异步请求"""
semaphore = asyncio.Semaphore(max_concurrent)
connector = aiohttp.TCPConnector(limit=max_concurrent)
async with aiohttp.ClientSession(connector=connector) as session:
async def fetch_one(url: str) -> dict:
async with semaphore:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
return {"url": url, "status": resp.status}
return await asyncio.gather(*[fetch_one(u) for u in urls])
性能基准对比
性能对比: 100 个 HTTP 请求
================================================================
同步串行 (requests):
████████████████████████████████████████████████ 50.0s
异步并发 (aiohttp, 无限制):
███ 3.2s
异步并发 (aiohttp, 限制 10 并发):
████████ 8.1s
异步并发 (aiohttp, 限制 20 并发):
██████ 5.3s
(以上数据为示例值,实际取决于网络和目标服务器)
要点:
- 异步比同步快 10-50 倍(I/O 密集场景)
- 无限并发可能导致被封 IP 或 OOM,需要合理设置限制
TCPConnector控制连接池大小,Semaphore控制逻辑并发数
常见陷阱
做 Agent 开发时,这些坑你大概率会踩到。提前了解,省得调试到怀疑人生。
陷阱 1:在异步代码中调用同步阻塞函数
这是新手最常犯的错误。
# 错误示范 - 会阻塞整个事件循环!
import asyncio
import time
async def bad_example():
time.sleep(3) # 这是同步阻塞,整个事件循环会卡住
print("这行要等 3 秒才执行")
async def good_example():
await asyncio.sleep(3) # 正确:用异步版本
print("这行会并发执行")
# 如果必须调用同步代码,用 run_in_executor
async def also_ok():
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, time.sleep, 3) # 放到线程池执行
print("通过线程池执行同步代码")
经验法则: 任何涉及 I/O 的操作(文件读写、网络请求、数据库查询),都要用对应的异步版本。
陷阱 2:忘记 await
async def get_user():
return {"name": "Alice"}
async def main():
# 错误!返回的是协程对象,不是结果
user = get_user()
print(user) # <coroutine object get_user at 0x...>
# 正确
user = await get_user()
print(user) # {'name': 'Alice'}
Python 不会报错,只是给你一个协程对象。这种 bug 很隐蔽,特别是嵌套较深时。
陷阱 3:在事件循环内创建新事件循环
import asyncio
async def main():
# 错误!已经在一个事件循环里了,不能再创建
# asyncio.run(another_coroutine()) # RuntimeError
# 正确:直接 await
result = await another_coroutine()
async def another_coroutine():
return 42
asyncio.run(main())
陷阱 4:异步上下文管理器中忘记使用 async with
import asyncio
from typing import Any
class MyResource:
async def __aenter__(self):
print("获取资源")
return self
async def __aexit__(self, *args):
print("释放资源")
async def main():
# 错误!普通 with 不会调用 __aenter__
# with MyResource(): ...
# 正确
async with MyResource() as res:
print("使用资源")
陷阱 5:gather 中异常处理不当
import asyncio
async def might_fail():
raise ValueError("boom")
async def main():
# 危险!第一个异常会直接抛出,但其他任务还在后台运行
try:
await asyncio.gather(
might_fail(),
might_fail(),
might_fail(),
)
except ValueError:
print("捕获到异常,但其他任务还在跑...")
# 安全做法:使用 return_exceptions=True
results = await asyncio.gather(
might_fail(),
might_fail(),
might_fail(),
return_exceptions=True,
)
errors = [r for r in results if isinstance(r, Exception)]
print(f"收集到 {len(errors)} 个异常")
# 更安全:使用 TaskGroup(Python 3.11+)
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(might_fail())
tg.create_task(might_fail())
except* ValueError as eg:
print(f"TaskGroup 捕获到异常: {len(eg.exceptions)} 个")
asyncio.run(main())
async/await 与 JavaScript 的对比
作为前端同学,对比学习效率更高:
| 特性 | JavaScript | Python asyncio |
|---|---|---|
| 基本语法 | async function / await | async def / await |
| 并发执行 | Promise.all([...]) | asyncio.gather(...) |
| 错误处理 | try/catch | try/except + except* |
| 事件循环 | V8 引擎内置 | 需要手动 asyncio.run() |
| 取消机制 | AbortController | task.cancel() |
| 超时控制 | Promise.race + setTimeout | asyncio.wait_for(coro, timeout) |
| 异步迭代 | for await...of | async for...in |
| 异步上下文 | 无原生支持(TS 5.2+ using) | async with |
小结
对 Agent 开发来说,asyncio 是你的基础设施。核心要点:
- async/await 是协作式并发,适合 I/O 密集场景
- gather 用于并发执行多个任务,TaskGroup 提供更安全的异常处理
- Semaphore 控制并发度,避免 API 限流和资源耗尽
- async context manager 管理连接等资源的生命周期
- 永远不要在异步代码中调用同步阻塞函数(除非用 run_in_executor)
- aiohttp 是 Agent 的 HTTP 基础设施,配合连接池和超时使用
掌握这些,你就能在 Agent 开发中写出高效、可靠的并发代码了。