Best practices for handling rate limiting


Model Studio APIs limit request volume, token usage, and growth rate. Apply these strategies to maximize throughput and maintain availability.

This behavior is called rate limiting. Large language model calls have long latency and are constrained on two dimensions (request count and token volume), so simple "retry on error" strategies fail here; you need purpose-built traffic control.

Solutions fall into three types, ordered from lowest to highest implementation cost: platform configuration, client-side traffic control, and architectural fallback.

If you are troubleshooting a 429 error now, go to Error diagnosis and strategy recommendations to identify the cause. For traffic burst errors, try server-side queuing first — it only requires one request header.

Platform rate limiting mechanism

The platform rate-limits each model independently at the root account level. After a limit is triggered, service typically resumes within one minute. For rate limiting conditions and current usage per model, see Rate limiting and Model monitoring. Three types of rate limiting rules apply:

  • Minute-level quota limits (RPM / TPM): The maximum number of requests per minute (RPM) and the maximum token usage per minute (TPM).

  • Instantaneous frequency limits (RPS / TPS): Maximum requests per second (RPS) and maximum token usage per second (TPS). Dense API calls or token consumption within a single second can trigger rate limiting.

  • Growth rate limits (Traffic Burst): A sudden surge in request volume or token usage triggers rate limiting. The threshold adjusts dynamically. Ramp up requests gradually to avoid triggering this limit.

The following sections cover solutions at three levels: platform configuration, client-side traffic control, and architectural fallback.

Error diagnosis and strategy recommendations

The same error code can be triggered by different rate limiting dimensions. High concurrency can also saturate the server, causing timeouts. The adaptive congestion control strategy can help mitigate this.

| Error code (DashScope / OpenAI) | Triggering dimension | Feature diagnosis | Recommended strategy |
| --- | --- | --- | --- |
| Throttling.RateQuota / limit_requests | Request rate exceeded (RPM exceeded) | Intermittent errors. Success rate decreases over time. | Token bucket: Control the request quota per unit of time. |
| Throttling.RateQuota / limit_requests | Request rate exceeded (RPS exceeded) | Concentrated errors at startup or during concurrency spikes. | Concurrency semaphore or smoothing rate limiter: Increase the interval between requests. |
| Throttling.AllocationQuota / insufficient_quota | Token usage exceeded (TPM exceeded) | Intermittent errors when processing long texts. | Dual token bucket: Limit both RPM and TPM quotas simultaneously. |
| Throttling.AllocationQuota / insufficient_quota | Token usage exceeded (TPS exceeded) | Instantaneous token consumption is too high during concurrent processing of long texts. | Concurrency semaphore or smoothing rate limiter. |
| Throttling.BurstRate / limit_burst_rate | Traffic growth rate exceeded (Traffic Burst) | Sudden large volume of requests after startup or recovery from an idle state. | We recommend trying server-side queuing first. Alternatively, use a token bucket with a low initial value, such as initial_tokens=0, to implement a slow start, or use a smoothing rate limiter for peak-load shifting. |
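
The diagnosis table can also be encoded as a small lookup, so that logs or alerts carry the recommended strategy next to the error code. This is a minimal sketch: the function name `recommend` and the dimension labels ("RPM", "RPS", "TPM", "TPS", "BURST") are hypothetical names introduced for illustration. Only the error codes and strategies come from the table.

```python
# Hypothetical lookup built from the diagnosis table.
# OpenAI-compatible codes are mapped to their DashScope equivalents first.
ALIASES = {
    "limit_requests": "Throttling.RateQuota",
    "insufficient_quota": "Throttling.AllocationQuota",
    "limit_burst_rate": "Throttling.BurstRate",
}

RECOMMENDATIONS = {
    ("Throttling.RateQuota", "RPM"): "token bucket",
    ("Throttling.RateQuota", "RPS"): "concurrency semaphore or smoothing rate limiter",
    ("Throttling.AllocationQuota", "TPM"): "dual token bucket (RPM and TPM)",
    ("Throttling.AllocationQuota", "TPS"): "concurrency semaphore or smoothing rate limiter",
    ("Throttling.BurstRate", "BURST"): "server-side queuing, then slow start or smoothing",
}

def recommend(code: str, dimension: str) -> str:
    """Return the recommended strategy for an error code and triggering dimension."""
    canonical = ALIASES.get(code, code)
    return RECOMMENDATIONS.get((canonical, dimension), "unknown combination")

print(recommend("limit_requests", "RPM"))
```

Because the same error code covers two dimensions, the dimension still has to be inferred from the observed symptoms (the "Feature diagnosis" column) before the lookup is useful.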

Platform configuration solutions

These solutions rely on platform capabilities and require minimal or no client-side code changes.

Server-side queuing (recommended)

For traffic burst rate limiting, Model Studio accepts a maximum wait time in the request header. The server queues and retries the request within that time until it starts processing or the queue times out. This significantly improves success rates during traffic bursts compared to an immediate 429 response.

Note

This feature only applies to growth rate / traffic burst rate limiting (Throttling.BurstRate). It does not apply to absolute quota rate limiting (RPM/TPM).

Configuration

Add the X-DashScope-Wait-Timeout field to the request header:

| Header field | Example | Description |
| --- | --- | --- |
| X-DashScope-Wait-Timeout | 30 | The maximum queuing wait time for burst requests, in seconds. Value 0: No queuing. A 429 error is returned immediately. Recommended range: 3 to 120 seconds. |

Timeout configuration

After enabling queuing, adjust the client timeout to account for the added wait time:

  • Non-streaming requests (stream: false): Timeout = original base timeout + Wait-Timeout value.

  • Streaming requests (stream: true): Timeout > Wait-Timeout value. Streaming requests start timing after the first chunk, so the initial response timeout just needs to exceed the queuing time.

Example: If the original base timeout is 120 seconds and Wait-Timeout is set to 30 seconds, the non-streaming request timeout should be set to 150 seconds.
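
The two timeout rules can be written as one helper. This is a minimal sketch, not part of any SDK: `client_timeout` and its `margin` parameter are hypothetical, and the 5-second margin for streaming requests is an assumed value chosen only to satisfy "Timeout > Wait-Timeout".

```python
def client_timeout(base_timeout: float, wait_timeout: float,
                   stream: bool, margin: float = 5.0) -> float:
    """Return a client timeout (seconds) that leaves room for server-side queuing."""
    if stream:
        # Streaming: the initial response timeout only has to exceed the queuing time.
        return wait_timeout + margin
    # Non-streaming: original base timeout plus the queuing wait.
    return base_timeout + wait_timeout

print(client_timeout(120, 30, stream=False))
```

The non-streaming call reproduces the example: a 120-second base timeout with a 30-second wait gives 150 seconds.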

Code example

OpenAI Python SDK

import os
from openai import OpenAI

client = OpenAI(
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    timeout=150.0,  # Original timeout 120s + queuing wait 30s
)

response = client.chat.completions.create(
    model="qwen-plus",
    messages=[{"role": "user", "content": "Hello"}],
    extra_headers={
        "X-DashScope-Wait-Timeout": "30"  # Maximum queuing wait: 30 seconds
    }
)
print(response.choices[0].message.content)

curl

curl -X POST "https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions" \
  -H "Authorization: Bearer $DASHSCOPE_API_KEY" \
  -H "Content-Type: application/json" \
  -H "X-DashScope-Wait-Timeout: 30" \
  -d '{
    "model": "qwen-plus",
    "messages": [{"role": "user", "content": "Hello"}]
  }'

Increase quota limits

If the default quota is insufficient, increase the temporary rate limit quota in the Model Studio console. Changes take effect immediately. Available in China (Beijing) and Singapore regions.

Scenarios: Default RPM/TPM quota insufficient due to business growth, or temporary throughput increase needed for short-term events. See Rate limits.

Simple to configure. Evaluate this option before trying client-side strategies.

Provisioned throughput unit (PTU)

The PTU service provides dedicated, reserved computing power. It avoids contention in the public resource pool and is the preferred solution for real-time, high-throughput requirements.

Use PTU when your business has deterministic throughput requirements (such as SLA commitments) or when you want stable, high throughput without client-side traffic control.

PTUs are reserved resources billed continuously, even when not fully utilized. Evaluate required specifications based on actual peak load to avoid waste.

Asynchronous batch processing (Batch API)

For tasks without strict real-time requirements (data cleaning, batch analytics), use the Batch API to submit them for batch processing. Tasks run during off-peak hours, return results asynchronously, and are not subject to online rate limits.

Suitable for offline tasks that tolerate hours-to-days return time: data annotation, log analysis, batch summarization. Batch API costs are typically lower than real-time API calls.

Return time is not guaranteed. Not suitable for services requiring immediate responses. Retrieve results via polling or callback after submission.

Client-side traffic control strategies

When platform solutions (such as server-side queuing and quota increases) are not enough, add traffic control on the client side. The core principle: distribute requests evenly within a time window to avoid bursts. After a system start or long idle period, ramp up concurrency gradually.

The four strategies below are listed in order of increasing complexity. Each builds on the previous one:

  • Basic retry provides only passive defense.

  • Request rate limiting adds active queuing.

  • Traffic shaping further introduces token-level control and smooth sending.

  • Adaptive congestion control dynamically adjusts the sending rate based on real-time feedback.

Choose the lowest-cost strategy that meets your needs.

Throughput performance comparison of each strategy

[Figure: throughput of each strategy under different loads]

Throughput comparison under different loads:

  • Basic retry strategy: Effective under low load. Prone to congestive collapse under high concurrency, causing a sharp drop in throughput.

  • Request rate limiting strategy: Strong protection against collapse. However, under mixed workloads with long texts, throughput shows sawtooth-like fluctuations due to the lack of token control.

  • Traffic shaping strategy: High stability. Achieves smooth output by sacrificing some peak throughput.

  • Adaptive congestion control strategy: Can dynamically converge to a stable, high throughput point under high load, but has cold-start probing overhead.

Basic retry strategy

Suitable for personal testing, local scripts, and low-frequency background tasks. This strategy applies no rate limit to outgoing requests. It only retries with exponential backoff and random jitter when a 429 or 5xx error occurs.

No proactive traffic control. Under multi-threaded concurrency, this easily triggers rate limiting and causes request backups.

Code example

Using the tenacity library

import openai
from openai import OpenAI
from tenacity import (
    retry,
    stop_after_attempt,
    wait_random_exponential,
    retry_if_exception_type
)

RETRYABLE_ERRORS = (
    openai.RateLimitError,
    openai.InternalServerError,
    openai.APIConnectionError,
)

@retry(
    wait=wait_random_exponential(min=1, max=60),
    stop=stop_after_attempt(6),
    retry=retry_if_exception_type(RETRYABLE_ERRORS)
)
def chat_with_retry(client, model, messages, max_tokens):
    return client.chat.completions.create(
        model=model,
        max_tokens=max_tokens,
        messages=messages
    )

client = OpenAI(
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    api_key="YOUR_DASHSCOPE_API_KEY"
)

try:
    response = chat_with_retry(
        client=client,
        model="qwen-plus",
        messages=[{"role": "user", "content": "What is exponential backoff retry?"}],
        max_tokens=1024
    )
    print(response.choices[0].message.content)
except Exception as e:
    print(f"Request failed: {e}")

Native implementation (no dependencies)

import time
import random
import openai
from openai import OpenAI

RETRYABLE_ERRORS = (
    openai.RateLimitError,
    openai.InternalServerError,
    openai.APIConnectionError,
)

def chat_with_retry(client, model, messages, max_tokens):
    attempt = 0
    max_retries = 5
    base_delay = 1
    max_delay = 60

    while attempt <= max_retries:
        try:
            return client.chat.completions.create(
                model=model,
                max_tokens=max_tokens,
                messages=messages
            )
        except RETRYABLE_ERRORS as e:
            attempt += 1
            if attempt > max_retries:
                raise  # Re-raise the original exception with its traceback intact
            backoff = min(max_delay, base_delay * (2 ** (attempt - 1)))
            sleep_time = backoff + random.uniform(0, 1)
            print(f"Triggered {type(e).__name__}, retrying after {sleep_time:.2f}s...")
            time.sleep(sleep_time)

client = OpenAI(
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    api_key="YOUR_DASHSCOPE_API_KEY"
)

try:
    response = chat_with_retry(
        client=client,
        model="qwen-plus",
        messages=[{"role": "user", "content": "What is exponential backoff retry?"}],
        max_tokens=1024
    )
    print(response.choices[0].message.content)
except Exception as e:
    print(f"Request failed: {e}")

The code uses exponential backoff, not fixed-interval retry. Fixed-interval retry causes all failed requests to re-send simultaneously, triggering rate limiting again. Exponential backoff with random jitter spreads retries out:

  • Wait time doubles progressively: For example, 1s, 2s, 4s.... This avoids repeated requests in a short period.

  • Add random jitter: A random value (such as 2s +/- 0.5s) spreads retry traffic, preventing a secondary flood (thundering herd effect).

The system recovers gradually instead of getting stuck in a "fail — retry in unison — fail again" loop.

Request rate limiting strategy

Passive retries alone cannot handle production traffic, and frequent retries increase latency. This strategy introduces active traffic control: check and throttle requests before sending them, organizing unordered traffic into a queue that respects the RPM limit. Active smoothing adds a small, predictable queuing delay, far less than the cost of an "error, wait, retry" loop. A small known cost beats a large unknown one.

Suitable for chatbots and other lightweight, request-response services sensitive to time to first token.

Client-side active queuing uses two levels of control:

  • RPM token bucket: Limits total requests per minute. Bucket capacity equals the RPM quota; tokens refill at a constant rate. Supports borrowing: if tokens are insufficient, a request borrows from future quota, strictly FIFO.

  • Concurrency semaphore: Limits concurrent requests to prevent instantaneous high concurrency from triggering RPS limits.

Execute these two levels in strict order: acquire RPM token first, then acquire semaphore. Concurrency slots are scarce — only allocate them to requests that are ready to execute. Reversing the order causes head-of-line blocking under high load: requests hold slots but have no tokens, all slots fill up, nothing sends. Principle: do not hold a scarce resource while waiting.

The code below initializes the token bucket to full (initial_tokens=rpm_limit), suitable for online services that need to process requests immediately at startup. If a full bucket triggers rate limiting, lower the initial value (for example, initial_tokens=0 for an "empty bucket start") to ramp up more gradually.

This strategy does not track token usage. Long-text tasks can still exhaust the TPM quota.

Code example

Core component: Token bucket

import time

class TokenBucket:
    """
    Token bucket implementation to control requests per minute (RPM).
    Supports a debt mechanism to ensure first-in, first-out (FIFO) order under high concurrency.
    """
    def __init__(self, quota_per_minute: float, initial_tokens: float = 0.0):
        self.capacity = quota_per_minute
        self.tokens = initial_tokens
        self.refill_rate = quota_per_minute / 60.0
        self.last_refill = time.monotonic()

    def reserve(self, cost: float = 1.0) -> float:
        """
        Acquires a token.
        If tokens are insufficient, returns the number of seconds to wait (supports debt).
        """
        self._refill()

        # 1. Sufficient tokens: Deduct directly
        if self.tokens >= cost:
            self.tokens -= cost
            return 0.0

        # 2. Insufficient tokens: Calculate wait time and incur debt
        # "Reserves" future tokens for the current request to ensure FIFO order
        deficit = cost - self.tokens
        wait_seconds = deficit / self.refill_rate
        self.tokens -= cost
        return wait_seconds

    def _refill(self):
        """Refills tokens based on elapsed time."""
        now = time.monotonic()
        elapsed = now - self.last_refill
        if elapsed > 0:
            self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
            self.last_refill = now

Client logic

import asyncio
import openai
from openai import AsyncOpenAI
from tenacity import retry, wait_random_exponential, stop_after_attempt, retry_if_exception_type

class RateLimitedClient:
    def __init__(
        self,
        api_key: str,
        base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1",
        rpm_limit: float = 600.0,
        max_concurrency: int = 20
    ):
        self.client = AsyncOpenAI(api_key=api_key, base_url=base_url)
        # Component 1: RPM token bucket (controls total volume)
        self.rpm_bucket = TokenBucket(
            quota_per_minute=rpm_limit,
            initial_tokens=rpm_limit  # Start with a full bucket, suitable for lightweight online services
        )
        # Component 2: Concurrency semaphore (controls instantaneous concurrency)
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def _execute_request(self, model, messages, max_tokens):
        """Executes a single request, passing through RPM check and concurrency limit in order."""
        # 1. RPM check (acquire token first)
        wait_seconds = self.rpm_bucket.reserve(1.0)
        if wait_seconds > 0:
            await asyncio.sleep(wait_seconds)
        # 2. Concurrency check (acquire semaphore next)
        async with self.semaphore:
            # 3. Make the API call
            return await self.client.chat.completions.create(
                model=model,
                messages=messages,
                max_tokens=max_tokens
            )

    @retry(
        wait=wait_random_exponential(min=1, max=60),
        stop=stop_after_attempt(5),
        retry=retry_if_exception_type((
            openai.RateLimitError,
            openai.InternalServerError,
            openai.APIConnectionError
        ))
    )
    async def chat_with_limit(self, model, messages, max_tokens=1024):
        # Design consideration: Why do retries also need to re-acquire a token?
        # Ans: For safety. Without re-acquisition, the traffic pulse from retries
        # could instantly exceed the RPM limit.
        return await self._execute_request(model, messages, max_tokens)
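
The borrowing (debt) behavior of the RPM token bucket can be checked deterministically by passing the clock in explicitly. The sketch below is a compact restatement of the token bucket above, not a replacement for it: the class name `DebtBucket` and the `now` parameter are hypothetical, introduced so the example does not depend on real time.

```python
class DebtBucket:
    """Compact token bucket with an explicit clock (`now`, in seconds)."""

    def __init__(self, quota_per_minute: float, initial_tokens: float = 0.0):
        self.capacity = quota_per_minute
        self.tokens = initial_tokens
        self.refill_rate = quota_per_minute / 60.0
        self.last_refill = 0.0

    def reserve(self, now: float, cost: float = 1.0) -> float:
        # Refill based on the time elapsed since the last call.
        elapsed = now - self.last_refill
        if elapsed > 0:
            self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
            self.last_refill = now
        # If tokens are short, compute the wait; the balance may go negative (debt).
        wait = 0.0 if self.tokens >= cost else (cost - self.tokens) / self.refill_rate
        self.tokens -= cost
        return wait

bucket = DebtBucket(quota_per_minute=60, initial_tokens=0)  # refills 1 token per second
waits = [bucket.reserve(now=0.0) for _ in range(3)]
print(waits)  # [1.0, 2.0, 3.0]
```

Three requests arriving at the same instant against an empty bucket are told to wait 1, 2, and 3 seconds. Each caller queues behind the debt left by earlier callers, which is what produces the first-in, first-out order.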

Traffic shaping strategy

In batch scenarios requiring high, stable throughput (real-time RAG ingestion, bulk long-document analysis), request rate limiting has a TPM blind spot. Traffic shaping adds dual resource awareness (RPM & TPM) and a shaping mechanism that converts bursty traffic into a smooth flow.

Enhancements over request rate limiting:

  • Dual resource control (RPM & TPM): Maintains both RPM and TPM token buckets. All requests must pass quota checks for both dimensions before being sent.

  • Pre-deduction for input, post-settlement for output: Output length is unknown before the request. The TPM bucket pre-deducts input tokens when sending and settles actual output tokens after completion. Even if tokens go negative, subsequent requests wait for the count to become positive, naturally smoothing the flow.

  • Continuous warm-up: During a cold start, the token issuance rate increases linearly over time, eliminating the risk of initial bursts.

  • Smoothing rate limiter (Pacing): Smooths the sending rate by enforcing a minimum interval between requests (pacing), reducing the risk of triggering rate limits.

Alternative: If minor queuing delay at startup is acceptable, reuse the standard token bucket (set initial_tokens=0) for a safe start with lower complexity. The Python token bucket here is for demonstration. In production, use a mature rate limiting library, such as Guava's RateLimiter in Java (SmoothRateLimiter is its internal implementation).

In the code example, the smoothing wait is placed inside the concurrency lock. If it were placed outside the lock, multiple requests could finish waiting and then compete for the semaphore at the same time, re-bunching the traffic at the exit. Smoothing inside the lock slightly reduces concurrency efficiency but ensures precise send intervals.

The complete traffic shaping pipeline is: Estimate input tokens → Dual admission (RPM & TPM) → Concurrency lock → Traffic shaping → Send → Settle output tokens.

[Figure: traffic shaping pipeline]

The conservative smoothing sacrifices some peak concurrency. Not suitable for services requiring extremely low latency.

Code example

Advanced token bucket

import time

class TokenBucket:
    """Advanced token bucket that supports a continuous warm-up mechanism."""
    def __init__(self, quota_per_minute: float, warmup_seconds: float = 0.0):
        self.capacity = quota_per_minute
        self.tokens = 0.0
        self.target_refill_rate = quota_per_minute / 60.0
        self.warmup_seconds = warmup_seconds
        self.start_time = time.monotonic()
        self.last_update_time = self.start_time
        self.cumulative_generated = 0.0

    def _get_cumulative_tokens(self, t: float) -> float:
        if t <= 0:
            return 0.0
        R = self.target_refill_rate
        T = self.warmup_seconds
        if T <= 0:
            return R * t
        if t <= T:
            return (R / (2 * T)) * (t ** 2)
        else:
            warmup_total = (R * T) / 2.0
            return warmup_total + R * (t - T)

    def _get_time_for_cumulative_tokens(self, target_cumulative: float) -> float:
        if target_cumulative <= 0:
            return 0.0
        R = self.target_refill_rate
        T = self.warmup_seconds
        if T <= 0:
            return target_cumulative / R
        warmup_total = (R * T) / 2.0
        if target_cumulative <= warmup_total:
            return ((2 * T * target_cumulative) / R) ** 0.5
        else:
            return (target_cumulative - warmup_total) / R + T

    def reserve(self, cost: float = 1.0) -> float:
        now = time.monotonic()
        relative_now = now - self.start_time
        current_cumulative = self._get_cumulative_tokens(relative_now)
        new_tokens = current_cumulative - self.cumulative_generated
        self.tokens = min(self.capacity, self.tokens + new_tokens)
        self.cumulative_generated = current_cumulative
        self.last_update_time = now
        if self.tokens >= cost:
            self.tokens -= cost
            return 0.0
        deficit = cost - self.tokens
        self.tokens -= cost
        target_cumulative = self.cumulative_generated + deficit
        target_time = self._get_time_for_cumulative_tokens(target_cumulative)
        wait_seconds = target_time - relative_now
        return max(0.0, wait_seconds)

    def adjust(self, amount: float):
        self.tokens = min(self.capacity, self.tokens + amount)
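
The two helper methods of the advanced token bucket follow from a refill rate that rises linearly during warm-up. The derivation below is a sketch inferred from the code, writing R for target_refill_rate, T for warmup_seconds, r(t) for the instantaneous refill rate, and C(t) for the cumulative tokens issued by time t.

```latex
r(t) =
\begin{cases}
  R\,\dfrac{t}{T}, & 0 \le t \le T \\[4pt]
  R,               & t > T
\end{cases}
\qquad
C(t) = \int_0^t r(s)\,ds =
\begin{cases}
  \dfrac{R}{2T}\,t^2,          & 0 \le t \le T \\[4pt]
  \dfrac{RT}{2} + R\,(t - T),  & t > T
\end{cases}

t(C) =
\begin{cases}
  \sqrt{\dfrac{2\,T\,C}{R}},        & C \le \dfrac{RT}{2} \\[4pt]
  T + \dfrac{C - RT/2}{R},          & C > \dfrac{RT}{2}
\end{cases}
```

C(t) corresponds to `_get_cumulative_tokens` and t(C) to `_get_time_for_cumulative_tokens`. For example, with R = 10 tokens per second and T = 60 seconds, the warm-up period issues RT/2 = 300 tokens, half of the 600 that a constant-rate bucket would issue in the same time.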

Smoothing rate limiter

import time

class SmoothRateLimiter:
    def __init__(self, rate_per_minute: float):
        self._min_interval = 60.0 / rate_per_minute
        self._last_operation = time.monotonic()

    def reserve(self) -> float:
        now = time.monotonic()
        elapsed = now - self._last_operation
        wait_time = max(0.0, self._min_interval - elapsed)
        self._last_operation = now + wait_time
        return wait_time

Client logic

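import asyncio
import os
from openai import AsyncOpenAI

class TrafficShapingClient:
    def __init__(self):
        # Assumption: the API key is read from the DASHSCOPE_API_KEY environment variable
        self._client = AsyncOpenAI(
            api_key=os.getenv("DASHSCOPE_API_KEY"),
            base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
        )
        self._rpm_bucket = TokenBucket(quota_per_minute=600)
        self._tpm_bucket = TokenBucket(quota_per_minute=1_000_000)
        self._smooth_limiter = SmoothRateLimiter(rate_per_minute=600)
        self._concurrency_semaphore = asyncio.Semaphore(20)

    async def _send_chat_request(self, model, prompt, max_tokens):
        """Sends one chat request and returns (content, usage).
        Assumption: the prompt is sent as a single user message."""
        response = await self._client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=max_tokens
        )
        return response.choices[0].message.content, response.usage

    async def _execute_throttled_request(self, model, prompt, max_tokens, input_tokens):
        # [Step 1] Dual admission control
        # Check both RPM and TPM, and take the longer wait time
        wait_rpm = self._rpm_bucket.reserve(1.0)
        # The TPM check only requests a quota for input tokens
        wait_tpm = self._tpm_bucket.reserve(input_tokens)
        admission_wait = max(wait_rpm, wait_tpm)
        if admission_wait > 0:
            await asyncio.sleep(admission_wait)

        # [Step 2] Acquire concurrency lock
        async with self._concurrency_semaphore:
            # [Step 3] Traffic shaping
            # Key: Perform smoothing wait inside the lock
            # Sacrifices some concurrency efficiency for precise control over send intervals
            smooth_wait = self._smooth_limiter.reserve()
            if smooth_wait > 0:
                await asyncio.sleep(smooth_wait)

            # [Step 4] Send request
            content, actual_usage = await self._send_chat_request(model, prompt, max_tokens)

            # [Step 5] Settle output tokens (usage may be missing on some responses)
            output_tokens = actual_usage.completion_tokens if actual_usage else 0
            if output_tokens > 0:
                self._tpm_bucket.adjust(-output_tokens)
            return content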
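
The "pre-deduction for input, post-settlement for output" rule can be traced with concrete numbers. The sketch below is a simplified ledger with an explicit clock, assuming a full bucket at start. `TpmLedger`, `pre_deduct`, `settle`, and the `now` parameter are hypothetical names used only for this illustration.

```python
class TpmLedger:
    """Simplified TPM bucket: pre-deduct input tokens, settle output tokens later."""

    def __init__(self, tpm_limit: float):
        self.capacity = tpm_limit
        self.tokens = tpm_limit              # assumption: start with a full bucket
        self.refill_rate = tpm_limit / 60.0  # tokens per second
        self.last = 0.0

    def _refill(self, now: float) -> None:
        elapsed = now - self.last
        if elapsed > 0:
            self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
            self.last = now

    def pre_deduct(self, input_tokens: float, now: float) -> float:
        """Deduct input tokens before sending; return seconds to wait."""
        self._refill(now)
        short = input_tokens - self.tokens
        wait = short / self.refill_rate if short > 0 else 0.0
        self.tokens -= input_tokens
        return wait

    def settle(self, output_tokens: float, now: float) -> None:
        """Charge the actual output tokens after completion; balance may go negative."""
        self._refill(now)
        self.tokens -= output_tokens

ledger = TpmLedger(tpm_limit=6000)               # refills 100 tokens per second
first_wait = ledger.pre_deduct(2000, now=0.0)    # balance 6000 -> 4000, no wait
ledger.settle(5000, now=0.0)                     # long output: balance 4000 -> -1000
second_wait = ledger.pre_deduct(500, now=0.0)    # short by 1500 tokens
print(first_wait, second_wait)  # 0.0 15.0
```

The long output of the first request pushes the balance negative, so the next request waits 15 seconds (1,500 tokens at 100 tokens per second) even though its own input is small. This is the smoothing effect the strategy relies on.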

Adaptive congestion control strategy

Suitable for large-scale, dynamic workloads: API gateways, complex proxies, multi-tenant systems.

Note

Selection tip: This strategy is not a universal solution

This strategy handles highly uncertain and volatile environments. It is not a universal choice:

  • Performance paradox: If the load is predictable (such as batch processing), optimal static parameters outperform dynamic probing that needs "trial and convergence".

  • Probing overhead: Dynamic algorithms inevitably involve cold-start ramp-up and exploratory fluctuations — unnecessary overhead in known scenarios.

  • Maintenance cost: Closed-loop feedback increases system complexity and troubleshooting difficulty.

Unless your business has a very large scale, complex load, and significant volatility, choose one of the simpler first three strategies.

Request rate limiting and traffic shaping are static quota strategies — they work well under stable, predictable loads. But at the gateway level, downstream loads shift constantly (short requests mixed with deep inference), and platform thresholds fluctuate dynamically. Static strategies struggle to balance efficiency and stability.

Inspired by BBR (Bottleneck Bandwidth and Round-trip propagation time, a TCP congestion control algorithm), this strategy builds a closed-loop control system based on EBP (Elastic Bandwidth Probing). It uses RPM/TPM quotas as a guiding upper limit and dynamically calculates the optimal send rate from real-time feedback (latency changes, rate limit signals).

  • EBP: Stores the historical highest successful watermark. Calculates probing gain by simulating spring tension based on the distance from current concurrency to the watermark (farther = faster, closer = slower). Adds a small linear thrust to keep exploring even at high saturation.

  • TPT congestion awareness: LLM generation time scales with output length — high latency on long texts does not mean congestion. TPT (Time Per Token) filters out content-length noise. Congestion is only declared when TPT degrades significantly.

  • Anti-burst rate governor: Regardless of the EBP target, the governor caps concurrency growth acceleration to ensure smooth ramp-up and avoid step changes that trigger growth rate limits.


Key modifications from native BBR for large models:

  • Guided probing: Introduces known RPM/TPM quotas as a "guiding upper limit" to avoid frequent collisions caused by blind probing.

  • Signal source (RTT → TPT): Native BBR uses RTT. In LLM scenarios, content-length latency dwarfs network jitter. TPT eliminates that interference.

  • Response mechanism enhancement (ProbeRTT → Hold): In the face of latency fluctuations, it chooses to maintain the current concurrency level rather than proactively backing off and reducing throughput.

  • Hard rate limit response (Packet Loss → 429 Drain): Once a 429 error is triggered, it enters an aggressive Drain state and performs a fast recovery after a cooldown period.

Limitations:

  • TPT noise: TPT is estimated as "total latency / total tokens". Total latency includes network round-trip, queuing, and time to first token — susceptible to jitter or long inputs, which can falsely trigger the Hold state.

  • Large request starvation: Uses a non-strict FIFO wakeup for scheduling performance. When quotas are scarce, short-token requests may preempt resources, causing long-token requests to wait too long.

  • Cold start: Requires a warm-up period to build a statistical model. In low-load or short-lived tasks, throughput may be lower than the first three strategies.

Code example

Control entrypoint

class ElasticCongestionController:
    async def acquire(self, request_tokens):
        """[Admission phase] Check before initiating a request"""
        # 1. SSR slow-start restart: If idle for too long, proactively decay the limit
        #    to prevent burst traffic caused by an outdated watermark.
        if self.is_idle_too_long():
            self.perform_slow_start_restart()

        # 2. Circuit breaker check: If in DRAIN (cooldown) state, force wait.
        if self.state == CongestionState.DRAIN:
            await self.wait_for_cooldown()

        # 3. Dual budget check: Check both concurrency slots and token budget.
        await self.wait_for_budget(request_tokens)

    async def release(self, latency, actual_tokens, error):
        """[Feedback phase] Decision after the request finishes"""
        if error:
            # [Fault response] On rate limit error (429/503): Immediately drain + multiplicative backoff
            self.state = CongestionState.DRAIN
            self.concurrency_limit *= self.backoff_factor  # e.g. 0.7
            return

        # [Normal response] Calculate TPT (Time-Per-Token)
        current_tpt = latency / max(actual_tokens, 1)  # Guard against zero-token responses

        # [Congestion awareness] TPT spike (generation slows down): Enter HOLD to observe
        # Maintain concurrency level, neither backing off nor increasing
        if current_tpt > self.metrics.ema_tpt * 2.0:
            self.state = CongestionState.HOLD
        else:
            # [Steady-state probing] Network is healthy: Perform EBP elastic probing
            self.state = CongestionState.PROBING
            self.update_limit_via_ebp()

EBP probing

def probe_next_limit(self, current_limit, max_known_capacity):
    """
    Calculate the next concurrency limit
    Core formula: Next = Max(Spring Tension, Additive Thrust) + Governor Smoothing
    """
    # 1. Calculate physical limit (Little's Law)
    # Theoretical limit = Throughput * Latency * Buffer Factor
    dynamic_ceiling = self.metrics.tps * self.metrics.avg_latency * 1.2

    # 2. Spring logic (Spring Tension)
    # The farther from the historical high watermark, the greater the tension (accelerate); the closer, the smaller (decelerate)
    tension = 1.0 - (current_limit / max_known_capacity)
    spring_target = current_limit * (1.0 + tension * self.gain)  # self.gain: probing gain coefficient

    # 3. Additive thrust
    # Solves the "Zeno's Paradox": When tension approaches 0, forcibly add a small linear increment
    # to ensure the system can break out of local maxima and continue exploring the boundary.
    linear_target = current_limit + self.min_additive_step

    raw_target = max(spring_target, linear_target)

    # 4. Anti-burst rate governor
    # Limits the acceleration of concurrency growth to prevent step changes.
    final_limit = self.governor.smooth(raw_target)

    return min(final_limit, dynamic_ceiling)
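
To see how the spring tension and additive thrust interact, the probing step can be reduced to a pure function and evaluated at a few points. This is a simplified sketch under stated assumptions: the default values of `gain`, `min_additive_step`, and `max_growth_ratio` are illustrative, and the governor is approximated as a cap on relative growth per step, which is simpler than the acceleration limit described above.

```python
def probe_next_limit(current_limit: float, max_known_capacity: float,
                     gain: float = 0.5, min_additive_step: float = 1.0,
                     max_growth_ratio: float = 0.25,
                     dynamic_ceiling: float = float("inf")) -> float:
    """Compute the next concurrency limit from spring tension and additive thrust."""
    # Spring tension: large when far below the historical watermark, zero at or above it.
    tension = max(0.0, 1.0 - current_limit / max_known_capacity)
    spring_target = current_limit * (1.0 + tension * gain)
    # Additive thrust: keeps exploring when the tension approaches zero.
    linear_target = current_limit + min_additive_step
    raw_target = max(spring_target, linear_target)
    # Simplified governor (assumption): cap growth at a fixed ratio per step.
    governed = min(raw_target, current_limit * (1.0 + max_growth_ratio))
    return min(governed, dynamic_ceiling)

for limit in (10, 95, 100):
    print(limit, probe_next_limit(limit, max_known_capacity=100))
```

With a watermark of 100, a limit of 10 is pulled toward 14.5 by the spring but capped to 12.5 by the governor. A limit of 95 rises to about 97.4 on spring tension alone, and a limit of 100 has zero tension, so only the additive step applies and the result is 101.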

Metrics tracking

class CongestionMetrics:
    def update_stats(self, latency, token_count):
        """
        [Sensor] Update statistical metrics in real time
        Use EMA (exponential moving average) to filter out noise from long-tail requests
        """
        alpha = 0.2  # Smoothing factor

        # 1. Estimate single request size (Token Size)
        self.ema_tokens = (1 - alpha) * self.ema_tokens + alpha * token_count

        # 2. Estimate TPT (Time Per Token)
        # Use TPT instead of Latency to eliminate errors caused by different LLM generation lengths
        instant_tpt = latency / token_count
        self.ema_tpt = (1 - alpha) * self.ema_tpt + alpha * instant_tpt

    def track_inflight(self, estimated_tokens):
        """
        [Blind spot filling] Correct the lag of "counting only after response"
        Pre-deduct the quota the moment a request is initiated
        """
        self.inflight_tokens += estimated_tokens
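
The TPT-based congestion check can be isolated into a small, runnable monitor. This is a minimal sketch, not the full controller: `TptMonitor`, `observe`, and the string states are hypothetical names, while the smoothing factor of 0.2 and the 2x spike threshold are taken from the snippets above.

```python
class TptMonitor:
    """Tracks an EMA of time per token and flags spikes as congestion."""

    def __init__(self, alpha: float = 0.2, spike_factor: float = 2.0):
        self.alpha = alpha
        self.spike_factor = spike_factor
        self.ema_tpt = None  # No baseline until the first sample arrives

    def observe(self, latency: float, token_count: int) -> str:
        if token_count <= 0:
            return "SKIP"  # Cannot compute TPT without tokens
        tpt = latency / token_count
        if self.ema_tpt is None:
            self.ema_tpt = tpt
            return "PROBING"
        # Compare against the baseline before folding the new sample into it.
        congested = tpt > self.ema_tpt * self.spike_factor
        self.ema_tpt = (1 - self.alpha) * self.ema_tpt + self.alpha * tpt
        return "HOLD" if congested else "PROBING"

monitor = TptMonitor()
states = [
    monitor.observe(latency=2.0, token_count=100),    # baseline: 0.02 s per token
    monitor.observe(latency=20.0, token_count=1000),  # 10x latency, same TPT
    monitor.observe(latency=10.0, token_count=100),   # TPT jumps to 0.1 s per token
]
print(states)  # ['PROBING', 'PROBING', 'HOLD']
```

The second request takes ten times longer than the first but produces ten times more tokens, so it is not treated as congestion. Only the third request, where the time per token rises fivefold, triggers the hold state.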

Architectural fallback solutions

When platform configuration and client-side traffic control still cannot meet availability or peak throughput requirements, add fallback mechanisms at the architecture level.

Model fallback

When the primary model cannot respond due to rate limiting or service issues, automatically fall back to an alternative model with a more generous quota.

Fallback path design principles

  • Choose models from different series: Rate limiting is per-model. Use a different model as fallback — for example, fall back from qwen3.6-plus to qwen3.6-flash.

  • Trigger fallback only on rate limit errors: Fall back on 429 errors, not all exceptions. Switching models will not fix network timeouts or parameter errors.

  • Validate the fallback model in advance: Ensure it supports required features (Function Calling, structured output, etc.) to avoid functional issues after fallback.

Code example

The following example demonstrates model fallback logic based on the 429 error code. When a request to the primary model triggers rate limiting, it automatically switches to the fallback model for a retry.

import os
import asyncio
from openai import AsyncOpenAI, APIStatusError

# Primary and fallback models (different series, independent quotas)
PRIMARY_MODEL = "qwen3.6-plus"
FALLBACK_MODEL = "qwen3.6-flash"

client = AsyncOpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)

async def chat_with_fallback(messages: list) -> str:
    """Request with fallback: Automatically switches to the fallback model when the primary model is rate-limited."""
    for model in [PRIMARY_MODEL, FALLBACK_MODEL]:
        try:
            response = await client.chat.completions.create(
                model=model,
                messages=messages
            )
            return response.choices[0].message.content
        except APIStatusError as e:
            if e.status_code == 429 and model == PRIMARY_MODEL:
                print(f"[Rate Limit Triggered] {model}, falling back to {FALLBACK_MODEL}")
                continue
            raise
    raise RuntimeError("All models are unavailable")

async def main():
    result = await chat_with_fallback(
        messages=[{"role": "user", "content": "Hello"}]
    )
    print(result)

if __name__ == "__main__":
    asyncio.run(main())

Model fallback can be combined with client-side traffic control strategies. For example, you can integrate fallback logic into the retry mechanism of the request rate limiting strategy. When retries are exhausted and rate limiting is still triggered, switch to the fallback model.
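
The combination described above can be sketched as follows. This is a minimal sketch under stated assumptions, not the platform's implementation: `RateLimited` is a hypothetical marker exception that your request wrapper raises on a 429 response, `send` is a hypothetical coroutine that calls one model, and the retry count and backoff delays are illustrative values.

```python
import asyncio
import random
from typing import Awaitable, Callable, Sequence

class RateLimited(Exception):
    """Hypothetical marker: raise this when the API responds with HTTP 429."""

async def call_with_retry_then_fallback(
    send: Callable[[str], Awaitable[str]],
    models: Sequence[str],
    max_retries: int = 3,
    base_delay: float = 0.5,
) -> str:
    """Retry each model with backoff; move to the next model once retries are exhausted."""
    for model in models:
        for attempt in range(max_retries + 1):
            try:
                return await send(model)
            except RateLimited:
                if attempt == max_retries:
                    break  # Retries exhausted for this model: fall back to the next one
                # Exponential backoff with jitter before retrying the same model
                delay = base_delay * (2 ** attempt)
                await asyncio.sleep(delay + random.uniform(0, delay))
            # Any other exception propagates: switching models would not fix it
    raise RuntimeError("All models are rate-limited")
```

With the client from the example above, `send` could wrap `client.chat.completions.create` and convert an `APIStatusError` whose `status_code` is 429 into `RateLimited`. Passing `[PRIMARY_MODEL, FALLBACK_MODEL]` as `models` keeps the fallback order in one place.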

Peak-load shifting using message queues (MQ)

For backend services that do not require immediate responses, introduce message middleware (such as RabbitMQ or Kafka) for peak-load shifting. Burst traffic goes into the MQ first, and consumers pull and process messages at a steady rate that matches the rate limit quota. This decouples frontend peaks from backend calls.

This pattern suits workloads where users accept asynchronous results, such as ticket processing, content moderation, and batch data annotation.

Key design points:

  • Consumer rate control: The consumer side should use the request rate limiting or traffic shaping strategy to consume messages at a steady rate based on the RPM/TPM quota, rather than pulling messages without limits.

  • Dead-letter handling: Move messages that fail after multiple retries to a dead-letter queue and trigger an alert. Prevent infinite retries from blocking consumption.

  • Back-pressure propagation: When the MQ backlog exceeds a threshold, propagate pressure upstream (for example, return a queuing status) to prevent unbounded queue growth.
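
The three design points above can be sketched in a single process as follows. This is an illustrative sketch only: a bounded `asyncio.Queue` stands in for RabbitMQ or Kafka, `PeakShaver`, `Job`, and `handler` are hypothetical names, and retries happen in place rather than through a broker's redelivery mechanism.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List

@dataclass
class Job:
    payload: str
    attempts: int = 0

class PeakShaver:
    def __init__(self, handler: Callable[[str], Awaitable[None]],
                 rpm: int, max_backlog: int = 1000, max_attempts: int = 3):
        self.handler = handler
        self.interval = 60.0 / rpm            # Steady pacing derived from the RPM quota
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=max_backlog)
        self.max_attempts = max_attempts
        self.dead_letters: List[Job] = []     # Stand-in for a dead-letter queue

    def submit(self, payload: str) -> bool:
        """Back-pressure: return False when the backlog is full so the caller can report 'busy'."""
        try:
            self.queue.put_nowait(Job(payload))
            return True
        except asyncio.QueueFull:
            return False

    async def consume(self) -> None:
        """Process jobs one at a time, spending one pacing interval per attempt."""
        while True:
            job = await self.queue.get()
            try:
                for _ in range(self.max_attempts):
                    job.attempts += 1
                    try:
                        await self.handler(job.payload)
                        succeeded = True
                    except Exception:
                        succeeded = False
                    await asyncio.sleep(self.interval)
                    if succeeded:
                        break
                else:
                    # All attempts failed: park the job and raise an alert here
                    self.dead_letters.append(job)
            finally:
                self.queue.task_done()
```

Create the `PeakShaver` inside the running event loop, start `consume()` as a background task, and have the request path call `submit()`. A `False` return is the signal to propagate upstream, for example as a queuing status.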

Production environment considerations

The code examples run on a single-threaded Python asyncio event loop to demonstrate the core algorithms. Before large-scale production use, consider the following.

  • Adapting to non-text models

    The strategies above use text models as examples, but the core principles apply to multimodal services (image generation, speech synthesis). The units differ, but the essence is the same: limiting submission rate and processing capacity.

    • Speech models, such as speech recognition, are typically constrained by both the number of requests per unit of time (such as RPM) and usage (such as audio duration). The strategies are largely the same as for text models.

    • Models for images and videos are typically constrained by the task submission rate and the number of concurrent tasks. You can use the same approach as the request rate limiting strategy: limit the task submission rate and use a semaphore to control concurrency.

    Regardless of how metrics change, the principle of client-side throttling stays the same. Replace the counter or probing metric with the one for your modality. For specific rules, see Rate limiting.

  • Atomicity in concurrent models

    Example: asyncio uses single-threaded cooperative scheduling, so within a single process, state modifications that do not span an await point cannot be interleaved by other coroutines.

    Production: In multi-threaded or multi-process environments, protect the token bucket and statistics window with locks or atomic operations. Race conditions cause miscounted quota and break traffic control.

  • Distributed rate limiting

    Example: All traffic control components are in-memory.

    Production: In multi-instance deployments, each instance throttles independently, so aggregate usage can exceed the account-level limit. Use a centralized counter (such as Redis) to manage usage across all nodes.

  • Priority queues and starvation prevention

    Example: No priority differentiation. The adaptive congestion control strategy uses non-strict FIFO wakeup for scheduling performance.

    Production: If requests have different priorities, implement a weighted priority queue to guarantee bandwidth for high-priority traffic. Reserve a minimum quota for low-priority queues to prevent starvation.
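
The priority and starvation-prevention idea can be sketched as follows. This is one possible scheme under stated assumptions, not the scheduler used by the strategies above: `WeightedScheduler` and `low_share` are hypothetical names, and the policy shown reserves one dispatch slot out of every `low_share` for low-priority work.

```python
from collections import deque
from typing import Any, Deque, Optional

class WeightedScheduler:
    """High priority goes first, but low priority gets 1 of every `low_share` dispatches."""

    def __init__(self, low_share: int = 5):
        self.low_share = low_share
        self.high: Deque[Any] = deque()
        self.low: Deque[Any] = deque()
        self._since_low = 0  # High-priority dispatches since the last low-priority one

    def push(self, item: Any, high_priority: bool) -> None:
        (self.high if high_priority else self.low).append(item)

    def pop(self) -> Optional[Any]:
        """Return the next item to dispatch, or None if both queues are empty."""
        if not self.high and not self.low:
            return None
        low_due = self._since_low >= self.low_share - 1
        if self.low and (low_due or not self.high):
            self._since_low = 0
            return self.low.popleft()
        self._since_low += 1
        return self.high.popleft()

scheduler = WeightedScheduler(low_share=3)
for name in ["h1", "h2", "h3", "h4"]:
    scheduler.push(name, high_priority=True)
for name in ["l1", "l2"]:
    scheduler.push(name, high_priority=False)

order = []
while (item := scheduler.pop()) is not None:
    order.append(item)
print(order)
```

The scheduler only decides ordering. The dispatch rate itself would still be governed by the token bucket or congestion controller, which calls `pop()` whenever quota is available.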