跳到主要内容

生产环境最佳实践

Prompt 模板更新后直接部署到生产环境——新 Prompt 长了 200 个 token,响应时间从 2 秒飙到 8 秒,用户重试导致请求量翻 3 倍,LLM API 费用一夜烧了 2000 美元。一个 Prompt 变更引发的事故,暴露了部署没有灰度、没有回滚、没有成本监控、没有限流的问题。

Agent 服务上线后必须做好的实践。

零停机部署

Agent 服务是长连接服务,用户正在对话的时候你把它停了,体验极差。零停机部署是底线。

滚动更新(Rolling Update)

Kubernetes 默认的部署策略就是滚动更新:

apiVersion: apps/v1
kind: Deployment
metadata:
name: agent-service
spec:
replicas: 3
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 1 # 最多多 1 个 Pod
maxUnavailable: 0 # 不允许任何 Pod 不可用
template:
spec:
containers:
- name: agent
image: agent-service:v2.1.0
readinessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 10
periodSeconds: 5

关键配置解释:

  • maxUnavailable: 0:滚动过程中不允许任何 Pod 下线,保证始终有完整容量
  • maxSurge: 1:最多多启动 1 个新 Pod,等新 Pod 就绪后再停旧的
  • readinessProbe:新 Pod 必须通过健康检查才会接收流量

滚动更新适合常规版本更新,但有个问题:新旧版本会同时运行一段时间。如果你的 Agent 涉及状态管理(比如对话上下文),新旧版本的状态格式可能不兼容。

蓝绿部署(Blue-Green)

蓝绿部署通过同时维护两套完整环境来解决这个问题:

┌─────────────────┐
│ Load Balancer │
└────────┬────────┘

┌─────────┴─────────┐
│ │
┌─────┴─────┐ ┌─────┴─────┐
│ Blue 环境 │ │ Green 环境 │
│ (当前 v1) │ │ (新 v2) │
└───────────┘ └───────────┘

Step 1: Green 部署新版本,不接流量
Step 2: Green 通过健康检查后,切换流量到 Green
Step 3: 确认无问题后,Blue 降级为备用
Step 4: 有问题?切回 Blue,秒级恢复

用 Istio 实现蓝绿切换:

apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: agent-service
spec:
hosts:
- agent-service
http:
- route:
- destination:
host: agent-service
subset: green
weight: 100
- destination:
host: agent-service
subset: blue
weight: 0
---
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
name: agent-service
spec:
host: agent-service
subsets:
- name: blue
labels:
version: v1
- name: green
labels:
version: v2

蓝绿部署的优点是回滚极快(改一下 weight 就行),缺点是需要双倍资源。对于 Agent 服务,我的建议是:核心服务用蓝绿,边缘功能用滚动更新

健康检查设计

Agent 服务的健康检查不能只检查 HTTP 端口是否可达,还要检查依赖服务:

from fastapi import FastAPI
from fastapi.responses import JSONResponse
import httpx

app = FastAPI()

async def check_llm_api():
"""检查 LLM API 是否可用"""
try:
async with httpx.AsyncClient() as client:
resp = await client.post(
LLM_API_URL,
headers={"Authorization": f"Bearer {API_KEY}"},
json={"model": "gpt-4", "messages": [{"role": "user", "content": "ping"}], "max_tokens": 1},
timeout=5.0,
)
return resp.status_code == 200
except Exception:
return False

async def check_vector_store():
"""检查向量数据库是否可用"""
try:
await vector_store.health_check()
return True
except Exception:
return False

@app.get("/health")
async def health_check():
checks = {
"llm_api": await check_llm_api(),
"vector_store": await check_vector_store(),
}
all_healthy = all(checks.values())
status_code = 200 if all_healthy else 503
return JSONResponse(
status_code=status_code,
content={"status": "healthy" if all_healthy else "degraded", "checks": checks},
)

