普通消息为云消息队列 RocketMQ 版中最基础的消息,区别于有特性的顺序消息、定时/延时消息和事务消息。本文为您介绍普通消息的应用场景、功能原理、使用方法和使用建议。
应用场景
普通消息一般应用于微服务解耦、事件驱动、数据集成等场景,这些场景大多数要求数据传输通道具有可靠传输的能力,且对消息的处理时机、处理顺序没有特别要求。
典型场景一:微服务异步解耦
如上图所示,以在线的电商交易场景为例,上游订单系统将用户下单支付这一业务事件封装成独立的普通消息并发送至云消息队列 RocketMQ 版服务端,下游按需从服务端订阅消息并按照本地消费逻辑处理下游任务。每个消息之间都是相互独立的,且不需要产生关联。
典型场景二:数据集成传输
如上图所示,以离线的日志收集场景为例,通过埋点组件收集前端应用的相关操作日志,并转发到云消息队列 RocketMQ 版。每条消息都是一段日志数据,云消息队列 RocketMQ 版不做任何处理,只需要将日志数据可靠投递到下游的存储系统和分析系统即可,后续功能由后端应用完成。
功能原理
什么是普通消息
定义:普通消息是云消息队列 RocketMQ 版基本消息功能,支持生产者和消费者的异步解耦通信。
普通消息生命周期
初始化
消息被生产者构建并完成初始化,待发送到服务端的状态。
待消费
消息被发送到服务端,对消费者可见,等待消费者消费的状态。
消费中
消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。
此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,云消息队列 RocketMQ 版会对消息进行重试处理。具体信息,请参见消费重试。
消费提交
消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。
云消息队列 RocketMQ 版默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
消息删除
云消息队列 RocketMQ 版按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制。
使用限制
普通消息仅支持使用MessageType为Normal主题,即普通消息只能发送至类型为普通消息的主题中,发送的消息的类型必须和主题的类型一致。
使用示例
普通消息支持设置消息索引键、消息过滤标签等信息,用于消息过滤和搜索查找。以Java语言为例,收发普通消息的示例代码如下:
完整的消息收发示例代码请参见RocketMQ 5.x系列SDK(推荐)。
示例代码
普通消息发送
package doc;
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
public class ProducerExample {
public static void main(String[] args) throws ClientException {
/**
* 实例接入点,从控制台实例详情页的接入点页签中获取。
* 如果是在阿里云ECS内网访问,建议填写VPC接入点。
* 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
//消息发送的目标Topic名称,需要提前在控制台创建,如果不创建直接使用会返回报错。
String topic = "Your Topic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* 如果是使用公网接入点访问,configuration还需要设置实例的用户名和密码。用户名和密码在控制台访问控制的智能身份识别页签中获取。
* 如果是在阿里云ECS内网访问,无需填写该配置,服务端会根据内网VPC信息智能获取。
* 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration configuration = builder.build();
/**
* 初始化Producer时直接配置需要使用的Topic列表(这个参数可以配置多个Topic),实现提前检查错误配置、拦截非法配置启动。
* 针对非事务消息 Topic,也可以不配置,服务端会动态检查消息的Topic是否合法。
* 注意!!!事务消息Topic必须提前配置,以免事务消息回查接口失败,具体原理请参见事务消息。
*/
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
//普通消息发送。
Message message = provider.newMessageBuilder()
.setTopic(topic)
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
//消息体。
.setBody("messageBody".getBytes())
.build();
try {
//发送消息,需要关注发送结果,并捕获失败等异常。
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
}
}
PushConsumer消费
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
public class PushConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
/**
* 实例接入点,从控制台实例详情页的接入点页签中获取。
* 如果是在阿里云ECS内网访问,建议填写VPC接入点。
* 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
//指定需要订阅哪个目标Topic,Topic需要提前在控制台创建,如果不创建直接使用会返回报错。
String topic = "Your Topic";
//为消费者指定所属的消费者分组,Group需要提前在控制台创建,如果不创建直接使用会返回报错。
String consumerGroup = "Your ConsumerGroup";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* 如果是使用公网接入点访问,configuration还需要设置实例的用户名和密码。用户名和密码在控制台访问控制的智能身份识别页签中获取。
* 如果是在阿里云ECS内网访问,无需填写该配置,服务端会根据内网VPC信息智能获取。
* 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration clientConfiguration = builder.build();
//订阅消息的过滤规则,表示订阅所有Tag的消息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
//初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
//设置消费者分组。
.setConsumerGroup(consumerGroup)
//设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
//设置消费监听器。
.setMessageListener(messageView -> {
//处理消息并返回消费结果。
// LOGGER.info("Consume message={}", messageView);
System.out.println("Consume Message: " + messageView);
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
//如果不需要再使用PushConsumer,可关闭该进程。
//pushConsumer.close();
}
}
SimpleConsumer消费
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
public class SimpleConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);
private SimpleConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException {
/**
* 实例接入点,从控制台实例详情页的接入点页签中获取。
* 如果是在阿里云ECS内网访问,建议填写VPC接入点。
* 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
//指定需要订阅哪个目标Topic,Topic需要提前在控制台创建,如果不创建直接使用会返回报错。
String topic = "Your Topic";
//为消费者指定所属的消费者分组,Group需要提前在控制台创建,如果不创建直接使用会返回报错。
String consumerGroup = "Your ConsumerGroup";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* 如果是使用公网接入点访问,configuration还需要设置实例的用户名和密码。用户名和密码在控制台访问控制的智能身份识别页签中获取。
* 如果是在阿里云ECS内网访问,无需填写该配置,服务端会根据内网VPC信息智能获取。
* 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration clientConfiguration = builder.build();
Duration awaitDuration = Duration.ofSeconds(10);
//订阅消息的过滤规则,表示订阅所有Tag的消息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
//初始化SimpleConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration)
//设置消费者分组。
.setConsumerGroup(consumerGroup)
//设置长轮询超时时间。
.setAwaitDuration(awaitDuration)
//设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
//设置本次拉取的最大消息条数。
int maxMessageNum = 16;
//设置消息的不可见时间。
Duration invisibleDuration = Duration.ofSeconds(10);
//SimpleConsumer需要客户端一直主动循环获取消息,并进行消费处理。
//如果需要提高消费实时性,建议多线程并发拉取。
while (true) {
final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
messages.forEach(messageView -> {
// LOGGER.info("Received message: {}", messageView);
System.out.println("Received message: " + messageView);
});
for (MessageView message : messages) {
final MessageId messageId = message.getMessageId();
try {
//消费处理完成后,需要主动调用ACK向服务端提交消费结果。
consumer.ack(message);
System.out.println("Message is acknowledged successfully, messageId= " + messageId);
//LOGGER.info("Message is acknowledged successfully, messageId={}", messageId);
} catch (Throwable t) {
t.printStackTrace();
//LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t);
}
}
}
// 如果不需要再使用SimpleConsumer,可关闭该进程。
// consumer.close();
}
}
使用建议
设置全局唯一业务索引键,方便问题追踪
云消息队列 RocketMQ 版支持自定义索引键(消息的Key),在消息查询和轨迹查询时,可以通过索引键高效精确地查询到消息。
因此,发送消息时,建议设置业务上唯一的信息作为索引,方便后续快速定位消息。例如,订单ID,用户ID等。