百炼 API 对单位时间内的请求数量、Token 用量及其增长速率设有限制,即限流。大模型服务具有长延迟和双维度限流(同时限制请求数与 Token 量)的特性,传统的"遇错即重试"策略难以有效应对,需要针对性的流控措施。
本文按介入成本从低到高,介绍三类应对方案:
如果当前正在解决 429 报错,可直接查看错误诊断与策略推荐定位原因。
错误诊断与策略推荐
同一错误码可能由不同限流维度触发。此外,高并发下服务端饱和也可能导致响应变慢或超时,可通过下文的自适应拥塞控制策略缓解。
错误码 (DashScope / OpenAI) | 触发维度 | 特征诊断 | 推荐策略 |
Throttling.RateQuota / limit_requests | 请求频率超限 (RPM 超限) | 间歇性报错,成功率随时间下降 | 令牌桶:控制单位时间内的请求配额 |
请求频率超限 (RPS 超限) | 启动瞬间或并发激增时集中报错 | 并发信号量或平滑限速器:拉开请求间距 |
Throttling.AllocationQuota / insufficient_quota | Token 用量超限 (TPM 超限) | 长文本处理时间歇性报错 | 双重令牌桶:同时限制 RPM 和 TPM 配额 |
Token 用量超限 (TPS 超限) | 长文本并发时瞬间 Token 消耗过大 | 并发信号量或平滑限速器 |
Throttling.BurstRate / limit_burst_rate | 流量增速超限 (Traffic Burst) | 启动或空闲恢复后突然发起大量请求 | 令牌桶设置低初始值(如 initial_tokens=0)实现冷启动缓起;或使用平滑限速器削峰 |
客户端流控策略
当平台配置方案无法满足需求时,需要在客户端引入流控机制。核心原则是将请求尽可能均匀分布在时间窗口内,避免突发流量触发限流。系统刚启动或长时间空闲后,应逐步提升并发量而非瞬间拉满。
以下四种策略按工程复杂度从低到高排列。每种策略包含上一级的能力并在此基础上增强:
建议在满足业务需求的前提下,优先选择实现成本更低的策略。
各策略的吞吐量表现对比