注意 /health 返回 503 的时候,Kubernetes 会自动把 Pod 从 Service 端点移除,流量就不会打过来。这在滚动更新时特别重要——新版本如果连不上 LLM API,就不会接流量。

灰度发布

零停机解决了"不停服"的问题,但没有解决"新版本有 bug 怎么办"的问题。灰度发布(Canary Release)让你可以用一小部分流量验证新版本。

按百分比灰度

最简单的灰度策略:新版本先接 5% 的流量,观察指标没问题再逐步增加。

用 Istio 实现:

apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: agent-service
spec:
hosts:
- agent-service
http:
- route:
- destination:
host: agent-service
subset: stable
weight: 95
- destination:
host: agent-service
subset: canary
weight: 5

灰度发布的节奏建议:

v2.1.0 发布


┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ 5% 流量 │ ──▶ │ 20% 流量 │ ──▶ │ 50% 流量 │ ──▶ │ 100% 流量 │
│ 观察 30min │ │ 观察 30min │ │ 观察 1h │ │ 全量发布 │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
│ │ │
▼ ▼ ▼
指标异常? 指标异常? 指标异常?
切回 v2.0 切回 v2.0 切回 v2.0

基于指标的自动回滚

光靠人盯 Prometheus 面板不现实。你需要设定自动回滚的条件:

# canary_monitor.py
import time
from prometheus_api_client import PrometheusConnect

prom = PrometheusConnect(url="http://prometheus:9090")

def get_canary_error_rate(window_minutes=5):
"""获取灰度版本的错误率"""
query = f"""
sum(rate(http_requests_total{{version="canary", status=~"5.."}}[{window_minutes}m]))
/
sum(rate(http_requests_total{{version="canary"}}[{window_minutes}m]))
* 100
"""
result = prom.custom_query(query=query)
if result:
return float(result[0]["value"][1])
return 0.0

def get_canary_p99_latency(window_minutes=5):
"""获取灰度版本的 P99 延迟"""
query = f"""
histogram_quantile(0.99,
sum(rate(http_request_duration_seconds_bucket{{version="canary"}}[{window_minutes}m])) by (le)
)
"""
result = prom.custom_query(query=query)
if result:
return float(result[0]["value"][1])
return 0.0

def should_rollback():
"""判断是否需要回滚"""
error_rate = get_canary_error_rate()
p99_latency = get_canary_p99_latency()

if error_rate > 5.0:
print(f"[ALERT] 错误率 {error_rate:.1f}% 超过阈值 5%,触发回滚")
return True
if p99_latency > 10.0:
print(f"[ALERT] P99 延迟 {p99_latency:.1f}s 超过阈值 10s,触发回滚")
return True
return False

def rollback():
"""执行回滚:把流量全部切回 stable 版本"""
import subprocess
subprocess.run([
"kubectl", "apply", "-f", "vs-stable-only.yaml"
])
print("[ROLLBACK] 已将 100% 流量切回 stable 版本")

回滚策略

回滚不是简单地把新版本下掉。Agent 服务的回滚要考虑状态一致性。

回滚决策流程:

检测到异常


┌─────────────────┐
│ 是 Prompt 变更? │──是──▶ 切换 Prompt 版本(不需要部署)
└────────┬────────┘
│否

┌─────────────────┐
│ 是模型版本变更? │──是──▶ 切换模型路由(不需要部署)
└────────┬────────┘
│否

┌─────────────────┐
│ 是代码变更? │──是──▶ 回滚 Deployment 到上一版本
└────────┬────────┘
│否

┌─────────────────┐
│ 是依赖服务变更? │──是──▶ 切换到降级模式
└─────────────────┘

Prompt 变更和模型版本变更的回滚成本最低,因为不需要重新部署。所以我的建议是:把 Prompt 和模型配置从代码中抽离出来,放到配置中心或 Feature Flag 服务中

# feature_flag.py
import httpx
import json

class FeatureFlagService:
def __init__(self, base_url: str):
self.base_url = base_url

