示例代码

更新时间:2025-01-13 09:56:20

本文介绍Apache RocketMQ Java SDK的消息收发示例代码。

示例代码

重要

如果您使用的是Serverless版实例,在公网访问的时候需要注意SDK的版本等信息,详情请参见Serverless版实例公网访问版本说明

gRPC协议SDK
Remoting协议SDK
  • 关于RocketMQ-Spring的示例代码,请参见rocketmq-spring-boot-samples

  • 使用Remoting协议的SDKrocketmq-client,下列是使用该SDK的示例代码。

    普通消息

    发送普通消息(同步发送)

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import java.util.Date;
    
    public class RocketMQProducer {
        /**
         * 如果是使用公网接入点访问,则必须设置RpcHook,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
         * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
         * 如果是在阿里云ECS内网访问,无需初始化RpcHook,服务端会根据内网VPC信息智能获取。
         * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
         */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // 使用公网接入点时,需要配置RPCHook。
            DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
            // 使用VPC接入点时,无需配置RPCHook。
            // 如果实例类型为Serverless实例,则必须配置RPCHook。
            // DefaultMQProducer producer = new DefaultMQProducer();
    
            //您在消息队列RocketMQ版控制台创建的Group ID。
            producer.setProducerGroup("YOUR GROUP ID");
    
            // 设置接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项;如果不开启消息轨迹功能,则不需要运行此项。
            producer.setAccessChannel(AccessChannel.CLOUD);
            // 5.3.0版本及以上SDK开启消息轨迹除需设置AccessChannel外,需要增加,EnableTrace参数
            producer.setEnableTrace(true);
    
            // 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
            // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    Message msg = new Message("YOUR TOPIC",
                            "YOUR MESSAGE TAG",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    //消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                    System.out.println(new Date() + " Send mq message failed.");
                    e.printStackTrace();
                }
            }
    
            // 在应用退出前,销毁Producer对象。
            // 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。
            producer.shutdown();
        }
    }

    发送普通消息(异步发送)

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import java.util.Date;
    import java.util.concurrent.TimeUnit;
    
    public class RocketMQAsyncProducer {
        /**
         * 如果是使用公网接入点访问,则必须设置RpcHook,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
         * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
         * 如果是在阿里云ECS内网访问,无需初始化RpcHook,服务端会根据内网VPC信息智能获取。
         * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
         */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException, InterruptedException {
            // 使用公网接入点时,需要配置RPCHook。
            DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
            // 使用VPC接入点时,无需配置RPCHook。
            // 如果实例类型为Serverless实例,则必须配置RPCHook。
            // DefaultMQProducer producer = new DefaultMQProducer();
    
            //您在消息队列RocketMQ版控制台创建的Group ID
            producer.setProducerGroup("YOUR GROUP ID");
    
            // 设置接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项;如果不开启消息轨迹功能,则不需要运行此项。
            producer.setAccessChannel(AccessChannel.CLOUD);
            // 5.3.0版本及以上SDK开启消息轨迹除需设置AccessChannel外,需要增加,EnableTrace参数
            producer.setEnableTrace(true);
    
            // 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
            // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    Message msg = new Message("YOUR TOPIC",
                            "YOUR MESSAGE TAG",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    producer.send(msg, new SendCallback() {
                        @Override
                        public void onSuccess(SendResult result) {
                            // 消息发送成功。
                            System.out.println("send message success. msgId= " + result.getMsgId());
                        }
    
                        @Override
                        public void onException(Throwable throwable) {
                            // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                            System.out.println("send message failed.");
                            throwable.printStackTrace();
                        }
                    });
                } catch (Exception e) {
                    // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                    System.out.println(new Date() + " Send mq message failed.");
                    e.printStackTrace();
                }
            }
            // 阻塞当前线程3秒,等待异步发送结果。
            TimeUnit.SECONDS.sleep(3);
    
            // 在应用退出前,销毁Producer对象。
            // 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。
            producer.shutdown();
        }
    }

    发送普通消息(单向发送)

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import java.util.Date;
    
    public class RocketMQOnewayProducer {
        /**
         * 如果是使用公网接入点访问,则必须设置RpcHook,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
         * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
         * 如果是在阿里云ECS内网访问,无需初始化RpcHook,服务端会根据内网VPC信息智能获取。
         * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
         */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // 使用公网接入点时,需要配置RPCHook。
            DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
            // 使用VPC接入点时,无需配置RPCHook。
            // 如果实例类型为Serverless实例,则必须配置RPCHook。
            // DefaultMQProducer producer = new DefaultMQProducer();
    
            //您在消息队列RocketMQ版控制台创建的Group ID。
            producer.setProducerGroup("YOUR GROUP ID");
    
            // 设置接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项;如果不开启消息轨迹功能,则不需要运行此项。
            producer.setAccessChannel(AccessChannel.CLOUD);
            // 5.3.0版本及以上SDK开启消息轨迹除需设置AccessChannel外,需要增加,EnableTrace参数
            producer.setEnableTrace(true);
    
            // 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
            // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    Message msg = new Message("YOUR TOPIC",
                            "YOUR MESSAGE TAG",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    producer.sendOneway(msg);
                } catch (Exception e) {
                    // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                    System.out.println(new Date() + " Send mq message failed.");
                    e.printStackTrace();
                }
            }
    
            // 在应用退出前,销毁Producer对象。
            // 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。
            producer.shutdown();
        }
    }

    订阅普通消息

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.RPCHook;
    
    import java.util.List;
    
    public class RocketMQPushConsumer {
        /**
         * 如果是使用公网接入点访问,则必须设置RpcHook,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
         * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
         * 如果是在阿里云ECS内网访问,无需初始化RpcHook,服务端会根据内网VPC信息智能获取。
         * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
         */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // 使用公网接入点时,需要配置RPCHook。
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
            // 使用VPC接入点时,无需配置RPCHook。
            // 如果实例类型为Serverless实例,则必须配置RPCHook。
            // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
    
            //您在消息队列RocketMQ版控制台创建的Group ID。
            consumer.setConsumerGroup("YOUR GROUP ID");
    
            // 设置接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项;如果不开启消息轨迹功能,则不需要运行此项。
            consumer.setAccessChannel(AccessChannel.CLOUD);
            // 5.3.0版本及以上SDK开启消息轨迹除需设置AccessChannel外,需要增加,EnableTrace参数
            consumer.setEnableTrace(true);
    
            // 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
            // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
            consumer.setNamesrvAddr("YOUR ACCESS POINT");
            // 设置为您在阿里云云消息队列 RocketMQ 版控制台上创建的Topic。
            consumer.subscribe("YOUR TOPIC", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    System.out.printf("Receive New Messages: %s %n", msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        }
    }

    顺序消息

    发送顺序消息

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import java.util.List;
    
    public class RocketMQOrderProducer {
        /**
         * 如果是使用公网接入点访问,则必须设置RpcHook,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
         * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
         * 如果是在阿里云ECS内网访问,无需初始化RpcHook,服务端会根据内网VPC信息智能获取。
         * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
         */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // 使用公网接入点时,需要配置RPCHook。
            DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
            // 使用VPC接入点时,无需配置RPCHook。
            // 如果实例类型为Serverless实例,则必须配置RPCHook。
            // DefaultMQProducer producer = new DefaultMQProducer();
    
            //您在消息队列RocketMQ版控制台创建的Group ID。
            producer.setProducerGroup("YOUR GROUP ID");
    
            // 设置接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项;如果不开启消息轨迹功能,则不需要运行此项。
            producer.setAccessChannel(AccessChannel.CLOUD);
            // 5.3.0版本及以上SDK开启消息轨迹除需设置AccessChannel外,需要增加,EnableTrace参数
            producer.setEnableTrace(true);
    
            // 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
            // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    int orderId = i % 10;
                    Message msg = new Message("YOUR ORDER TOPIC",
                            "YOUR MESSAGE TAG",
                            "OrderID188",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    // 注意!!!请务必设置该配置项,顺序消息才能均匀分布到各队列中。
                    // 5.x版本下面一行代码可以替换为 msg.putUserProperty(MessageConst.PROPERTY_SHARDING_KEY, orderId + "");
                    msg.putUserProperty("__SHARDINGKEY", orderId + "");
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            // 选择适合自己的分区选择算法,保证同一个参数得到的结果相同。
                            Integer id = (Integer) arg;
                            int index = id % mqs.size();
                            return mqs.get(index);
                        }
                    }, orderId);
    
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            producer.shutdown();
        }
    }

    订阅顺序消息

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.RPCHook;
    
    import java.util.List;
    
    public class RocketMQOrderConsumer {
        private static RPCHook getAclRPCHook() {
            /**
             * 如果是使用公网接入点访问,则必须设置RpcHook,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
             * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
             * 如果是在阿里云ECS内网访问,无需初始化RpcHook,服务端会根据内网VPC信息智能获取。
             * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
             */
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // 使用公网接入点时,需要配置RPCHook。
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
            // 使用VPC接入点时,无需配置RPCHook。
            // 如果实例类型为Serverless实例,则必须配置RPCHook。
            // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
    
            //您在消息队列RocketMQ版控制台创建的Group ID。
            consumer.setConsumerGroup("YOUR GROUP ID");
    
            // 设置接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项;如果不开启消息轨迹功能,则不需要运行此项。
            consumer.setAccessChannel(AccessChannel.CLOUD);
            // 5.3.0版本及以上SDK开启消息轨迹除需设置AccessChannel外,需要增加,EnableTrace参数
            consumer.setEnableTrace(true);
    
            // 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
            // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
            consumer.setNamesrvAddr("YOUR ACCESS POINT");
            consumer.subscribe("YOUR ORDER TOPIC", "*");
    
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener(new MessageListenerOrderly() {
    
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    context.setAutoCommit(true);
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    // 消费失败则挂起重试返回:ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT。
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
    
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }
    }

    定时/延时消息

    发送定时/延时消息

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import java.util.Date;
    
    public class RocketMQDelayProducer {
        /**
         * 如果是使用公网接入点访问,则必须设置RpcHook,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
         * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
         * 如果是在阿里云ECS内网访问,无需初始化RpcHook,服务端会根据内网VPC信息智能获取。
         * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
         */
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // 使用公网接入点时,需要配置RPCHook。
            DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
            // 使用VPC接入点时,无需配置RPCHook。
            // 如果实例类型为Serverless实例,则必须配置RPCHook。
            // DefaultMQProducer producer = new DefaultMQProducer();
    
            //您在消息队列RocketMQ版控制台创建的Group ID。
            producer.setProducerGroup("YOUR GROUP ID");
    
            // 设置接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项;如果不开启消息轨迹功能,则不需要运行此项。
            producer.setAccessChannel(AccessChannel.CLOUD);
            // 5.3.0版本及以上SDK开启消息轨迹除需设置AccessChannel外,需要增加,EnableTrace参数
            producer.setEnableTrace(true);
    
            // 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
            // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
            producer.setNamesrvAddr("YOUR ACCESS POINT");
            producer.start();
    
            for (int i = 0; i < 128; i++) {
                try {
                    // 设置为您在云消息队列 RocketMQ 版控制台创建的Topic。
                    Message msg = new Message("YOUR TOPIC",
                            // 设置消息的Tag。
                            "YOUR MESSAGE TAG",
                            // 消息内容。
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    // 发送延时消息,需要设置延时时间,单位毫秒(ms),消息将在指定延时时间后投递,例如消息将在3秒后投递。
                    long delayTime = System.currentTimeMillis() + 3000;
                    msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));
    
                    // 若需要发送定时消息,则需要设置定时时间,消息将在指定时间进行投递,例如消息将在2021-08-10 18:45:00投递。
                    // 定时时间格式为:yyyy-MM-dd HH:mm:ss,若设置的时间戳在当前时间之前,则消息将被立即投递给Consumer。
                    // longtimeStamp=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime();
                    // msg.putUserProperty("__STARTDELIVERTIME",String.valueOf(timeStamp));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                    System.out.println(new Date() + " Send mq message failed.");
                    e.printStackTrace();
                }
            }
    
            // 在应用退出前,销毁Producer对象。
            // 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。
            producer.shutdown();
        }
    }

    订阅定时/延时消息的示例代码和订阅普通消息相同。

    事务消息

    发送事务消息

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    public class RocketMQTransactionProducer {
    
        private static RPCHook getAclRPCHook() {
            /**
             * 如果是使用公网接入点访问,则必须设置RpcHook,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
             * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
             * 如果是在阿里云ECS内网访问,无需初始化RpcHook,服务端会根据内网VPC信息智能获取。
             * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
             */
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // 使用公网接入点时,需要配置RPCHook。
            // 您在消息队列RocketMQ版控制台创建的Group ID。注意:事务消息的Group ID不能与其他类型消息的Group ID共用。
            TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID", getAclRPCHook());
            // 使用VPC接入点时,无需配置RPCHook。
            // 如果实例类型为Serverless实例,则必须配置RPCHook。
            // TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID");
    
            // 设置接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项;如果不开启消息轨迹功能,则不需要运行此项。
            transactionMQProducer.setAccessChannel(AccessChannel.CLOUD);
            // 5.3.0版本及以上SDK开启消息轨迹除需设置AccessChannel外,需要增加,EnableTrace参数
            transactionMQProducer.setEnableTrace(true);
    
            // 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
            // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
            transactionMQProducer.setNamesrvAddr("YOUR ACCESS POINT");
            transactionMQProducer.setTransactionListener(new TransactionListener() {
    
                @Override
                public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                    System.out.println("开始执行本地事务: " + msg);
                    return LocalTransactionState.UNKNOW;
                }
    
                @Override
                public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                    System.out.println("收到事务消息的回查请求, MsgId: " + msg.getMsgId());
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
            });
            transactionMQProducer.start();
    
            for (int i = 0; i < 10; i++) {
                try {
                    Message message = new Message("YOUR TRANSACTION TOPIC",
                            "YOUR MESSAGE TAG",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, null);
                    assert sendResult != null;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    订阅事务消息的示例代码和订阅普通消息相同。

Serverless版实例公网访问版本说明

Serverless版实例使用公网访问接入云消息队列 RocketMQ 版时,需要保证使用的SDK版本满足以下要求,并在消息收发代码中补充如下内容:

说明

其中,InstanceId需要替换为您实际使用的实例ID。

  • SDK版本:rocketmq-client ≥ 5.2.0

    消息发送代码补充:producer.setNamespaceV2("InstanceId");

    消息消费代码补充:consumer.setNamespaceV2("InstanceId");

  • SDK版本:rocketmq-client-java ≥ 5.0.6

    消息发送和消息消费代码补充:

    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
    .setEndpoints(endpoints)
    .setNamespace("InstanceId")
    .setCredentialProvider(sessionCredentialsProvider)
    .build();

  • 本页导读 (1)
  • 示例代码
  • Serverless版实例公网访问版本说明
  • 相关文档