AHAS 应用流控降级功能与消息队列 RocketMQ 组合,让系统负载保持在消息处理水位之下,同时尽可能地处理更多消息,达到“削峰填谷”的效果。本文以 AHAS 应用流控降级的匀速处理请求的能力为例,说明如何对 RocketMQ 消费端进行限流。
背景信息
在消息队列 RocketMQ 中,消费者消费消息时,很可能出现因消息发送量突增而消费者来不及处理的情况,导致消费方负载过高,进而导致影响系统稳定性。
在实际场景中,消息的到来具有瞬时性、不规律性,导致系统可能出现空闲资源。利用 AHAS 应用流控降级的匀速处理请求的能力,可以把超过消费端处理能力的消息(图中红色部分)均摊到后面系统空闲时去处理,让系统负载处在一个稳定的水位,同时尽可能地处理更多消息,起到削峰填谷的作用。

AHAS 应用流控降级在削峰填谷的场景时,以固定的间隔时间让请求通过,以稳定的速度逐步处理这些请求,避免流量突刺造成系统负载过高。同时堆积的请求将会排队,逐步进行处理;当请求排队预计超过最大超时时长的时候则直接拒绝,而不是拒绝全部请求。
例如,在 RocketMQ 的场景下,配置匀速模式下请求 QPS 为 5,则每 200 ms 处理一条消息,多余的处理任务将排队;同时配置超时时间为 5 秒,预计的排队时长超过 5 秒的处理任务将会被直接拒绝。