async def get_prompt_version(self, user_id: str) -> str:
"""获取当前用户应该使用的 Prompt 版本"""
async with httpx.AsyncClient() as client:
resp = await client.get(
f"{self.base_url}/api/flags/prompt-version",
params={"user_id": user_id},
)
data = resp.json()
return data.get("value", "v1")

async def get_model_config(self) -> dict:
"""获取当前的模型配置"""
async with httpx.AsyncClient() as client:
resp = await client.get(f"{self.base_url}/api/flags/model-config")
return resp.json()

# 使用示例
flag_service = FeatureFlagService("http://flags.internal:8080")

async def handle_message(user_id: str, message: str):
prompt_version = await flag_service.get_prompt_version(user_id)
model_config = await flag_service.get_model_config()

prompt = load_prompt(f"prompts/{prompt_version}.txt")
response = await llm_client.chat(
model=model_config["model"],
messages=[{"role": "user", "content": prompt + message}],
temperature=model_config.get("temperature", 0.7),
)
return response

这样回滚 Prompt 变更只需要在 Flag 服务里改一下配置值,不用重新构建和部署镜像。

密钥管理

Agent 服务需要管理大量密钥:LLM API Key、向量数据库凭证、数据库密码、MCP 服务的 Token。密钥管理不当,后果严重。

K8s Secrets

最基础的密钥管理方式:

# 创建 Secret
kubectl create secret generic agent-secrets \
--from-literal=OPENAI_API_KEY=sk-xxx \
--from-literal=ANTHROPIC_API_KEY=sk-ant-xxx \
--from-literal=DB_PASSWORD=mysecretpassword \
--dry-run=client -o yaml > secret.yaml

# 加密存储(需要配置 encryption provider)
kubectl apply -f secret.yaml

在 Deployment 中引用:

env:
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: agent-secrets
key: OPENAI_API_KEY
- name: DB_PASSWORD
valueFrom:
secretKeyRef:
name: agent-secrets
key: DB_PASSWORD

K8s Secrets 的问题在于:它是 base64 编码,不是加密。任何有 RBAC 权限的人都能读到明文。所以:

  • 开启 etcd 加密(EncryptionConfiguration)
  • 用 RBAC 严格限制谁能读 Secret
  • 不要把 Secret 提交到 Git 仓库

HashiCorp Vault

生产环境建议用 Vault:

import hvac

class VaultSecretManager:
def __init__(self, url: str, token: str):
self.client = hvac.Client(url=url, token=token)

def get_secret(self, path: str, key: str) -> str:
"""从 Vault 获取密钥"""
response = self.client.secrets.kv.v2.read_secret_version(
path=path,
)
return response["data"]["data"][key]

def get_llm_api_key(self, provider: str = "openai") -> str:
"""获取 LLM API Key"""
return self.get_secret("agent/llm-keys", f"{provider}_api_key")

# 使用
vault = VaultSecretManager(url="http://vault:8200", token=os.environ["VAULT_TOKEN"])
api_key = vault.get_llm_api_key("openai")

Vault 的好处:

  • 密钥加密存储,支持自动轮换
  • 审计日志:谁在什么时候访问了哪个密钥
  • 动态密钥:可以为每个请求生成临时凭证
  • 支持多种认证方式(AppRole、Kubernetes SA、OIDC)

环境变量注入

无论用哪种密钥管理方案,最终都要注入到运行时。推荐的注入顺序:

优先级从高到低:

1. 命令行参数 (测试/调试用)
2. 环境变量 (容器运行时)
3. 挂载的 Secret 文件 (Vault Agent Sidecar)
4. 配置文件 (兜底)

Vault Agent Sidecar 自动注入的 K8s 注解:

