实例限流最佳实践
云消息队列 RabbitMQ 版会对单实例的TPS流量峰值进行限流,本文介绍云消息队列 RabbitMQ 版实例的限流规则、限流后的行为以及限流最佳实践等。
限流阈值
实例总TPS限流阈值
| 实例系列 | Serverless系列实例 | 预付费系列实例 | ||||||
| 规格 | 共享 | 独享 | 未开启弹性TPS | 开启弹性TPS | ||||
| 预留+弹性/按累积量 | 预留+弹性 | 企业版 | 铂金版 | 专业版 | 企业版 | 铂金版 | 专业版 | |
| 限流阈值 | 最大5万次/秒 | 基础TPS流量峰值规格的2倍 | 基础TPS流量峰值规格 | 基础TPS流量峰值规格的2倍,最大5万次/秒 | 基础TPS流量峰值规格的2倍,最大5万次/秒 | 基础TPS流量峰值规格的1.5倍 | ||
单节点SendMessage TPS限流阈值
服务端会在实例维度限制每个后台服务节点上SendMessage的TPS值,限流阈值如下所示:
| 限制 | Serverless系列实例 | 预付费系列实例 | ||||
| 共享 | 独享 | 企业版 | 铂金版 | 专业版 | ||
| 按累积量 | 预留+弹性 | 预留+弹性 | ||||
| 限流阈值 | 2.5万次/秒 | 2.5万次/秒 | 无 | 2.5万次/秒 | 无 | 2.5万次/秒 | 
单接口的限流阈值
| 限制项 | 限制项接口 | Serverless系列实例 | 预付费系列实例 | |||
| 共享 | 独享 | 企业版 | 铂金版 | 专业版 | ||
| 预留+弹性/按累积量 | 预留+弹性 | |||||
| 单实例同步获取消息 | 
 | 500 TPS | 无 | 500 TPS | ||
| 单实例清Queue | 
 | 500 TPS | 无 | 500 TPS | ||
| 单实例创建Exchange | 
 | 500 TPS | 无 | 500 TPS | ||
| 单实例删除Exchange | 
 | 500 TPS | 无 | 500 TPS | ||
| 单实例创建Queue | 
 | 500 TPS | 无 | 500 TPS | ||
| 单实例删除Queue | 
 | 500 TPS | 无 | 500 TPS | ||
| 单实例创建Binding | 
 | 500 TPS | 无 | 500 TPS | ||
| 单实例删除Binding | 
 | 500 TPS | 无 | 500 TPS | ||
| 单实例恢复消息 | 
 | 500 TPS | 无 | 500 TPS | ||
| 单实例重入Queue消息 | 
 | 20 TPS | 无 | 20 TPS | ||
限流规则
当云消息队列 RabbitMQ 版实例的TPS流量峰值超过您所购买实例的TPS规格上限时,云消息队列 RabbitMQ 版实例会被限流。
限流后的行为如下:
错误码信息
- 错误码:reply-code=530 
- 错误信息:reply-text=denied for too many requests 
错误码处理示例代码
以Java语言为例,代码如下所示:
private static final int MAX_RETRIES = 5; // 最大重试次数
private static final long WAIT_TIME_MS = 2000; // 每次重试的等待时间(以毫秒为单位)
private void doAnythingWithReopenChannels(Connection connection, Channel channel) {
    try {
        // ......
        // 在当前通道channel下执行的任何操作
        // 例如消息发送、消费等
        // ......
    } catch (AlreadyClosedException e) {
        String message = e.getMessage();
        if (isChannelClosed(message)) {
            // 如果通道已经关闭,关闭并重新创建通道
            channel = createChannelWithRetry(connection); 
            // 在重连后可以继续执行其它操作
            // ......
        } else {
            throw e;
        }
    }
}
private Channel createChannelWithRetry(Connection connection) {
    for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
        try {
            return connection.createChannel();
        } catch (Exception e) {
            System.err.println("Failed to create channel. Attempt " + attempt + " of " + MAX_RETRIES);
            // 检查错误, 若仍是被限流导致的关闭错误,则可以等待后继续重试
            // 也可移除本部分重试逻辑
            if (attempt < MAX_RETRIES) {
                try {
                    Thread.sleep(WAIT_TIME_MS);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // 还原中断状态
                }
            } else {
                throw new RuntimeException("Exceeded maximum retries to create channel", e);
            }
        }
    }
    throw new RuntimeException("This line should never be reached"); // 理论上不会到达这里
}
private boolean isChannelClosed(String errorMsg) {
    // 判断是否包含channel.close报错,该报错代表通道已关闭。
    // 可能涵盖530,541等错误信息。
    if (errorMsg != null && errorMsg.contains("channel.close")) {
        System.out.println("[ChannelClosed] Error details: " + errorMsg);
        return true;
    }
    return false;
}实例秒级TPS峰值查询
通过查询实例实际使用的秒级TPS峰值,您可以了解业务的流量波动情况和流量峰值,判断实例规格是否满足业务需求。
云消息队列 RabbitMQ 版提供以下三种方式查询实例的秒级TPS峰值:
| 查询方式 | 说明 | 查询时间级别 | 查询资源级别 | 
| 优势: 
 | 分钟级TPS峰值 取值为1分钟周期内,每秒钟实例TPS的最大值。 | 实例级别TPS峰值 | |
| (推荐)通过实例详情查询实例TPS峰值 | 
 | 秒级TPS峰值 | 
 | 
| 
 | 秒级TPS峰值 | 实例级别TPS峰值 | 
TPS被限流后怎么处理?
如果出现因TPS峰值设置不合理导致实例、Connection被限流,从而影响到您的业务,建议您按照以下解决办法处理。
单实例总TPS被限流的解决办法
- 如果在测试或者流量峰值不确定、流量较少的短期场景,建议您为预付费系列实例开启弹性TPS能力或者使用Serverless系列实例。更多信息,请参见为实例开启弹性TPS功能。 
- 如果在长期稳定、流量较高的业务运行场景,建议您升级TPS流量峰值规格。更多信息,请参见升级实例配置。 
单节点的TPS被限流的解决办法
- 云消息队列 RabbitMQ 版采用分布式集群架构。建议为每个队列创建多个连接(至少10个),以便客户端能够更均衡地连接到集群中的多个服务节点。这种方法可以有效避免出现负载热点问题,从而提高消息的发送和消费效率。 
- Spring用户推荐使用 - CachingConnectionFactory的CONNECTION模式,详情请参见Spring集成。