以上四种客户端流控策略在不同负载下的有效吞吐量表现差异如下:
基础重试策略:低负载下有效,高并发下易触发拥塞崩溃,吞吐量断崖式下降。
请求速率限制策略:防崩溃能力强,但长文本混合负载下因缺乏 Token 管控,吞吐量呈锯齿状波动。
流量整形策略:稳定性高,以牺牲部分峰值吞吐换取平稳输出。
自适应拥塞控制策略:高负载下可动态收敛至稳定高吞吐点,但存在冷启动探测开销。
基础重试策略
适用于个人测试、本地脚本和低频后台任务等非高并发场景。默认不限制发送速率,仅在收到 429 或 5xx 错误时,触发带随机抖动的指数退避重试。
该策略没有前置流量控制,在多线程并发下极易触发限流并导致大面积请求积压报错。
代码示例
使用 tenacity 库
import openai
from openai import OpenAI
from tenacity import (
retry,
stop_after_attempt,
wait_random_exponential,
retry_if_exception_type
)
RETRYABLE_ERRORS = (
openai.RateLimitError,
openai.InternalServerError,
openai.APIConnectionError,
)
@retry(
wait=wait_random_exponential(min=1, max=60),
stop=stop_after_attempt(6),
retry=retry_if_exception_type(RETRYABLE_ERRORS)
)
def chat_with_retry(client, model, messages, max_tokens):
return client.chat.completions.create(
model=model,
max_tokens=max_tokens,
messages=messages
)
client = OpenAI(
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
api_key="YOUR_DASHSCOPE_API_KEY"
)
try:
response = chat_with_retry(
client=client,
model="qwen-plus",
messages=[{"role": "user", "content": "什么是指数退避重试?"}],
max_tokens=1024
)
print(response.choices[0].message.content)
except Exception as e:
print(f"请求失败: {e}")
原生实现(无依赖)
import time
import random
import openai
from openai import OpenAI
RETRYABLE_ERRORS = (
openai.RateLimitError,
openai.InternalServerError,
openai.APIConnectionError,
)
def chat_with_retry(client, model, messages, max_tokens):
attempt = 0
max_retries = 5
base_delay = 1
max_delay = 60
while attempt <= max_retries:
try:
return client.chat.completions.create(
model=model,
max_tokens=max_tokens,
messages=messages
)
except RETRYABLE_ERRORS as e:
attempt += 1
if attempt > max_retries:
raise e
backoff = min(max_delay, base_delay * (2 ** (attempt - 1)))
sleep_time = backoff + random.uniform(0, 1)
print(f"触发 {type(e).__name__},等待 {sleep_time:.2f}s 后重试...")
time.sleep(sleep_time)
client = OpenAI(
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
api_key="YOUR_DASHSCOPE_API_KEY"
)
try:
response = chat_with_retry(
client=client,
model="qwen-plus",
messages=[{"role": "user", "content": "什么是指数退避重试?"}],
max_tokens=1024
)
print(response.choices[0].message.content)
except Exception as e:
print(f"请求失败: {e}")
上述代码使用指数退避而非固定间隔重试。固定间隔重试(如统一 3 秒后重试)会让所有失败请求在同一时刻重新发起,极易再次触发限流,形成持续拥堵。指数退避 + 随机抖动则将重试"散开":
系统便能以分散的方式恢复,而非陷入"失败—集体重试—再次失败"的恶性循环。
请求速率限制策略
仅依赖被动重试难以应对真实业务流量,频繁重试会显著增加响应延迟。请求速率限制策略引入主动流控,在请求发出前进行自我检查和调控,将无序涌入的大量请求梳理成符合平台 RPM 限额的平稳队列。触发平台限流后,通常需要一段时间才能恢复,主动平滑请求节奏虽会带来少量可控的排队延迟,但远低于被动陷入”报错—等待—重试”循环的时间成本,即用确定的小代价,避免不确定的大延迟。
适用于 Chatbot 等轻量交互、一问一答、对首字延迟敏感的在线服务。
该策略在客户端实施主动排队,分为两级控制:
两级控制必须严格按先获取 RPM 令牌,再获取并发信号量的顺序执行。并发槽位是稀缺资源,只应分配给已满足执行条件的请求。若顺序颠倒(先占槽位,再等令牌),高负载下极易引发队头阻塞(Head-of-Line Blocking)——请求占住槽位后无令牌可用,长期持槽却无法执行,所有槽位被占满,却无请求真正发送。核心原则:持有稀缺资源时,不做可能的长耗时等待。
下方代码将令牌桶初始化为满桶状态(initial_tokens=rpm_limit),适合轻量在线服务在启动初期立即处理请求。若满桶启动触发速率限制错误,可降低初始令牌数(如设为 initial_tokens=0,即”空桶启动”),使系统以更平缓的速率进入工作状态。
该策略不追踪 Token 用量,长文本任务中仍会因耗尽 TPM 配额触发限流。
代码示例
核心组件:令牌桶
import time
class TokenBucket:
"""
令牌桶实现,用于控制每分钟请求数 (RPM)。
支持预支 (Debt) 机制,以保证高并发下的先进先出 (FIFO) 顺序。
"""
def __init__(self, quota_per_minute: float, initial_tokens: float = 0.0):
self.capacity = quota_per_minute
self.tokens = initial_tokens
self.refill_rate = quota_per_minute / 60.0
self.last_refill = time.monotonic()
def reserve(self, cost: float = 1.0) -> float:
"""
申请令牌。
如果令牌不足,返回需要等待的秒数(支持预支)。
"""
self._refill()
# 1. 令牌充足:直接扣除
if self.tokens >= cost:
self.tokens -= cost
return 0.0
# 2. 令牌不足:计算等待时间并预支
# 为当前请求"预定"了未来的令牌,确保 FIFO 顺序
deficit = cost - self.tokens
wait_seconds = deficit / self.refill_rate
self.tokens -= cost
return wait_seconds
def _refill(self):
"""根据流逝时间补充令牌"""
now = time.monotonic()
elapsed = now - self.last_refill
if elapsed > 0:
self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
self.last_refill = now
客户端逻辑
import asyncio
import openai
from openai import AsyncOpenAI
from tenacity import retry, wait_random_exponential, stop_after_attempt, retry_if_exception_type
class RateLimitedClient:
def __init__(
self,
api_key: str,
base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1",
rpm_limit: float = 600.0,
max_concurrency: int = 20
):
self.client = AsyncOpenAI(api_key=api_key, base_url=base_url)
# 组件 1: RPM 令牌桶 (控制总量)
self.rpm_bucket = TokenBucket(
quota_per_minute=rpm_limit,
initial_tokens=rpm_limit # 满桶启动,适合轻量在线服务
)
# 组件 2: 并发信号量 (控制瞬间并发)
self.semaphore = asyncio.Semaphore(max_concurrency)
async def _execute_request(self, model, messages, max_tokens):
"""执行单个请求:依次通过 RPM 检查和并发限制。"""
# 1. RPM 检查 (先拿令牌)
wait_seconds = self.rpm_bucket.reserve(1.0)
if wait_seconds > 0:
await asyncio.sleep(wait_seconds)
# 2. 并发检查 (再拿信号量)
async with self.semaphore:
# 3. 发起 API 调用
return await self.client.chat.completions.create(
model=model,
messages=messages,
max_tokens=max_tokens
)
@retry(
wait=wait_random_exponential(min=1, max=60),
stop=stop_after_attempt(5),
retry=retry_if_exception_type((
openai.RateLimitError,
openai.InternalServerError,
openai.APIConnectionError
))
)
async def chat_with_limit(self, model, messages, max_tokens=1024):
# 设计考量:为什么重试也要重新拿 Token?
# 答:为了安全。如果不重新拿,重试带来的流量脉冲
# 可能会瞬间突破 RPM 限制
return await self._execute_request(model, messages, max_tokens)
流量整形策略
在 RAG 实时入库、长文档批量分析等追求高稳吞吐的批量处理场景中,请求速率限制策略存在明显的 TPM 盲区。为此,流量整形策略升级为双重资源感知(RPM & TPM),并在发送端引入整形机制,将突发脉冲流量“削峰填谷”,转为平滑流速。
该策略在原有请求速率限制基础上,增强了以下能力:
双重资源管控(RPM & TPM):同时维护 RPM 和 TPM 令牌桶,所有请求在发出前必须通过两个维度的配额检查。
输入事前预扣,输出事后结算:模型输出长度在请求前未知。TPM 令牌桶在发送时仅预扣输入 Token,请求完成后结算实际输出 Token。即使结算时额度不足(令牌为负),后续请求也会等待令牌回正,自然平滑流速。
匀速预热:冷启动期间,令牌发放速率随时间线性增长,消除初始突发风险。
平滑限速:通过强制请求间保持最小间隔(Pacing),平滑发送速率,降低触发速率限制的风险。
备选方案参考:若业务对启动瞬间的微小排队延迟不敏感,可复用标准令牌桶逻辑(设 initial_tokens=0),实现安全启动,同时降低客户端复杂度。此外,本文的 Python 令牌桶实现仅用于演示设计思路,生产环境中建议使用各语言生态成熟的限流组件(如 Java 的 Guava SmoothRateLimiter)。
代码示例中将平滑等待置于并发锁内部,以避免队头阻塞引发的请求集中发送。多个请求可能在等待结束后同时竞争并发信号量,导致原本平滑的流量在出口处再次拥堵。虽会轻微降低并发效率,但能确保发送间隔精准可控。
完整的流量整形链路为:预估输入 Token → 双重准入(RPM & TPM)→ 并发锁 → 平滑整形 → 发送 → 输出 Token 结算。