annotations:
vault.hashicorp.com/agent-inject: "true"
vault.hashicorp.com/role: "agent-service"
vault.hashicorp.com/agent-inject-secret-llm-key: "agent/llm-keys"
vault.hashicorp.com/agent-inject-template-llm-key: |
{{- with secret "agent/llm-keys" -}}
export OPENAI_API_KEY="{{ .Data.data.openai_api_key }}"
export ANTHROPIC_API_KEY="{{ .Data.data.anthropic_api_key }}"
{{- end -}}

成本监控

LLM API 调用是 Agent 服务最大的成本来源。不监控成本,你根本不知道钱花在了哪里。

Token 消耗追踪

每次请求都要记录 token 消耗:

import time
from dataclasses import dataclass, field
from prometheus_client import Counter, Histogram, Gauge

# Prometheus 指标定义
llm_tokens_total = Counter(
"llm_tokens_total",
"LLM token 消耗总量",
["model", "type", "operation"], # type: prompt/completion, operation: chat/embedding
)

llm_request_duration = Histogram(
"llm_request_duration_seconds",
"LLM API 请求耗时",
["model"],
buckets=[0.5, 1, 2, 5, 10, 30, 60],
)

daily_cost_usd = Gauge(
"llm_daily_cost_usd",
"当日 LLM API 消费(美元)",
["model"],
)

# 各模型的 token 单价(美元/1M tokens)
MODEL_PRICING = {
"gpt-4o": {"input": 2.50, "output": 10.00},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"claude-sonnet-4-20250514": {"input": 3.00, "output": 15.00},
"claude-haiku-4-20250414": {"input": 0.25, "output": 1.25},
}

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
"""计算单次请求的费用"""
pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
cost = (input_tokens * pricing["input"] + output_tokens * pricing["output"]) / 1_000_000
return cost

async def call_llm_with_metrics(model: str, messages: list) -> dict:
"""带 metrics 的 LLM 调用"""
start = time.time()

response = await llm_client.chat(model=model, messages=messages)

duration = time.time() - start
input_tokens = response.usage.prompt_tokens
output_tokens = response.usage.completion_tokens

# 记录指标
llm_tokens_total.labels(model=model, type="prompt", operation="chat").inc(input_tokens)
llm_tokens_total.labels(model=model, type="completion", operation="chat").inc(output_tokens)
llm_request_duration.labels(model=model).observe(duration)

# 计算并记录费用
cost = calculate_cost(model, input_tokens, output_tokens)
daily_cost_usd.labels(model=model).inc(cost)

return response

预算告警

设置多级告警,防止单日费用失控:

# budget_alerts.py
from prometheus_client import Gauge

BUDGET_THRESHOLDS = {
"warning": 50.0, # 50 美元:发通知
"critical": 100.0, # 100 美元:限制流量
"emergency": 200.0, # 200 美元:熔断,只允许降级模式
}

async def check_budget():
"""检查当日预算使用情况"""
current_cost = await get_daily_cost() # 从 Prometheus 查询

if current_cost >= BUDGET_THRESHOLDS["emergency"]:
await send_alert("emergency", f"当日费用已达 ${current_cost:.2f},切换到降级模式")
await enable_degraded_mode()
elif current_cost >= BUDGET_THRESHOLDS["critical"]:
await send_alert("critical", f"当日费用已达 ${current_cost:.2f},开始限制流量")
await enable_rate_limiting(strict=True)
elif current_cost >= BUDGET_THRESHOLDS["warning"]:
await send_alert("warning", f"当日费用已达 ${current_cost:.2f},请注意控制")

async def enable_degraded_mode():
"""降级模式:只允许使用最便宜的模型"""
await feature_flags.set("model_tier", "minimal") # 只用 gpt-4o-mini
await feature_flags.set("max_tokens_per_request", 200)
await feature_flags.set("enable_rag", False) # 关闭 RAG,减少 embedding 调用

每用户配额

防止单个用户把预算花光:

class UserQuotaManager:
def __init__(self, redis_client):
self.redis = redis_client

