快速使用云上RocketMQ进行消息收发
ons
手动配置
20
教程简介
在本教程中,您将学习如何将阿里云消息队列 RocketMQ 版提供的Java SDK接入服务端,并完成消息收发。
云消息队列 RocketMQ 版是阿里云基于Apache RocketMQ构建的低延迟、高并发、高可用、高可靠的分布式“消息、事件、流”统一处理平台,面向互联网分布式应用场景提供微服务异步解耦、流式数据处理、事件驱动处理等核心能力。更多信息,请参见什么是消息队列RocketMQ版。
我能学到什么
学会如何创建云消息队列 RocketMQ 版实例及相关资源。
学会如何将SDK接入云消息队列 RocketMQ 版服务端。
学会如何调用SDK收发消息。
操作难度 | 中 |
所需时间 | 20分钟 |
使用的阿里云产品 | |
所需费用 |
|
准备环境和资源
3
开始教程前,请按以下步骤准备环境和资源:
创建专有网络VPC和交换机。注意创建的VPC和交换机需要和后续创建的云消息队列 RocketMQ 版实例所在的地域相同。
关于如何创建VPC和交换机,请参见创建专有网络和交换机。
您可以前往阿里云免费试用查看您的阿里云账号是否具有专有网络VPC的试用资格。如有试用资格,可通过试用专有网络VPC来完成教程。如无试用资格,则需使用自有VPC资源才能完成教程。不建议使用生产环境中的自有VPC资源,也不要将教程中的测试数据和自有VPC资源的数据混用。测试完成后及时清理测试数据,避免影响自有资源正常运行。
申请免费试用。
访问阿里云免费试用。单击页面右上方的登录/注册按钮,并根据页面提示完成账号登录(已有阿里云账号)、账号注册(尚无阿里云账号)或实名认证(根据试用产品要求完成个人实名认证或企业实名认证)。
成功登录后,在产品类别下选择中间件>云消息队列 RocketMQ 版,单击云消息队列 RocketMQ 版卡片上的立即试用按钮。
教程使用的实例规格为标准版的集群高可用版实例,实际操作时,建议根据您的业务体量和需求选择。
在弹出的云消息队列RocketMQ版面板中,完成云消息队列 RocketMQ 版实例的配置(实际操作请根据业务需求按需配置)。
配置参数
本教程取值
说明
地域和可用区
华东1(杭州)
云消息队列 RocketMQ 版实例所属的地域。
VPC ID
vpc-bp1ov******
在下拉列表中选择已创建的VPC。
创建的VPC所属地域需要与本步骤的地域和可用区相同。
VSwitch ID
vsw-bp14j******
在下拉列表中选择已创建的交换机。
创建的交换机所属地域需要与本步骤的地域和可用区相同。
公网访问类型
开启
本教程以公网环境接入云消息队列 RocketMQ 版实例为例,因此需要开启公网访问。若您实际生产环境为VPC访问,可以关闭公网访问。
选中服务协议,单击立即试用,按提示完成申请。
完成试用申请后,系统将会自动创建实例。您可以访问RocketMQ实例列表,在页面上方选择试用申请时配置的地域,如华东1(杭州),然后刷新页面查看,实例创建需要约1~3分钟,当实例状态为运行中时,即可正常使用。
准备Java开发环境。本教程使用Java SDK接入服务端完成消息收发。
操作系统:Windows 10 64位
创建Topic和Group
2
在开通免费试用云消息队列 RocketMQ 版后,系统会提供一个月试用的标准版集群高可用类型的实例,您需要登录消息队列 RocketMQ 版控制台创建Topic和Group资源,用于发送消息和订阅消息。
登录云消息队列 RocketMQ 版控制台,在左侧导航栏,选择实例列表。
在顶部菜单栏,选择和试用实例相同的地域。
在实例列表页面单击试用实例的名称。
在左侧导航栏选择Topic 管理,然后在页面左上角单击创建 Topic。
在创建 Topic面板填写Topic名称和描述,并选择消息类型为普通消息,然后单击确定完成创建。
创建Topic时选择的消息类型必须和实际收发消息的类型一致,本教程以收发普通消息为例,因此消息类型选择普通消息。实际生产环境请根据业务消息的类型进行选择。
在左侧导航栏单击Group 管理,然后在页面左上角单击创建 Group。
在创建 Group面板填写Group ID,其他参数使用默认值,然后单击确定完成创建。
安装Java SDK依赖库
2
打开IntelliJ IDEA,创建一个Java工程。
在pom.xml文件中添加以下依赖引入Java SDK的依赖库。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client-java</artifactId> <version>5.0.4</version> </dependency>
启动消费者客户端订阅消息
5
在IDEA中打开已创建的Java工程,在src/main/java路径下创建一个Java类。
将Java类的内容替换为云消息队列 RocketMQ 版提供的消息订阅代码。示例代码如下:
package doc; import org.apache.rocketmq.client.apis.*; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.apache.rocketmq.shaded.org.slf4j.Logger; import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; public class PushConsumerExample { private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class); private PushConsumerExample() { } public static void main(String[] args) throws ClientException, IOException, InterruptedException { /** * 实例接入点,从控制台实例详情页的接入点页签中获取。 * 如果是在阿里云ECS内网访问,建议填写VPC接入点。 * 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。 */ String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080"; //指定需要订阅哪个目标Topic,Topic需要提前在控制台创建,如果不创建直接使用会返回报错。 String topic = "Your Topic"; //为消费者指定所属的消费者分组,Group需要提前在控制台创建,如果不创建直接使用会返回报错。 String consumerGroup = "Your ConsumerGroup"; final ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints); /** * 如果是使用公网接入点访问,configuration还需要设置实例的用户名和密码。用户名和密码在控制台实例详情页获取。 * 如果是在阿里云ECS内网访问,无需填写该配置,服务端会根据内网VPC信息智能获取。 */ builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password")); ClientConfiguration clientConfiguration = builder.build(); //订阅消息的过滤规则,表示订阅所有Tag的消息。 String tag = "*"; FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); //初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。 PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) //设置消费者分组。 .setConsumerGroup(consumerGroup) //设置预绑定的订阅关系。 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) //设置消费监听器。 .setMessageListener(messageView -> { //处理消息并返回消费结果。 // LOGGER.info("Consume message={}", messageView); System.out.println("Consume Message: " + messageView); return ConsumeResult.SUCCESS; }) .build(); Thread.sleep(Long.MAX_VALUE); //如果不需要再使用PushConsumer,可关闭该进程。 //pushConsumer.close(); } }
参考下表修改消费者示例代码中的参数值。以下参数仅为示例值,需要修改为您实际使用的参数值。
参数
示例
说明
endpoints
rmq-cn-******.cn-hangzhou.rmq.aliyuncs.com:8080
实例的公网接入点。
可从云消息队列 RocketMQ 版控制台实例详情页的TCP 协议接入点页签中获取。
本教程以公网环境接入为例,若您使用VPC网络接入,则接入点需要填写为VPC专有网络接入点。
topic
topic_normal
已创建的Topic的名称,表示消费者需要订阅指定的Topic的消息。
可从云消息队列 RocketMQ 版控制台的Topic 管理页面查看。
consumerGroup
GID_test
已创建的Group的ID,表示消费者使用该消费者分组订阅指定的Topic。
可从云消息队列 RocketMQ 版控制台的Group 管理页面查看。
Instance UserName
21Vshz0YD9******
本教程以公网环境接入为例,因此该参数填写为实例用户名。
可从云消息队列 RocketMQ 版控制台的实例详情页面的运行信息区域查看。
若使用VPC接入,则无需配置该参数。
Instance Password
VrQCx2xr9a******
本教程以公网环境接入为例,因此该参数填写为实例密码。
可从云消息队列 RocketMQ 版控制台的实例详情页面的运行信息区域查看。
若使用VPC接入,则无需配置该参数。
参数修改完成后,运行消费者示例代码,启动消费者客户端。返回类似如下信息,表示消费者客户端已接入云消息队列 RocketMQ 版服务端。
此操作为第一次启动消费者,此时生产者还未接入服务端,因此还未获取到消息为正常结果。
启动生产者客户端发送消息
4
在IDEA中打开已创建的Java工程,在src/main/java路径下创建一个Java类。
将Java类的内容替换为云消息队列 RocketMQ 版提供的消息发送代码。示例代码如下:
package doc; import org.apache.rocketmq.client.apis.*; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; public class ProducerExample { public static void main(String[] args) throws ClientException { /** * 实例接入点,从控制台实例详情页的接入点页签中获取。 * 如果是在阿里云ECS内网访问,建议填写VPC接入点。 * 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。 */ String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080"; //消息发送的目标Topic名称,需要提前在控制台创建,如果不创建直接使用会返回报错。 String topic = "Your Topic"; ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints); /** * 如果是使用公网接入点访问,configuration还需要设置实例的用户名和密码。用户名和密码在控制台实例详情页获取。 * 如果是在阿里云ECS内网访问,无需填写该配置,服务端会根据内网VPC信息智能获取。 */ builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password")); ClientConfiguration configuration = builder.build(); /** * 初始化Producer时直接配置需要使用的Topic列表(这个参数可以配置多个Topic),实现提前检查错误配置、拦截非法配置启动。 * 针对非事务消息 Topic,也可以不配置,服务端会动态检查消息的Topic是否合法。 * 注意!!!事务消息Topic必须提前配置,以免事务消息回查接口失败,具体原理请参见事务消息。 */ Producer producer = provider.newProducerBuilder() .setTopics(topic) .setClientConfiguration(configuration) .build(); //普通消息发送。 Message message = provider.newMessageBuilder() .setTopic(topic) //设置消息索引键,可根据关键字精确查找某条消息。 .setKeys("messageKey") //设置消息Tag,用于消费端根据指定Tag过滤消息。 .setTag("messageTag") //消息体。 .setBody("messageBody".getBytes()) .build(); try { //发送消息,需要关注发送结果,并捕获失败等异常。 SendReceipt sendReceipt = producer.send(message); System.out.println(sendReceipt.getMessageId()); } catch (ClientException e) { e.printStackTrace(); } } }
参考下表修改生产者示例代码中的参数值。以下参数仅为示例值,需要修改为您实际使用的参数值。
参数
示例
说明
endpoints
rmq-cn-******.cn-hangzhou.rmq.aliyuncs.com:8080
实例的公网接入点。
可从云消息队列 RocketMQ 版控制台实例详情页的TCP 协议接入点页签中获取。
本教程以公网环境接入为例,若您使用VPC网络接入,则接入点需要填写为VPC专有网络接入点。
topic
topic_normal
已创建的Topic的名称,表示生产者向哪个Topic发送消息。
可从云消息队列 RocketMQ 版控制台的Topic管理页面查看。
Instance UserName
21Vshz0YD9******
本教程以公网环境接入为例,因此该参数填写为实例用户名。
可从云消息队列 RocketMQ 版控制台的实例详情页面的运行信息区域查看。
若使用VPC接入,则无需配置该参数。
Instance Password
VrQCx2xr9a******
本教程以公网环境接入为例,因此该参数填写为实例密码。
可从云消息队列 RocketMQ 版控制台的实例详情页面的运行信息区域查看。
若使用VPC接入,则无需配置该参数。
参数修改完成后,运行生产者示例代码,启动生产者客户端发送消息。返回类似如下信息,表示生产者客户端已接入云消息队列 RocketMQ 版服务端并成功发送消息。
完成
3
查看消息消费结果
消息发送成功后,将运行结果窗口切换到消息订阅运行程序中,会看到返回如下消费结果,您可以根据返回的消息ID在控制台查询消息轨迹。