前提条件
步骤一:接入 AHAS 应用流控降级
下面将介绍如何快速在消息队列 RocketMQ Consumer (消费端)接入和使用 AHAS 流控降级服务 。您可以在下载Demo 工程来完成以下步骤。
-
在 Consumer 的 pom 文件中引入 AHAS 应用流控降级(即 Sentinel)依赖。
<dependency> <groupId>com.alibaba.csp</groupId> <artifactId>ahas-sentinel-client</artifactId> <version>x.y.z</version> </dependency>
说明 请在 AHAS 依赖仓库查看依赖最新版本。 - 定义资源。
由于消息队列 RocketMQ Consumer 未提供相应拦截机制,而且每次收到都可能是批量的消息,因此用户需要在处理消息时手动定义资源。
定义消息处理逻辑为消息被拒绝后会记录错误并触发重新投递,代码示例如下:
private static Action handleMessage(Message message, String groupId, String topic) { Entry entry = null; try { // 定义资源。为了便于标识,资源名称定义为 Group ID 和 Topic 的组合。Group ID 和 Topic 可以通过消息队列 RocketMQ 控制台获得。 entry = SphU.entry("handleMqMessage:" + groupId + ":" + topic); // 业务真实的消息处理逻辑 System.out.println(System.currentTimeMillis() + " | handling message: " + message); return Action.CommitMessage; } catch (BlockException ex) { // 编写被流控的消息的处理逻辑。示例:记录错误或进行重试。 System.err.println("Blocked, will retry later: " + message); // 会触发消息重新投递 return Action.ReconsumeLater; } finally { if (entry != null) { entry.exit(); } } }
消费者订阅消息的逻辑示例如下:
Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe(topic, "*", (message, context) -> { return handleMessage(message); }); consumer.start();
关于消息队列 RocketMQ 如何订阅消息,请参考 消息队列 RocketMQ 文档。
- 登录 AHAS 控制台,获取 AHAS 启动参数。
- 在控制台最上方地域列表中,选择地域为公网。
- 在左侧导航栏选择新应用接入。 ,单击右上角
- 选择
示例如下:
-Dproject.name=MqConsumerDemo -Dahas.license=<License>
其中,
MqConsumerDemo
表示应用名,可自定义;<License>
表示您的授权证书,请修改为真实值。
页签,查看启动参数。
- 在 Consumer 中添加启动参数。
- 启动 Publisher 开始发送消息,再启动 Consumer 开始接收消息。
启动 Publisher/Consumer 后,本地 IDE 的 consol 区域开始打印消息发送日志/消息接收日志,通过查看日志判断消息发送情况。
-
在请求链路页面,在对应的请求链路的操作列中,单击流控,填写流控规则,并单击新建完成创建。
- 流控方式:排队等待
- 阈值类型:QPS,设置 QPS 阈值为 10,代表每 100ms 匀速通过一个请求。
- 超时时间(ms):2000,超出此超时时间的请求将立即被拒绝,不会进入队列。
- 通过消息队列 RocketMQ Producer 端向 Consumer 批量发送消息,查看流控效果。
- 在 Consumer 控制台,通过观察消息头部的时间戳(如下所示),可以发现消息消费的速率是匀速的,大约每 100 毫秒消费一条消息。同时,不断有排队的处理任务完成,超出等待时长的处理请求直接被拒绝。
1550732955137 | handling message: Hello MQ 2453 1550732955236 | handling message: Hello MQ 9162 1550732955338 | handling message: Hello MQ 4944 1550732955438 | handling message: Hello MQ 5582 1550732955538 | handling message: Hello MQ 4493 1550732955637 | handling message: Hello MQ 3036 1550732955738 | handling message: Hello MQ 1381 1550732955834 | handling message: Hello MQ 1450 1550732955937 | handling message: Hello MQ 5871
说明 在处理被拒绝请求的时候,需要根据业务需求,决定是否重新消费消息。 - 在 AHAS 控制台的应用详情页面,单击监控详情,查看消息处理的监控曲线:
如果没有使用匀速限流模式,该消息处理的监控曲线会类似于:
如果不开启匀速模式,只会同时处理 10 条消息,其余的全部被拒绝。即使后面的时间系统资源充足,多余的请求也无法被处理,因而浪费了许多空闲资源。两种模式对比说明匀速模式下消息处理能力得到了更好的利用。
- 在 Consumer 控制台,通过观察消息头部的时间戳(如下所示),可以发现消息消费的速率是匀速的,大约每 100 毫秒消费一条消息。同时,不断有排队的处理任务完成,超出等待时长的处理请求直接被拒绝。
步骤二:配置削峰填谷规则
将应用接入 AHAS 应用流控降级后,需要为其配置规则来实现削峰填谷。
- 登录 AHAS 控制台。
-
在左侧导航栏选择流控降级。
-
单击已接入应用的卡片,进入详情页面。
-
单击左侧导航栏的机器列表。在机器列表页面,可以看到刚刚接入的机器,代表接入成功。
- 单击左侧导航栏的请求链路,查看已定义的资源。单击右边的“流控”按钮添加新的流控规则:
-
示例二:消息队列 Kafka Consumer 接入示例
与消息队列 RocketMQ 类似,消息队列 Kafka Consumer 也可通过类似方法接入和使用 AHAS 流控降级服务。
示例:在处理消息的逻辑处定义资源。
private static void handleMessage(ConsumerRecord<String, String> record, String groupId, String topic) {
pool.submit(() -> {
Entry entry = null;
try {
// 定义资源。为了便于标识,资源名称定义为 Group ID 和 Topic 的组合。Group ID 和 Topic 可以通过 MQ 控制台获得。
entry = SphU.entry("handleKafkaMessage:" + groupId + ":" + topic);
// 业务的消息处理逻辑
System.out.printf("[%d] Receive new messages: %s%n", System.currentTimeMillis(), record.toString());
} catch (BlockException ex) {
// Blocked
// 在处理请求被拒绝的情况时候,需要根据业务需求,决定是否重新消费消息。
System.err.println("Blocked: " + record.toString());
} finally {
if (entry != null) {
entry.exit();
}
}
});
}
示例:消费者订阅消息的逻辑
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(1000);
// 必须在下次 poll 之前消费完这些数据, 且总耗时不得超过 SESSION_TIMEOUT_MS_CONFIG
// 建议开一个单独的线程池来消费消息,然后异步返回结果
for (ConsumerRecord<String, String> record : records) {
handleMessage(record, groupId, topic);
}
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (Throwable ignore) {
}
e.printStackTrace();
}
}
相关文档
本文介绍的是 AHAS 流控降级服务的一个场景 - 请求匀速。AHAS 流控降级服务还可以处理更复杂的各种情况,例如:
- 流量控制:针对不同的调用关系,以不同的运行指标(如 QPS、线程数、系统负载等)为基准,对资源调用进行流量控制,将随机的请求调整成合适的形状(请求匀速、Warm Up 等)。
- 熔断降级:当调用链路中某个资源出现不稳定的情况,如平均响应时间增高、异常比例升高的时候,使对此资源的调用请求快速失败,避免影响其它的资源导致级联失败。
- 系统负载保护:从系统的维度提供保护。当系统负载较高的时候,提供保护机制,让系统的入口流量和系统的负载达到一个平衡,保证系统在能力范围之内处理最多的请求。
您可以参考 AHAS 流控降级文档探索更多的场景。
在文档使用中是否遇到以下问题
更多建议
匿名提交