async def check_and_consume(self, user_id: str, estimated_tokens: int) -> bool:
"""检查用户配额是否足够"""
key = f"quota:{user_id}:{datetime.now().strftime('%Y%m%d')}"
current = await self.redis.get(key)
current = int(current) if current else 0

daily_limit = 50000 # 每用户每日 50K tokens
if current + estimated_tokens > daily_limit:
return False

await self.redis.incrby(key, estimated_tokens)
await self.redis.expire(key, 86400)
return True

限流

限流保护你的服务不被流量打垮,也保护你的 LLM API 配额。

令牌桶算法

令牌桶是最常用的限流算法,允许突发流量:

import asyncio
import time

class TokenBucket:
def __init__(self, capacity: int, refill_rate: float):
"""
capacity: 桶容量(最大令牌数)
refill_rate: 每秒补充的令牌数
"""
self.capacity = capacity
self.refill_rate = refill_rate
self.tokens = capacity
self.last_refill = time.time()
self._lock = asyncio.Lock()

async def consume(self, tokens: int = 1) -> bool:
"""尝试消费令牌"""
async with self._lock:
now = time.time()
elapsed = now - self.last_refill
self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
self.last_refill = now

if self.tokens >= tokens:
self.tokens -= tokens
return True
return False

# 为每个用户创建限流器
user_buckets: dict[str, TokenBucket] = {}

def get_user_bucket(user_id: str) -> TokenBucket:
if user_id not in user_buckets:
# 每用户:每秒 2 个请求,最多突发 10 个
user_buckets[user_id] = TokenBucket(capacity=10, refill_rate=2.0)
return user_buckets[user_id]

async def rate_limit_middleware(request, handler):
"""FastAPI 限流中间件"""
user_id = request.headers.get("X-User-ID", "anonymous")
bucket = get_user_bucket(user_id)

if not await bucket.consume():
return JSONResponse(
status_code=429,
content={"error": "请求过于频繁,请稍后再试"},
headers={"Retry-After": "1"},
)
return await handler(request)

滑动窗口算法

滑动窗口比令牌桶更精确,适合按时间窗口限流:

import time
from collections import defaultdict

class SlidingWindowRateLimiter:
def __init__(self, window_seconds: int, max_requests: int):
self.window_seconds = window_seconds
self.max_requests = max_requests
self.requests: dict[str, list[float]] = defaultdict(list)

def is_allowed(self, key: str) -> bool:
"""检查是否允许请求"""
now = time.time()
window_start = now - self.window_seconds

# 清理过期记录
self.requests[key] = [t for t in self.requests[key] if t > window_start]

if len(self.requests[key]) >= self.max_requests:
return False

self.requests[key].append(now)
return True

# 全局限流:每分钟 100 次请求
global_limiter = SlidingWindowRateLimiter(window_seconds=60, max_requests=100)

# 模型级限流:GPT-4 每分钟 20 次,GPT-4o-mini 每分钟 50 次
model_limiters = {
"gpt-4o": SlidingWindowRateLimiter(window_seconds=60, max_requests=20),
"gpt-4o-mini": SlidingWindowRateLimiter(window_seconds=60, max_requests=50),
}

多级限流

生产环境需要多级限流,从用户级到全局级:

请求进来


┌─────────────────┐
│ 用户级限流 │ 每用户每秒 2 次
│ (Token Bucket) │
└────────┬────────┘
│通过

┌─────────────────┐
│ 模型级限流 │ GPT-4 每分钟 20 次
│ (Sliding Window) │
└────────┬────────┘
│通过

┌─────────────────┐
│ 全局级限流 │ 总计每分钟 100 次
│ (Sliding Window) │
└────────┬────────┘
│通过

┌─────────────────┐
│ LLM API 配额 │ OpenAI 账户级限额
└─────────────────┘

熔断器

限流保护的是"流量太大"的场景,熔断保护的是"依赖不可用"的场景。比如 LLM API 挂了,你不能让每个请求都等 30 秒超时。

状态机

