AHAS应用防护功能与消息队列RocketMQ版组合,可以让MQ消费端负载保持在消息处理水位之下,同时尽可能处理更多消息,达到削峰填谷的效果。本文以AHAS应用防护的匀速处理请求的能力为例,说明如何对RocketMQ消费端进行限流。
背景信息
在消息队列RocketMQ版中,消费者消费消息时,很可能出现因消息发送量突增而消费者来不及处理的情况,导致消费方负载过高,进而导致影响系统稳定性。
在实际场景中,消息的到来具有瞬时性、不规律性,导致系统可能出现空闲资源。利用AHAS应用防护的匀速处理请求的能力,可以把超过消费端处理能力的消息(图中黄色部分)均摊到后面系统空闲时去处理,让系统负载处在一个稳定的水位,同时尽可能地处理更多消息,起到削峰填谷的作用。
AHAS应用防护在削峰填谷的场景时,以固定的间隔时间让请求通过,以稳定的速度逐步处理这些请求,避免流量突刺造成系统负载过高。同时堆积的请求将会排队,逐步进行处理;当请求排队预计超过最大超时时长的时候则直接拒绝,而不是拒绝全部请求。
例如,在RocketMQ的场景下,配置匀速模式下请求QPS为8,则每200 ms处理一条消息,多余的处理任务将排队;同时配置超时时间为8秒,预计的排队时长超过8秒的处理任务将会被直接拒绝。
前提条件
- 已在消息队列RocketMQ中发送和订阅消息,请参见消息队列RocketMQ版快速入门。
- 已开通AHAS,请参见开通AHAS。
步骤一:接入AHAS应用防护
下面将介绍如何快速在消息队列RocketMQ Consumer(消费端)接入和使用AHAS应用防护服务 。您可以下载Demo工程来完成以下步骤。
-
在Consumer的pom文件中引入AHAS应用防护(即Sentinel)依赖。
<dependency> <groupId>com.alibaba.csp</groupId> <artifactId>ahas-sentinel-client</artifactId> <version>x.y.z</version> </dependency>
说明 请在AHAS依赖仓库查看依赖最新版本。 - 定义资源。
由于消息队列RocketMQ Consumer未提供相应拦截机制,而且每次收到都可能是批量的消息,因此用户需要在处理消息时手动定义资源。
定义消息处理逻辑为消息被拒绝后会记录错误并触发重新投递,代码示例如下。
private static Action handleMessage(Message message, String groupId, String topic) { Entry entry = null; try { // 定义资源。为了便于标识,资源名称定义为Group ID和Topic的组合。Group ID和Topic可以通过消息队列RocketMQ控制台获得。 entry = SphU.entry("handleMqMessage:" + groupId + ":" + topic); // 业务真实的消息处理逻辑。 System.out.println(System.currentTimeMillis() + " | handling message: " + message); return Action.CommitMessage; } catch (BlockException ex) { // 编写被流控的消息的处理逻辑。示例:记录错误或进行重试。 System.err.println("Blocked, will retry later: " + message); // 会触发消息重新投递 return Action.ReconsumeLater; } finally { if (entry != null) { entry.exit(); } } }
消费者订阅消息的逻辑示例如下。
Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe(topic, "*", (message, context) -> { return handleMessage(message); }); consumer.start();
关于消息队列RocketMQ版如何订阅消息,请参见消息队列RocketMQ版文档。
- 登录AHAS控制台,获取AHAS启动参数。
- 在控制台最上方地域列表中,选择地域为公网。
- 在左侧导航栏选择新应用接入。 ,单击左上角
- 选择
启动参数示例如下。
-Dproject.name=MqConsumerDemo -Dahas.license=<License>
其中
MqConsumerDemo
表示应用名,可自定义;<License>
表示您的授权证书,请修改为真实值。
页签,查看启动参数。
- 在Consumer中添加启动参数。
- 启动Publisher开始发送消息,再启动Consumer开始接收消息。
启动Publisher、Consumer后,本地IDE的console区域开始打印消息发送日志、消息接收日志,通过查看日志判断消息发送情况。
步骤二:配置削峰填谷规则
将应用接入AHAS应用防护后,需要为其配置规则来实现削峰填谷。
- 登录AHAS控制台,在左侧导航栏选择 。
- 在应用防护页面单击目标应用资源卡片,进入该应用管理界面。
- 在左侧导航栏选择应用概览,在目标接口的操作列中,单击流控,填写流控规则,并单击新建完成创建。详情请参见配置流控规则。
- 流控效果:排队等待。
- 单机QPS阈值:QPS,设置QPS阈值为10,代表每100 ms匀速通过一个请求。
- 超时时间:2000 ms,超出此超时时间的请求将立即被拒绝,不会进入队列。
- 通过消息队列RocketMQ Producer端向Consumer批量发送消息,查看流控效果。
- 在Consumer控制台,通过观察消息头部的时间戳(如下所示),可以发现消息消费的速率是匀速的,大约每100毫秒消费一条消息。同时,不断有排队的处理任务完成,超出等待时长的处理请求直接被拒绝。
1550732955137 | handling message: Hello MQ 2453 1550732955236 | handling message: Hello MQ 9162 1550732955338 | handling message: Hello MQ 4944 1550732955438 | handling message: Hello MQ 5582 1550732955538 | handling message: Hello MQ 4493 1550732955637 | handling message: Hello MQ 3036 1550732955738 | handling message: Hello MQ 1381 1550732955834 | handling message: Hello MQ 1450 1550732955937 | handling message: Hello MQ 5871
说明 在处理被拒绝请求的时候,需要根据业务需求,决定是否重新消费消息。 - 在AHAS控制台的应用详情页面,单击监控详情,查看消息处理的监控曲线。
如果没有使用匀速限流模式,该消息处理的监控曲线会如下图。
如果不开启匀速模式,只会同时处理10条消息,其余的全部被拒绝。即使后面的时间系统资源充足,多余的请求也无法被处理,因而浪费了许多空闲资源。两种模式对比说明匀速模式下消息处理能力得到了更好的利用。
- 在Consumer控制台,通过观察消息头部的时间戳(如下所示),可以发现消息消费的速率是匀速的,大约每100毫秒消费一条消息。同时,不断有排队的处理任务完成,超出等待时长的处理请求直接被拒绝。
消息队列Kafka Consumer接入示例
与消息队列RocketMQ版类似,消息队列Kafka Consumer也可通过类似方法接入和使用AHAS应用防护服务。
示例:在处理消息的逻辑处定义资源。
private static void handleMessage(ConsumerRecord<String, String> record, String groupId, String topic) {
pool.submit(() -> {
Entry entry = null;
try {
// 定义资源。为了便于标识,资源名称定义为Group ID和Topic的组合。Group ID和Topic可以通过MQ控制台获得。
entry = SphU.entry("handleKafkaMessage:" + groupId + ":" + topic);
// 业务的消息处理逻辑。
System.out.printf("[%d] Receive new messages: %s%n", System.currentTimeMillis(), record.toString());
} catch (BlockException ex) {
// Blocked
// 在处理请求被拒绝的情况时候,需要根据业务需求,决定是否重新消费消息。
System.err.println("Blocked: " + record.toString());
} finally {
if (entry != null) {
entry.exit();
}
}
});
}
示例:消费者订阅消息的逻辑如下。
while (true) {
try {
ConsumerRecords<String, String> records = consumer.poll(1000);
// 必须在下次poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIG。
// 建议开一个单独的线程池来消费消息,然后异步返回结果。
for (ConsumerRecord<String, String> record : records) {
handleMessage(record, groupId, topic);
}
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (Throwable ignore) {
}
e.printStackTrace();
}
}
相关文档
本文介绍的是AHAS应用防护服务的一个场景:请求匀速。AHAS应用防护服务还可以处理更复杂的各种情况。
- 流量控制:针对不同的调用关系,以不同的运行指标(如QPS、线程数、系统负载等)为基准,对资源调用进行流量控制,将随机的请求调整成合适的形状(请求匀速、Warm Up等)。
- 熔断降级:当调用链路中某个资源出现不稳定的情况,如平均响应时间增高、异常比例升高的时候,使对此资源的调用请求快速失败,避免影响其它的资源导致级联失败。
- 系统负载保护:从系统的维度提供保护。当系统负载较高的时候,提供保护机制,让系统的入口流量和系统的负载达到一个平衡,保证系统在能力范围之内处理最多的请求。
详情请参见AHAS应用防护。