该策略因采用保守的平滑机制,会牺牲部分理论最大并发度,不适用于极致低延迟的在线服务。
代码示例
进阶令牌桶
import time
class TokenBucket:
"""进阶令牌桶,支持匀速预热 (Continuous Warm-up) 机制。"""
def __init__(self, quota_per_minute: float, warmup_seconds: float = 0.0):
self.capacity = quota_per_minute
self.tokens = 0.0
self.target_refill_rate = quota_per_minute / 60.0
self.warmup_seconds = warmup_seconds
self.start_time = time.monotonic()
self.last_update_time = self.start_time
self.cumulative_generated = 0.0
def _get_cumulative_tokens(self, t: float) -> float:
if t <= 0:
return 0.0
R = self.target_refill_rate
T = self.warmup_seconds
if T <= 0:
return R * t
if t <= T:
return (R / (2 * T)) * (t ** 2)
else:
warmup_total = (R * T) / 2.0
return warmup_total + R * (t - T)
def _get_time_for_cumulative_tokens(self, target_cumulative: float) -> float:
if target_cumulative <= 0:
return 0.0
R = self.target_refill_rate
T = self.warmup_seconds
if T <= 0:
return target_cumulative / R
warmup_total = (R * T) / 2.0
if target_cumulative <= warmup_total:
return ((2 * T * target_cumulative) / R) ** 0.5
else:
return (target_cumulative - warmup_total) / R + T
def reserve(self, cost: float = 1.0) -> float:
now = time.monotonic()
relative_now = now - self.start_time
current_cumulative = self._get_cumulative_tokens(relative_now)
new_tokens = current_cumulative - self.cumulative_generated
self.tokens = min(self.capacity, self.tokens + new_tokens)
self.cumulative_generated = current_cumulative
self.last_update_time = now
if self.tokens >= cost:
self.tokens -= cost
return 0.0
deficit = cost - self.tokens
self.tokens -= cost
target_cumulative = self.cumulative_generated + deficit
target_time = self._get_time_for_cumulative_tokens(target_cumulative)
wait_seconds = target_time - relative_now
return max(0.0, wait_seconds)
def adjust(self, amount: float):
self.tokens = min(self.capacity, self.tokens + amount)
平滑限流器
import time
class SmoothRateLimiter:
def __init__(self, rate_per_minute: float):
self._min_interval = 60.0 / rate_per_minute
self._last_operation = time.monotonic()
def reserve(self) -> float:
now = time.monotonic()
elapsed = now - self._last_operation
wait_time = max(0.0, self._min_interval - elapsed)
self._last_operation = now + wait_time
return wait_time
客户端逻辑
import asyncio
class TrafficShapingClient:
def __init__(self):
self._rpm_bucket = TokenBucket(quota_per_minute=600)
self._tpm_bucket = TokenBucket(quota_per_minute=1_000_000)
self._smooth_limiter = SmoothRateLimiter(rate_per_minute=600)
self._concurrency_semaphore = asyncio.Semaphore(20)
async def _execute_throttled_request(self, model, prompt, max_tokens, input_tokens):
# [步骤 1] 双重准入控制 (Parallel Admission)
# 同时检查 RPM 和 TPM,取两者中较长的等待时间
wait_rpm = self._rpm_bucket.reserve(1.0)
# TPM 检查仅针对输入 Token 申请额度
wait_tpm = self._tpm_bucket.reserve(input_tokens)
admission_wait = max(wait_rpm, wait_tpm)
if admission_wait > 0:
await asyncio.sleep(admission_wait)
# [步骤 2] 获取并发锁 (Concurrency Lock)
async with self._concurrency_semaphore:
# [步骤 3] 流量整形 (Traffic Shaping)
# 关键:在锁内进行平滑等待
# 牺牲部分并发效率,换取发送间隔的精准可控
smooth_wait = self._smooth_limiter.reserve()
if smooth_wait > 0:
await asyncio.sleep(smooth_wait)
# [步骤 4] 发送请求
content, actual_usage = await self._send_chat_request(model, prompt, max_tokens)
# [步骤 5] 输出 Token 结算
output_tokens = actual_usage.completion_tokens
if output_tokens > 0:
self._tpm_bucket.adjust(-output_tokens)
return content
自适应拥塞控制策略
适用于 API 网关、复杂代理、多租户等大规模动态混合负载场景。
说明 选型提示:该策略并非通用方案
自适应拥塞控制策略的核心价值在于应对高度不确定与剧烈波动的业务环境,并非普适选择:
性能悖论:若业务负载可预测、较稳定(如定量批处理),基于经验直接设定最优静态参数,性能通常优于需要"试探与收敛"的动态探测。
探测损耗:动态算法为了寻找边界,必然伴随冷启动爬坡与试探性波动。在可知场景下,这种"探索成本"反而是不必要的性能损耗。
维护成本:引入闭环反馈机制,显著增加了系统的复杂度与排查难度。
除非业务规模极大、负载复杂且波动显著,否则优先选择更简单的前三种策略。
请求速率限制策略和流量整形策略是基于静态配额的经典防御策略,在负载稳定、可预测的场景下完全适用。然而,在网关级的复杂场景下,业务面临来自两方面的动态变化:下游负载复杂多变(高并发短请求与长耗时深度推理任务交织);平台限流阈值动态波动(秒级速率限制和增速判定阈值会根据服务状态调整)。静态策略难以兼顾效率与稳定性。
该策略借鉴 BBR(Bottleneck Bandwidth and RTT) ,建立了基于 EBP(Elastic Bandwidth Probing) 的闭环控制系统。它将 RPM/TPM 配额视为指导上限,根据实时反馈(延迟变化、是否限流)动态计算最佳发送速率,最大化吞吐量。
弹性探测(EBP):记忆历史最高成功水位,根据当前并发度与最高水位的距离模拟弹簧张力计算探测增益(距离越远加速,越近减速)。叠加微小线性推力确保在高饱和区间仍能持续探索边界。
TPT 拥塞感知:大模型生成耗时与长度成正比,长文本延迟高不代表拥塞。使用 TPT(Time Per Token,单 Token 处理耗时)作为指标,滤除内容长度的噪声。只有当 TPT 显著恶化时才判定为计算饱和。
防突发调速器:无论 EBP 计算出的目标并发度多高,调速器都会强制限制并发增长的加速度,确保流量呈平滑上升态势,避免阶梯跳变触发增速限制。