┌──────────────────────────────────────────────────────┐
│ │
│ ┌──────────┐ 失败次数达到阈值 ┌──────────┐ │
│ │ Closed │ ─────────────────▶ │ Open │ │
│ │ (正常放行) │ │ (直接拒绝) │ │
│ └──────────┘ └─────┬────┘ │
│ ▲ │ │
│ │ 超时窗口到期 │
│ │ │ │
│ │ ▼ │
│ │ 探测成功 ┌──────────┐ │
│ └────────────────────────────── │ Half-Open │ │
│ │ (探测放行) │ │
│ 探测失败 ─────────────────▶ └──────────┘ │
│ │
└──────────────────────────────────────────────────────┘

实现

import asyncio
import time
from enum import Enum

class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"

class CircuitBreaker:
def __init__(
self,
failure_threshold: int = 5, # 触发熔断的失败次数
recovery_timeout: float = 30.0, # 熔断恢复超时(秒)
half_open_max_calls: int = 3, # 半开状态最大探测请求
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.half_open_max_calls = half_open_max_calls

self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time = 0
self._lock = asyncio.Lock()

async def call(self, func, *args, **kwargs):
"""通过熔断器执行函数"""
async with self._lock:
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
self.success_count = 0
print("[CIRCUIT] Open -> Half-Open: 开始探测")
else:
raise CircuitOpenError("熔断器处于 Open 状态,请求被拒绝")

try:
result = await func(*args, **kwargs)
await self._on_success()
return result
except Exception as e:
await self._on_failure()
raise

async def _on_success(self):
async with self._lock:
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.half_open_max_calls:
self.state = CircuitState.CLOSED
self.failure_count = 0
print("[CIRCUIT] Half-Open -> Closed: 恢复正常")
else:
self.failure_count = 0

async def _on_failure(self):
async with self._lock:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
print(f"[CIRCUIT] -> Open: 连续失败 {self.failure_count} 次")

class CircuitOpenError(Exception):
pass

# LLM 调用熔断器
llm_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30.0)

async def call_llm_with_breaker(model: str, messages: list) -> dict:
"""带熔断保护的 LLM 调用"""
try:
return await llm_breaker.call(call_llm, model=model, messages=messages)
except CircuitOpenError:
# 熔断后走降级逻辑
return await fallback_response(messages)

优雅降级

熔断触发后,服务不能直接返回 500 错误。优雅降级让用户仍然能得到有价值的结果。

降级策略

正常模式(Full Mode)

├── LLM API 不可用 ──▶ 降级 1:切换备用模型

├── 所有 LLM 不可用 ──▶ 降级 2:使用缓存响应

├── 缓存也没有 ──▶ 降级 3:返回预设模板

└── 预算耗尽 ──▶ 降级 4:返回错误提示 + 工单链接

实现

class GracefulDegradation:
def __init__(self):
self.model_fallback_order = [
"gpt-4o",
"claude-sonnet-4-20250514",
"gpt-4o-mini",
]
self.cache = {} # 生产环境用 Redis

async def chat(self, messages: list, user_id: str) -> dict:
"""带降级的对话接口"""
# 降级 1:按优先级尝试不同模型
for model in self.model_fallback_order:
try:
response = await call_llm(model=model, messages=messages)
# 缓存成功的响应
cache_key = self._make_cache_key(messages)
self.cache[cache_key] = response
return {"response": response, "model_used": model, "degraded": False}
except (CircuitOpenError, httpx.TimeoutException) as e:
print(f"[DEGRADE] {model} 不可用: {e},尝试下一个模型")
continue

# 降级 2:尝试缓存
cache_key = self._make_cache_key(messages)
cached = self.cache.get(cache_key)
if cached:
return {"response": cached, "model_used": "cache", "degraded": True}

# 降级 3:返回预设响应
return {
"response": "当前服务繁忙,已记录您的问题。我们会在服务恢复后尽快处理。",
"model_used": "fallback",
"degraded": True,
}