查询消息轨迹
登录云消息队列 RocketMQ 版控制台,在左侧导航栏,选择实例列表。
在顶部菜单栏,选择和试用实例相同的地域。
在实例列表页面单击试用实例的名称。
在左侧导航栏单击消息轨迹,然后单击创建查询任务。
在创建消息轨迹查询任务面板中选择消息所属的Topic、选择查询方式为Message ID 查询、输入在消费结果中获取到的Message ID,然后单击确定创建查询任务。
刷新页面,待查询任务的状态变为查询完成,然后在其操作列单击查询结果。
在查询结果页面中,单击查询结果列表操作列的消息轨迹。
您可以在轨迹详情页面查看指定消息在各阶段的详细状态。
各返回参数的详细信息,请参见轨迹参数说明。
清理及后续
1
本教程使用的标准版实例只能免费试用1个月。试用期结束后您可以选择释放实例或一键转包年包月,否则超过有效期的部分将会按照按量付费方式进行计费。
如果您无需使用云消息队列 RocketMQ 版,请按照如下操作及时清理和释放资源。
登录云消息队列RocketMQ版控制台,在左侧导航栏选择实例列表。
在顶部菜单栏选择试用实例所在的地域,然后在目标试用实例所在的操作列选择更多>释放。
在弹出的对话框中单击确定。
如果您需要继续使用云消息队列 RocketMQ 版,可以将该试用实例转为包年包月计费类型。
登录云消息队列RocketMQ版控制台,在左侧导航栏选择实例列表。
在顶部菜单栏选择试用实例所在的地域,然后在目标试用实例所在的付费类型列选择转包年包月。
按照界面提示完成购买。
总结
常用知识点
问题1:如果使用本地网络访问云消息队列 RocketMQ 版服务,SDK代码中的接入点(endpoints)应该填写哪个?(单选题)
VPC专有网络接入点
公网接入点
正确答案是公网接入点。如果使用内网访问云消息队列 RocketMQ 版服务,则需要填写VPC接入点。
问题2:发送普通消息时,创建的Topic的消息类型是什么?(单选题)
普通消息
顺序消息
事务消息
定时/延时消息
正确答案是普通消息。发送的消息类型和Topic的类型要保持一致,否则会导致消息发送失败。