相较于原生 BBR,该策略针对大模型特性进行了以下关键改造:
指导性探测:引入已知的 RPM/TPM 配额作为"指导上限",避免盲目试探导致的频繁撞墙。
信号源改造(RTT → TPT):原生 BBR 依赖 RTT(往返时延),但大模型场景中内容长度带来的延迟差异远大于网络抖动,改用 TPT 剔除内容长度的干扰。
响应机制强化(ProbeRTT → Hold):面对延迟波动,选择保持当前并发水平,而非主动退避降低吞吐。
硬限流响应(Packet Loss → 429 Drain):一旦触发 429 错误,进入激进的 Drain 状态,冷却期结束后执行快速恢复。
该策略存在以下局限:
拥塞信号噪点(TPT Noise):当前 TPT 按"总延迟 / 总 Token 数"粗略估算。总延迟混入了网络往返、排队与首字生成耗时,容易受网络抖动或长输入干扰而虚高,从而误触发 Hold 状态。
大请求饥饿(Starvation Risk):为追求极致调度性能,该策略使用了非严格 FIFO 的唤醒机制。在配额紧缺时,短 Token 请求可能"插队"抢占资源,导致长 Token 请求排队等待时间过长。
冷启动问题(Cold Start):该策略需要预热时间建立统计模型,低负载或短时任务中因从零探测,吞吐量可能低于前三种策略。
代码示例
控制入口
class ElasticCongestionController:
async def acquire(self):
"""[准入阶段] 请求发起前的检查"""
# 1. SSR 慢启动重启:若空闲太久,主动衰减上限
# 防止过时水位导致的突发流量
if self.is_idle_too_long():
self.perform_slow_start_restart()
# 2. 熔断检查:若处于 DRAIN (冷却) 状态,强制等待
if self.state == CongestionState.DRAIN:
await self.wait_for_cooldown()
# 3. 双重预算检查:同时检查并发槽位和 Token 预算
await self.wait_for_budget(request_tokens)
async def release(self, latency, actual_tokens, error):
"""[反馈阶段] 请求结束后的决策"""
if error:
# [故障响应] 遇限流错误 (429/503):立即排水 + 乘性回退
self.state = CongestionState.DRAIN
self.concurrency_limit *= self.backoff_factor # e.g. 0.7
return
# [正常响应] 计算 TPT (Time-Per-Token)
current_tpt = latency / actual_tokens
# [拥塞感知] TPT 突增 (生成变慢):进入 HOLD 观察
# 维持并发水平,不退避也不增长
if current_tpt > self.metrics.ema_tpt * 2.0:
self.state = CongestionState.HOLD
else:
# [稳态探测] 网络健康:执行 EBP 弹性探测
self.state = CongestionState.PROBING
self.update_limit_via_ebp()
EBP 探测
def probe_next_limit(self, current_limit, max_known_capacity):
"""
计算下一个并发上限
核心公式:Next = Max(弹簧张力, 线性推力) + 调速器平滑
"""
# 1. 计算物理上限 (Little's Law)
# 理论上限 = 吞吐量 * 延迟 * 缓冲因子
dynamic_ceiling = self.metrics.tps * self.metrics.avg_latency * 1.2
# 2. 弹簧逻辑 (Spring Tension)
# 距离历史最高水位越远,张力越大(加速);越近则越小(减速)
tension = 1.0 - (current_limit / max_known_capacity)
spring_target = current_limit * (1.0 + tension * gain)
# 3. 线性推力 (Additive Thrust)
# 解决"芝诺悖论":张力趋近于 0 时,强制叠加微小线性增量
# 确保系统能突破局部极值,持续探索边界
linear_target = current_limit + self.min_additive_step
raw_target = max(spring_target, linear_target)
# 4. 防突发调速器 (Rate Governor)
# 限制并发增长的加速度,防止阶梯跳变
final_limit = self.governor.smooth(raw_target)
return min(final_limit, dynamic_ceiling)
统计追踪
class CongestionMetrics:
def update_stats(self, latency, token_count):
"""
[传感器] 实时更新统计指标
使用 EMA (指数移动平均) 滤除长尾请求的噪声
"""
alpha = 0.2 # 平滑因子
# 1. 估算单请求大小 (Token Size)
self.ema_tokens = (1 - alpha) * self.ema_tokens + alpha * token_count
# 2. 估算 TPT (Time Per Token)
# 用 TPT 代替 Latency,消除 LLM 生成长度不同带来的误差
instant_tpt = latency / token_count
self.ema_tpt = (1 - alpha) * self.ema_tpt + alpha * instant_tpt
def track_inflight(self, estimated_tokens):
"""
[盲区填充] 修正"响应后才计数"的滞后性
请求发起瞬间,立即预扣额度
"""
self.inflight_tokens += estimated_tokens
架构兜底方案
当平台配置和客户端流控仍无法满足业务对可用性或峰值吞吐的要求时,可在系统架构层面引入兜底机制。
模型降级(Fallback)
当主模型因限流或服务异常无法响应时,自动回退至配额宽裕的备选模型,保障主流程持续响应。
降级链路设计原则
选择不同系列的模型:百炼的限流按模型独立计算。模型限流时可选择不同模型作为备选,例如由 qwen3.6-plus 降级至 qwen3.6-flash。
仅在限流错误时触发降级:降级应针对 429 限流错误,而非所有异常。网络超时或参数错误等问题切换模型也无法解决。
备选模型需提前验证:确保备选模型支持业务所需的功能(如 Function Calling、结构化输出等),避免降级后功能异常。
代码示例
以下示例演示了基于 429 错误码的模型降级逻辑:主模型请求触发限流时,自动切换至备选模型重试。
import os
import asyncio
from openai import AsyncOpenAI, APIStatusError
# 主模型与备选模型(不同系列,独立配额)
PRIMARY_MODEL = "qwen3.6-plus"
FALLBACK_MODEL = "qwen3.6-flash"
client = AsyncOpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
async def chat_with_fallback(messages: list) -> str:
"""带降级的请求:主模型限流时自动切换备选模型。"""
for model in [PRIMARY_MODEL, FALLBACK_MODEL]:
try:
response = await client.chat.completions.create(
model=model,
messages=messages
)
return response.choices[0].message.content
except APIStatusError as e:
if e.status_code == 429 and model == PRIMARY_MODEL:
print(f"[限流触发] {model},降级至 {FALLBACK_MODEL}")
continue
raise
raise RuntimeError("所有模型均不可用")
async def main():
result = await chat_with_fallback(
messages=[{"role": "user", "content": "你好"}]
)
print(result)
if __name__ == "__main__":
asyncio.run(main())
模型降级可与客户端流控策略组合使用。例如,在请求速率限制策略的重试逻辑中集成降级判断:当重试次数耗尽仍触发限流时,切换至备选模型。
基于消息队列(MQ)的削峰填谷
对于不要求即时响应的后端业务,可引入消息中间件(如 RabbitMQ、Kafka)进行削峰。突发流量先写入 MQ,消费端按限流配额匀速拉取处理。该架构解耦了前端峰值与后端调用,可从根本上避免限流报错。
适用场景:用户提交任务后可接受异步通知结果的业务,如工单处理、内容审核、批量数据标注等。MQ 作为缓冲层,吸收前端的流量尖峰,消费端以稳定速率向百炼 API 发送请求。
架构设计要点:
消费速率控制:消费端应配合请求速率限制或流量整形策略,按 RPM/TPM 配额匀速消费,而非无限制地拉取消息。
死信处理:对于多次重试仍失败的消息,应转入死信队列并触发告警,避免消息无限重试导致消费阻塞。
背压传递:当 MQ 积压超过阈值时,应向上游反馈压力(如返回排队状态),避免队列无限增长。
生产环境注意事项
上述示例代码基于 Python asyncio 单线程循环,用于演示核心算法。应用于大规模生产前,建议关注以下问题。
非文本模型的适配
上述策略以文本模型为例,但核心控制思想同样适用于多模态模型服务(如图像生成、语音合成)。除计量单位不同外,本质均为对提交速率和处理容量的限制:
无论限流指标如何变化,客户端主动流控的原则不变。只需将计数器(如 RPM 令牌桶)或探测指标(如 TPT)替换为对应模态的指标。具体限流规则与指标定义参见模型限流条件中对应模型的说明。
并发模型的原子性
示例实现:由于 asyncio 采用单线程协作式调度,示例代码中的状态修改操作具备天然的原子性,在单进程内无需额外的并发保护。
生产建议:在多线程或多进程环境中实现时,需注意令牌桶及统计窗口的并发安全,确保状态更新的正确性,否则会因竞态条件(Race Condition)导致流控失效。
分布式限流
示例实现:示例代码中的流控组件均为本地内存(In-Memory)实现。
生产建议:多实例分布式部署中,各实例独立进行本地流控,实际总用量可能超标并触发全局限流。建议使用中心化计数器(如 Redis)统一管控全节点用量。
优先级队列与饥饿预防
示例实现:所有示例代码皆未实现优先级区分。尤其自适应拥塞控制策略为追求调度性能而采用了非严格 FIFO 的唤醒机制。
生产建议:当业务存在高低优先级请求时,建议实现加权优先级队列(Weighted Priority Queue),保障高优请求的带宽。同时需引入防饥饿机制,为低优先级队列保留最小配额,防止其因持续高负载而完全无法调度。