本文介绍Apache RocketMQ Java SDK的消息收发示例代码。
示例代码
重要
如果您使用的是Serverless版实例,在公网访问的时候需要注意SDK的版本等信息,详情请参见Serverless版实例公网访问版本说明。
gRPC协议SDK
Remoting协议SDK
关于RocketMQ-Spring的示例代码,请参见rocketmq-v5-client-spring-boot-samples。
使用gRPC协议的SDK为
rocketmq-client-java
,下列是使用该SDK的示例代码。消息类型
发送消息示例代码
订阅消息示例代码
关于RocketMQ-Spring的示例代码,请参见rocketmq-spring-boot-samples。
使用Remoting协议的SDK为
rocketmq-client
,下列是使用该SDK的示例代码。普通消息
发送普通消息(同步发送)
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.Date; public class RocketMQProducer { /** * 如果是使用公网接入点访问,则必须设置RpcHook,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。 * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。 * 如果是在阿里云ECS内网访问,无需初始化RpcHook,服务端会根据内网VPC信息智能获取。 * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。 */ private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD")); } public static void main(String[] args) throws MQClientException { // 使用公网接入点时,需要配置RPCHook。 DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook()); // 使用VPC接入点时,无需配置RPCHook。 // 如果实例类型为Serverless实例,则必须配置RPCHook。 // DefaultMQProducer producer = new DefaultMQProducer(); //您在消息队列RocketMQ版控制台创建的Group ID。 producer.setProducerGroup("YOUR GROUP ID"); // 设置接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项;如果不开启消息轨迹功能,则不需要运行此项。 producer.setAccessChannel(AccessChannel.CLOUD); // 5.3.0版本及以上SDK开启消息轨迹除需设置AccessChannel外,需要增加,EnableTrace参数 producer.setEnableTrace(true); // 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。 // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。 producer.setNamesrvAddr("YOUR ACCESS POINT"); producer.start(); for (int i = 0; i < 128; i++) { try { Message msg = new Message("YOUR TOPIC", "YOUR MESSAGE TAG", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { //消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。 System.out.println(new Date() + " Send mq message failed."); e.printStackTrace(); } } // 在应用退出前,销毁Producer对象。 // 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。 producer.shutdown(); } }
发送普通消息(异步发送)
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.Date; import java.util.concurrent.TimeUnit; public class RocketMQAsyncProducer { /** * 如果是使用公网接入点访问,则必须设置RpcHook,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。 * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。 * 如果是在阿里云ECS内网访问,无需初始化RpcHook,服务端会根据内网VPC信息智能获取。 * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。 */ private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD")); } public static void main(String[] args) throws MQClientException, InterruptedException { // 使用公网接入点时,需要配置RPCHook。 DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook()); // 使用VPC接入点时,无需配置RPCHook。 // 如果实例类型为Serverless实例,则必须配置RPCHook。 // DefaultMQProducer producer = new DefaultMQProducer(); //您在消息队列RocketMQ版控制台创建的Group ID producer.setProducerGroup("YOUR GROUP ID"); // 设置接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项;如果不开启消息轨迹功能,则不需要运行此项。 producer.setAccessChannel(AccessChannel.CLOUD); // 5.3.0版本及以上SDK开启消息轨迹除需设置AccessChannel外,需要增加,EnableTrace参数 producer.setEnableTrace(true); // 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。 // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。 producer.setNamesrvAddr("YOUR ACCESS POINT"); producer.start(); for (int i = 0; i < 128; i++) { try { Message msg = new Message("YOUR TOPIC", "YOUR MESSAGE TAG", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult result) { // 消息发送成功。 System.out.println("send message success. msgId= " + result.getMsgId()); } @Override public void onException(Throwable throwable) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。 System.out.println("send message failed."); throwable.printStackTrace(); } }); } catch (Exception e) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。 System.out.println(new Date() + " Send mq message failed."); e.printStackTrace(); } } // 阻塞当前线程3秒,等待异步发送结果。 TimeUnit.SECONDS.sleep(3); // 在应用退出前,销毁Producer对象。 // 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。 producer.shutdown(); } }
发送普通消息(单向发送)
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.Date; public class RocketMQOnewayProducer { /** * 如果是使用公网接入点访问,则必须设置RpcHook,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。 * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。 * 如果是在阿里云ECS内网访问,无需初始化RpcHook,服务端会根据内网VPC信息智能获取。 * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。 */ private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD")); } public static void main(String[] args) throws MQClientException { // 使用公网接入点时,需要配置RPCHook。 DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook()); // 使用VPC接入点时,无需配置RPCHook。 // 如果实例类型为Serverless实例,则必须配置RPCHook。 // DefaultMQProducer producer = new DefaultMQProducer(); //您在消息队列RocketMQ版控制台创建的Group ID。 producer.setProducerGroup("YOUR GROUP ID"); // 设置接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项;如果不开启消息轨迹功能,则不需要运行此项。 producer.setAccessChannel(AccessChannel.CLOUD); // 5.3.0版本及以上SDK开启消息轨迹除需设置AccessChannel外,需要增加,EnableTrace参数 producer.setEnableTrace(true); // 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。 // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。 producer.setNamesrvAddr("YOUR ACCESS POINT"); producer.start(); for (int i = 0; i < 128; i++) { try { Message msg = new Message("YOUR TOPIC", "YOUR MESSAGE TAG", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.sendOneway(msg); } catch (Exception e) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。 System.out.println(new Date() + " Send mq message failed."); e.printStackTrace(); } } // 在应用退出前,销毁Producer对象。 // 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。 producer.shutdown(); } }
订阅普通消息
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; import java.util.List; public class RocketMQPushConsumer { /** * 如果是使用公网接入点访问,则必须设置RpcHook,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。 * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。 * 如果是在阿里云ECS内网访问,无需初始化RpcHook,服务端会根据内网VPC信息智能获取。 * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。 */ private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD")); } public static void main(String[] args) throws MQClientException { // 使用公网接入点时,需要配置RPCHook。 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook()); // 使用VPC接入点时,无需配置RPCHook。 // 如果实例类型为Serverless实例,则必须配置RPCHook。 // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); //您在消息队列RocketMQ版控制台创建的Group ID。 consumer.setConsumerGroup("YOUR GROUP ID"); // 设置接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项;如果不开启消息轨迹功能,则不需要运行此项。 consumer.setAccessChannel(AccessChannel.CLOUD); // 5.3.0版本及以上SDK开启消息轨迹除需设置AccessChannel外,需要增加,EnableTrace参数 consumer.setEnableTrace(true); // 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。 // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。 consumer.setNamesrvAddr("YOUR ACCESS POINT"); // 设置为您在阿里云云消息队列 RocketMQ 版控制台上创建的Topic。 consumer.subscribe("YOUR TOPIC", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("Receive New Messages: %s %n", msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
顺序消息
发送顺序消息
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.List; public class RocketMQOrderProducer { /** * 如果是使用公网接入点访问,则必须设置RpcHook,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。 * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。 * 如果是在阿里云ECS内网访问,无需初始化RpcHook,服务端会根据内网VPC信息智能获取。 * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。 */ private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD")); } public static void main(String[] args) throws MQClientException { // 使用公网接入点时,需要配置RPCHook。 DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook()); // 使用VPC接入点时,无需配置RPCHook。 // 如果实例类型为Serverless实例,则必须配置RPCHook。 // DefaultMQProducer producer = new DefaultMQProducer(); //您在消息队列RocketMQ版控制台创建的Group ID。 producer.setProducerGroup("YOUR GROUP ID"); // 设置接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项;如果不开启消息轨迹功能,则不需要运行此项。 producer.setAccessChannel(AccessChannel.CLOUD); // 5.3.0版本及以上SDK开启消息轨迹除需设置AccessChannel外,需要增加,EnableTrace参数 producer.setEnableTrace(true); // 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。 // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。 producer.setNamesrvAddr("YOUR ACCESS POINT"); producer.start(); for (int i = 0; i < 128; i++) { try { int orderId = i % 10; Message msg = new Message("YOUR ORDER TOPIC", "YOUR MESSAGE TAG", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 注意!!!请务必设置该配置项,顺序消息才能均匀分布到各队列中。 // 5.x版本下面一行代码可以替换为 msg.putUserProperty(MessageConst.PROPERTY_SHARDING_KEY, orderId + ""); msg.putUserProperty("__SHARDINGKEY", orderId + ""); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // 选择适合自己的分区选择算法,保证同一个参数得到的结果相同。 Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); } } producer.shutdown(); } }
订阅顺序消息
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; import java.util.List; public class RocketMQOrderConsumer { private static RPCHook getAclRPCHook() { /** * 如果是使用公网接入点访问,则必须设置RpcHook,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。 * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。 * 如果是在阿里云ECS内网访问,无需初始化RpcHook,服务端会根据内网VPC信息智能获取。 * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。 */ return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD")); } public static void main(String[] args) throws MQClientException { // 使用公网接入点时,需要配置RPCHook。 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook()); // 使用VPC接入点时,无需配置RPCHook。 // 如果实例类型为Serverless实例,则必须配置RPCHook。 // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); //您在消息队列RocketMQ版控制台创建的Group ID。 consumer.setConsumerGroup("YOUR GROUP ID"); // 设置接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项;如果不开启消息轨迹功能,则不需要运行此项。 consumer.setAccessChannel(AccessChannel.CLOUD); // 5.3.0版本及以上SDK开启消息轨迹除需设置AccessChannel外,需要增加,EnableTrace参数 consumer.setEnableTrace(true); // 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。 // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。 consumer.setNamesrvAddr("YOUR ACCESS POINT"); consumer.subscribe("YOUR ORDER TOPIC", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 消费失败则挂起重试返回:ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT。 return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
定时/延时消息
发送定时/延时消息
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import java.util.Date; public class RocketMQDelayProducer { /** * 如果是使用公网接入点访问,则必须设置RpcHook,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。 * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。 * 如果是在阿里云ECS内网访问,无需初始化RpcHook,服务端会根据内网VPC信息智能获取。 * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。 */ private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD")); } public static void main(String[] args) throws MQClientException { // 使用公网接入点时,需要配置RPCHook。 DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook()); // 使用VPC接入点时,无需配置RPCHook。 // 如果实例类型为Serverless实例,则必须配置RPCHook。 // DefaultMQProducer producer = new DefaultMQProducer(); //您在消息队列RocketMQ版控制台创建的Group ID。 producer.setProducerGroup("YOUR GROUP ID"); // 设置接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项;如果不开启消息轨迹功能,则不需要运行此项。 producer.setAccessChannel(AccessChannel.CLOUD); // 5.3.0版本及以上SDK开启消息轨迹除需设置AccessChannel外,需要增加,EnableTrace参数 producer.setEnableTrace(true); // 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。 // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。 producer.setNamesrvAddr("YOUR ACCESS POINT"); producer.start(); for (int i = 0; i < 128; i++) { try { // 设置为您在云消息队列 RocketMQ 版控制台创建的Topic。 Message msg = new Message("YOUR TOPIC", // 设置消息的Tag。 "YOUR MESSAGE TAG", // 消息内容。 "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 发送延时消息,需要设置延时时间,单位毫秒(ms),消息将在指定延时时间后投递,例如消息将在3秒后投递。 long delayTime = System.currentTimeMillis() + 3000; msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime)); // 若需要发送定时消息,则需要设置定时时间,消息将在指定时间进行投递,例如消息将在2021-08-10 18:45:00投递。 // 定时时间格式为:yyyy-MM-dd HH:mm:ss,若设置的时间戳在当前时间之前,则消息将被立即投递给Consumer。 // longtimeStamp=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime(); // msg.putUserProperty("__STARTDELIVERTIME",String.valueOf(timeStamp)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。 System.out.println(new Date() + " Send mq message failed."); e.printStackTrace(); } } // 在应用退出前,销毁Producer对象。 // 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。 producer.shutdown(); } }
订阅定时/延时消息的示例代码和订阅普通消息相同。
事务消息
发送事务消息
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; public class RocketMQTransactionProducer { private static RPCHook getAclRPCHook() { /** * 如果是使用公网接入点访问,则必须设置RpcHook,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。 * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。 * 如果是在阿里云ECS内网访问,无需初始化RpcHook,服务端会根据内网VPC信息智能获取。 * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。 */ return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD")); } public static void main(String[] args) throws MQClientException { // 使用公网接入点时,需要配置RPCHook。 // 您在消息队列RocketMQ版控制台创建的Group ID。注意:事务消息的Group ID不能与其他类型消息的Group ID共用。 TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID", getAclRPCHook()); // 使用VPC接入点时,无需配置RPCHook。 // 如果实例类型为Serverless实例,则必须配置RPCHook。 // TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID"); // 设置接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项;如果不开启消息轨迹功能,则不需要运行此项。 transactionMQProducer.setAccessChannel(AccessChannel.CLOUD); // 5.3.0版本及以上SDK开启消息轨迹除需设置AccessChannel外,需要增加,EnableTrace参数 transactionMQProducer.setEnableTrace(true); // 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。 // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。 transactionMQProducer.setNamesrvAddr("YOUR ACCESS POINT"); transactionMQProducer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { System.out.println("开始执行本地事务: " + msg); return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println("收到事务消息的回查请求, MsgId: " + msg.getMsgId()); return LocalTransactionState.COMMIT_MESSAGE; } }); transactionMQProducer.start(); for (int i = 0; i < 10; i++) { try { Message message = new Message("YOUR TRANSACTION TOPIC", "YOUR MESSAGE TAG", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, null); assert sendResult != null; } catch (Exception e) { e.printStackTrace(); } } } }
订阅事务消息的示例代码和订阅普通消息相同。
Serverless版实例公网访问版本说明
Serverless版实例使用公网访问接入云消息队列 RocketMQ 版时,需要保证使用的SDK版本满足以下要求,并在消息收发代码中补充如下内容:
说明
其中,InstanceId
需要替换为您实际使用的实例ID。
SDK版本:rocketmq-client ≥ 5.2.0
消息发送代码补充:
producer.setNamespaceV2("InstanceId");
消息消费代码补充:
consumer.setNamespaceV2("InstanceId");
SDK版本:rocketmq-client-java ≥ 5.0.6
消息发送和消息消费代码补充:
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .setNamespace("InstanceId") .setCredentialProvider(sessionCredentialsProvider) .build();
相关文档
该文章对您有帮助吗?
- 本页导读 (1)
- 示例代码
- Serverless版实例公网访问版本说明
- 相关文档