本文以Java SDK为例说明如何调用SDK收发普通消息。
前提条件
- 步骤二:创建资源
- 安装IDEA
您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA Ultimate为例。
- 安装1.8或以上版本SDK
- 安装2.5或以上版本Maven
安装Java依赖库
生产消息
在已创建的Java工程中,创建发送普通消息程序并运行,示例代码如下:
package doc;
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
public class ProducerExample {
public static void main(String[] args) throws ClientException {
/**
* 实例接入点,从控制台实例详情页的接入点页签中获取。
* 如果是在阿里云ECS内网访问,建议填写VPC接入点。
* 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
//消息发送的目标Topic名称,需要提前在控制台创建,如果不创建直接使用会返回报错。
String topic = "Your Topic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* 如果是使用公网接入点访问,configuration还需要设置实例的用户名和密码。用户名和密码在控制台实例详情页获取。
* 如果是在阿里云ECS内网访问,无需填写该配置,服务端会根据内网VPC信息智能获取。
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration configuration = builder.build();
/**
* 初始化Producer时直接配置需要使用的Topic列表(这个参数可以配置多个Topic),实现提前检查错误配置、拦截非法配置启动。
* 针对非事务消息 Topic,也可以不配置,服务端会动态检查消息的Topic是否合法。
* 注意!!!事务消息Topic必须提前配置,以免事务消息回查接口失败,具体原理请参见事务消息。
*/
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
//普通消息发送。
Message message = provider.newMessageBuilder()
.setTopic(topic)
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
//消息体。
.setBody("messageBody".getBytes())
.build();
try {
//发送消息,需要关注发送结果,并捕获失败等异常。
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
}
}
消费消息
在已创建的Java工程中,创建订阅普通消息程序并运行。消息队列RocketMQ版支持SimpleConsumer和PushConsumer两种消费者类型,您可以选择任意一种方式订阅消息,具体的消费者类型的差异如下:
对比项 | PushConsumer | SimpleConsumer |
---|---|---|
接口方式 | 使用监听器回调接口返回消费结果,消费者仅允许在监听器范围内处理消费逻辑。 | 业务方自行实现消息处理,并主动调用接口返回消费结果。 |
消费并发度管理 | 由SDK管理消费并发度。 | 由业务方消费逻辑自行管理消费线程。 |
接口灵活度 | 高度封装,不够灵活。 | 原子接口,可灵活自定义。 |
适用场景 | 适用于无自定义流程的开发场景。 | 适用于需要高度自定义业务流程的开发场景。 |
package doc; import org.apache.rocketmq.client.apis.*; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.apache.rocketmq.shaded.org.slf4j.Logger; import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; public class PushConsumerExample { private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class); private PushConsumerExample() { } public static void main(String[] args) throws ClientException, IOException, InterruptedException { /** * 实例接入点,从控制台实例详情页的接入点页签中获取。 * 如果是在阿里云ECS内网访问,建议填写VPC接入点。 * 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。 */ String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080"; //指定需要订阅哪个目标Topic,Topic需要提前在控制台创建,如果不创建直接使用会返回报错。 String topic = "Your Topic"; //为消费者指定所属的消费者分组,Group需要提前在控制台创建,如果不创建直接使用会返回报错。 String consumerGroup = "Your ConsumerGroup"; final ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints); /** * 如果是使用公网接入点访问,configuration还需要设置实例的用户名和密码。用户名和密码在控制台实例详情页获取。 * 如果是在阿里云ECS内网访问,无需填写该配置,服务端会根据内网VPC信息智能获取。 */ //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password")); ClientConfiguration clientConfiguration = builder.build(); //订阅消息的过滤规则,表示订阅所有Tag的消息。 String tag = "*"; FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); //初始化SimpleConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。 PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) //设置消费者分组。 .setConsumerGroup(consumerGroup) //设置预绑定的订阅关系。 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) //设置消费监听器。 .setMessageListener(messageView -> { //处理消息并返回消费结果。 // LOGGER.info("Consume message={}", messageView); System.out.println("Consume Message: " + messageView); return ConsumeResult.SUCCESS; }) .build(); Thread.sleep(Long.MAX_VALUE); //如果不需要再使用PushConsumer,可关闭该进程。 //pushConsumer.close(); } }
package doc; import org.apache.rocketmq.client.apis.*; import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.SimpleConsumer; import org.apache.rocketmq.client.apis.message.MessageId; import org.apache.rocketmq.client.apis.message.MessageView; import org.apache.rocketmq.shaded.org.slf4j.Logger; import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory; import java.io.IOException; import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.function.Consumer; public class SimpleConsumerExample { private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class); private SimpleConsumerExample() { } public static void main(String[] args) throws ClientException, IOException { /** * 实例接入点,从控制台实例详情页的接入点页签中获取。 * 如果是在阿里云ECS内网访问,建议填写VPC接入点。 * 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。 */ String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080"; //指定需要订阅哪个目标Topic,Topic需要提前在控制台创建,如果不创建直接使用会返回报错。 String topic = "Your Topic"; //为消费者指定所属的消费者分组,Group需要提前在控制台创建,如果不创建直接使用会返回报错。 String consumerGroup = "Your ConsumerGroup"; final ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints); /** * 如果是使用公网接入点访问,configuration还需要设置实例的用户名和密码。用户名和密码在控制台实例详情页获取。 * 如果是在阿里云ECS内网访问,无需填写该配置,服务端会根据内网VPC信息智能获取。 */ //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password")); ClientConfiguration clientConfiguration = builder.build(); Duration awaitDuration = Duration.ofSeconds(10); //订阅消息的过滤规则,表示订阅所有Tag的消息。 String tag = "*"; FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); //初始化SimpleConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。 SimpleConsumer consumer = provider.newSimpleConsumerBuilder() .setClientConfiguration(clientConfiguration) //设置消费者分组。 .setConsumerGroup(consumerGroup) //设置长轮询超时时间。 .setAwaitDuration(awaitDuration) //设置预绑定的订阅关系。 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .build(); //消息的最大重试投递次数。 int maxMessageNum = 16; //设置消息的不可见时间。 Duration invisibleDuration = Duration.ofSeconds(10); //SimpleConsumer需要客户端一直主动循环获取消息,并进行消费处理。 //如果需要提高消费实时性,建议多线程并发拉取。 while (true) { final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration); messages.forEach(messageView -> { // LOGGER.info("Received message: {}", messageView); System.out.println("Received message: " + messageView); }); for (MessageView message : messages) { final MessageId messageId = message.getMessageId(); try { //消费处理完成后,需要主动调用ACK向服务端提交消费结果。 consumer.ack(message); System.out.println("Message is acknowledged successfully, messageId= " + messageId); //LOGGER.info("Message is acknowledged successfully, messageId={}", messageId); } catch (Throwable t) { t.printStackTrace(); //LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t); } } } // 如果不需要再使用SimpleConsumer,可关闭该进程。 // consumer.close(); } }
验证消息
消息收发完成后,您可以通过控制台查看消息消费情况。
- 登录控制台,在实例列表页面选择目标实例。
- 在左侧导航栏单击消息轨迹。
SDK参考
本文以Java SDK为例介绍收发普通消息流程,其他语言SDK和其他类型消息的示例代码,请参见SDK参考概述。