def _make_cache_key(self, messages: list) -> str:
"""生成缓存 key(基于最后一条用户消息)"""
last_msg = messages[-1]["content"]
return hashlib.md5(last_msg.encode()).hexdigest()

降级时的用户体验

降级不是偷偷摸摸的事情,要让用户知道:

async def chat_endpoint(request: ChatRequest):
result = await degradation.chat(request.messages, request.user_id)

response_data = {
"message": result["response"],
"metadata": {
"model": result["model_used"],
},
}

if result["degraded"]:
response_data["metadata"]["degraded"] = True
response_data["metadata"]["notice"] = "当前处于降级模式,响应质量可能受影响"

return response_data

安全加固

Agent 服务处理用户输入,然后把输出交给 LLM 处理。这中间有多个攻击面。

输入校验

import re

class InputValidator:
MAX_INPUT_LENGTH = 4000 # 最大输入长度
MAX_TOOL_CALLS = 5 # 单次请求最大工具调用数
BLOCKED_PATTERNS = [
r"(?i)(ignore|disregard)\s+(previous|above)\s+(instructions|prompt)", # Prompt 注入
r"(?i)you\s+are\s+now\s+(a|an|the)", # 角色劫持
r"(?i)system\s*:\s*", # 系统消息伪造
]

def validate(self, user_input: str) -> tuple[bool, str]:
"""校验用户输入"""
# 长度检查
if len(user_input) > self.MAX_INPUT_LENGTH:
return False, f"输入过长,最多 {self.MAX_INPUT_LENGTH} 字符"

# Prompt 注入检测
for pattern in self.BLOCKED_PATTERNS:
if re.search(pattern, user_input):
return False, "检测到不安全的输入内容"

# 特殊字符过滤(在代码块内的内容可以放宽)
if user_input.count("```") % 2 != 0:
return False, "格式异常:代码块未闭合"

return True, ""

validator = InputValidator()

Prompt 注入防护

Prompt 注入是 Agent 服务最大的安全威胁。攻击者试图通过用户输入劫持 Agent 的行为。

多层防护策略:

class PromptInjectionGuard:
def __init__(self):
self.sensitive_keywords = [
"忽略之前的指令",
"ignore previous instructions",
"你现在是一个",
"你现在扮演",
"system prompt",
"你的系统指令是什么",
]

def detect(self, user_input: str) -> bool:
"""检测潜在的 Prompt 注入"""
input_lower = user_input.lower()
return any(kw.lower() in input_lower for kw in self.sensitive_keywords)

def sanitize_for_llm(self, messages: list) -> list:
"""为 LLM 调用做安全处理"""
sanitized = []

# 系统消息固定,不被用户输入覆盖
sanitized.append({
"role": "system",
"content": SYSTEM_PROMPT,
})

# 用户消息用特殊分隔符包裹
for msg in messages:
if msg["role"] == "user":
sanitized.append({
"role": "user",
"content": f"<user_message>\n{msg['content']}\n</user_message>",
})
else:
sanitized.append(msg)

return sanitized

guard = PromptInjectionGuard()

async def safe_chat(user_input: str, history: list):
# 第一层:输入检测
if guard.detect(user_input):
return "抱歉,我无法处理这个请求。"

# 第二层:消息消毒
safe_messages = guard.sanitize_for_llm(
history + [{"role": "user", "content": user_input}]
)

# 第三层:输出校验(在响应返回前检查)
response = await call_llm(messages=safe_messages)
response = validate_output(response)

return response

输出净化

LLM 的输出可能包含敏感信息,需要在返回给用户前做净化:

import re

class OutputSanitizer:
# PII 正则模式
PII_PATTERNS = {
"phone": r"1[3-9]\d{9}",
"id_card": r"\d{17}[\dXx]",
"email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
"bank_card": r"\d{16,19}",
}

def sanitize(self, text: str) -> str:
"""净化输出中的敏感信息"""
sanitized = text
for pii_type, pattern in self.PII_PATTERNS.items():
matches = re.findall(pattern, sanitized)
for match in matches:
replacement = self._mask(match, pii_type)
sanitized = sanitized.replace(match, replacement)
return sanitized

def _mask(self, value: str, pii_type: str) -> str:
"""脱敏处理"""
if pii_type == "phone":
return value[:3] + "****" + value[7:]
elif pii_type == "id_card":
return value[:6] + "********" + value[14:]
elif pii_type == "email":
local, domain = value.split("@")
return local[:2] + "***@" + domain
elif pii_type == "bank_card":
return "****" + value[-4:]
return "***"

sanitizer = OutputSanitizer()

async def chat_with_sanitization(user_input: str):
response = await call_llm(messages=[{"role": "user", "content": user_input}])
# 净化后再返回
return sanitizer.sanitize(response)

审计日志

所有 LLM 交互都要记录审计日志,用于事后追溯:

import logging
import json
from datetime import datetime

audit_logger = logging.getLogger("audit")

def log_llm_interaction(
user_id: str,
model: str,
input_tokens: int,
output_tokens: int,
tool_calls: list,
risk_level: str, # low / medium / high
):
"""记录 LLM 交互审计日志"""
audit_logger.info(json.dumps({
"timestamp": datetime.utcnow().isoformat(),
"user_id": user_id,
"model": model,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"tool_calls": tool_calls,
"risk_level": risk_level,
}))

常见坑

坑 1:Prompt 变更没有版本控制

直接在代码里改 Prompt 字符串,部署后出问题了想回滚,发现 Prompt 已经和代码混在一起,无法单独回滚。

解法:Prompt 放在独立文件中,用 Git 管理版本,部署时挂载到容器里。

坑 2:没有设置请求超时

LLM API 偶尔会响应很慢(30 秒以上)。如果没设置超时,你的线程/协程会被阻塞,最终整个服务雪崩。

# 错误做法
response = await client.post(LLM_URL, json=payload) # 没有 timeout

# 正确做法
response = await client.post(LLM_URL, json=payload, timeout=30.0)

坑 3:日志里打印了完整的 LLM 响应

LLM 的响应可能包含用户的敏感信息。生产环境的日志如果打印了完整响应,可能违反数据合规要求。

# 错误做法
logger.info(f"LLM response: {response}")

# 正确做法
logger.info(f"LLM response: model={model}, tokens={usage}, latency={latency}")
# 如果必须记录内容,做脱敏处理
logger.debug(f"LLM response content: {sanitizer.sanitize(response)}")

坑 4:并发请求没有限制

用户可以发送无限多的并发请求,每个请求都调 LLM API,最终打爆 API 配额。

# 错误做法:没有并发控制
@app.post("/chat")
async def chat(request: ChatRequest):
return await call_llm(request.messages) # 1000 个并发直接打爆

# 正确做法:用 Semaphore 限制并发
semaphore = asyncio.Semaphore(50) # 全局最多 50 个并发 LLM 调用

@app.post("/chat")
async def chat(request: ChatRequest):
async with semaphore:
return await call_llm(request.messages)

坑 5:忽略模型上下文窗口限制

不同的模型有不同的上下文窗口。如果你的对话历史超过了模型的上下文窗口,要么截断出错,要么被 API 拒绝。

# 正确做法:检查 token 数量并在必要时截断
def truncate_messages(messages: list, max_tokens: int) -> list:
"""截断消息列表,确保不超过 token 限制"""
truncated = [messages[0]] # 保留系统消息
total_tokens = estimate_tokens(messages[0]["content"])

for msg in reversed(messages[1:]): # 从最新的消息开始保留
msg_tokens = estimate_tokens(msg["content"])
if total_tokens + msg_tokens > max_tokens * 0.8: # 留 20% 余量给回复
break
truncated.insert(1, msg) # 插入到系统消息之后
total_tokens += msg_tokens

return truncated

参考资料