文档

示例代码

更新时间:
一键部署

云消息队列 RocketMQ 版5.x版本实例可兼容4.x/3.x SDK客户端接入,您可以使用4.x/3.x版本的SDK接入5.x实例进行消息收发。本文为您介绍4.x/3.x版本下的Java SDK消息收发示例代码。

重要
  • 推荐您使用最新的RocketMQ 5.x系列SDK,5.x系列SDK作为主力研发版本,和云消息队列 RocketMQ 版5.x服务端完全兼容,提供了更全面的功能并支持更多增强特性。更多信息,请参见5.x系列SDK
  • RocketMQ 4.x/3.x系列SDK和ONS系列SDK后续仅做功能维护,建议仅存量业务使用。

普通消息收发示例

发送普通消息(同步发送)

import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQProducer {
    /**
    * 如果是使用公网接入点访问,则必须设置RpcHook,里面填写实例的用户名和密码。实例用户名和密码在控制台实例详情页面获取。
    * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
    * 如果是在阿里云ECS内网访问,无需初始化RpcHook,服务端会根据内网VPC信息智能获取。
    * 如果实例类型为Serverlesss实例,则不管是公网访问还是内网访问都必须设置实例的用户名密码。
    */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // 使用公网接入点时,需要配置RPCHook。
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // 使用VPC接入点时,无需配置RPCHook。
        // 如果实例类型为Serverlesss实例,则必须配置RPCHook。
        // DefaultMQProducer producer = new DefaultMQProducer();
        
        // 设置接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项;如果不开启消息轨迹功能,则不需要运行此项。
        producer.setAccessChannel(AccessChannel.CLOUD);
        
        // 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                    "YOUR MESSAGE TAG",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                //消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // 在应用退出前,销毁Producer对象。
        // 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。
        producer.shutdown();
    }
}

发送普通消息(异步发送)

import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQAsyncProducer {
    /**
    * 如果是使用公网接入点访问,则必须设置RpcHook,里面填写实例的用户名和密码。实例用户名和密码在控制台实例详情页面获取。
    * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
    * 如果是在阿里云ECS内网访问,无需初始化RpcHook,服务端会根据内网VPC信息智能获取。
    * 如果实例类型为Serverlesss实例,则不管是公网访问还是内网访问都必须设置实例的用户名密码。
    */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // 使用公网接入点时,需要配置RPCHook。
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // 使用VPC接入点时,无需配置RPCHook。
        // 如果实例类型为Serverlesss实例,则必须配置RPCHook。
        // DefaultMQProducer producer = new DefaultMQProducer();
        
        // 设置接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项;如果不开启消息轨迹功能,则不需要运行此项。
        producer.setAccessChannel(AccessChannel.CLOUD);

        // 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                    "YOUR MESSAGE TAG",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override public void onSuccess(SendResult result) {
                        // 消息发送成功。
                        System.out.println("send message success. msgId= " + result.getMsgId());
                    }

                    @Override public void onException(Throwable throwable) {
                        // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                        System.out.println("send message failed.");
                        throwable.printStackTrace();
                    }
                });
            } catch (Exception e) {
                // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // 在应用退出前,销毁Producer对象。
        // 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。
        producer.shutdown();
    }
}

发送普通消息(单向发送)

import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQOnewayProducer {
    /**
    * 如果是使用公网接入点访问,则必须设置RpcHook,里面填写实例的用户名和密码。实例用户名和密码在控制台实例详情页面获取。
    * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
    * 如果是在阿里云ECS内网访问,无需初始化RpcHook,服务端会根据内网VPC信息智能获取。
    * 如果实例类型为Serverlesss实例,则不管是公网访问还是内网访问都必须设置实例的用户名密码。
    */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // 使用公网接入点时,需要配置RPCHook。
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // 使用VPC接入点时,无需配置RPCHook。
        // 如果实例类型为Serverlesss实例,则必须配置RPCHook。
        // DefaultMQProducer producer = new DefaultMQProducer();
        
        // 设置接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项;如果不开启消息轨迹功能,则不需要运行此项。
        producer.setAccessChannel(AccessChannel.CLOUD);
        
        // 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                    "YOUR MESSAGE TAG",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.sendOneway(msg);
            } catch (Exception e) {
                // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // 在应用退出前,销毁Producer对象。
        // 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。
        producer.shutdown();
    }
}

订阅普通消息

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQPushConsumer {
    /**
    * 如果是使用公网接入点访问,则必须设置RpcHook,里面填写实例的用户名和密码。实例用户名和密码在控制台实例详情页面获取。
    * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
    * 如果是在阿里云ECS内网访问,无需初始化RpcHook,服务端会根据内网VPC信息智能获取。
    * 如果实例类型为Serverlesss实例,则不管是公网访问还是内网访问都必须设置实例的用户名密码。
    */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }
  public static void main(String[] args) throws MQClientException {
        // 使用公网接入点时,需要配置RPCHook。
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
        // 使用VPC接入点时,无需配置RPCHook。
        // 如果实例类型为Serverlesss实例,则必须配置RPCHook。
        // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        
        // 设置接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项;如果不开启消息轨迹功能,则不需要运行此项。
        consumer.setAccessChannel(AccessChannel.CLOUD);

        // 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
    consumer.setNamesrvAddr("YOUR ACCESS POINT");
        // 设置为您在阿里云云消息队列 RocketMQ 版控制台上创建的Topic。
    consumer.subscribe("YOUR TOPIC", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
        System.out.printf("Receive New Messages: %s %n", msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
    });
    consumer.start();
  }
}

顺序消息收发示例

发送顺序消息

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQOrderProducer { 
    /**
    * 如果是使用公网接入点访问,则必须设置RpcHook,里面填写实例的用户名和密码。实例用户名和密码在控制台实例详情页面获取。
    * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
    * 如果是在阿里云ECS内网访问,无需初始化RpcHook,服务端会根据内网VPC信息智能获取。
    * 如果实例类型为Serverlesss实例,则不管是公网访问还是内网访问都必须设置实例的用户名密码。
    */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // 使用公网接入点时,需要配置RPCHook。
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // 使用VPC接入点时,无需配置RPCHook。
        // 如果实例类型为Serverlesss实例,则必须配置RPCHook。
        // DefaultMQProducer producer = new DefaultMQProducer();
        
        // 设置接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项;如果不开启消息轨迹功能,则不需要运行此项。
        producer.setAccessChannel(AccessChannel.CLOUD);

        // 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
    producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                int orderId = i % 10;
                Message msg = new Message("YOUR ORDER TOPIC",
                    "YOUR MESSAGE TAG",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 注意!!!请务必设置该配置项,顺序消息才能均匀分布到各队列中。
                msg.putUserProperty(MessageConst.PROPERTY_SHARDING_KEY, orderId + "");
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // 选择适合自己的分区选择算法,保证同一个参数得到的结果相同。
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}

订阅顺序消息

import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQOrderConsumer {
    private static RPCHook getAclRPCHook() {
        /**
        * 如果是使用公网接入点访问,则必须设置RpcHook,里面填写实例的用户名和密码。实例用户名和密码在控制台实例详情页面获取。
        * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
        * 如果是在阿里云ECS内网访问,无需初始化RpcHook,服务端会根据内网VPC信息智能获取。
        * 如果实例类型为Serverlesss实例,则不管是公网访问还是内网访问都必须设置实例的用户名密码。
        */
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {

        // 使用公网接入点时,需要配置RPCHook。
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
        // 使用VPC接入点时,无需配置RPCHook。
        // 如果实例类型为Serverlesss实例,则必须配置RPCHook。
        // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        
        // 设置接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项;如果不开启消息轨迹功能,则不需要运行此项。
        consumer.setAccessChannel(AccessChannel.CLOUD);

        // 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
    consumer.setNamesrvAddr("YOUR ACCESS POINT");
        consumer.subscribe("YOUR ORDER TOPIC", "*");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true); 
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 消费失败则挂起重试返回:ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT。
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

定时/延时消息示例代码

发送定时/延时消息

import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQProducer {
    /**
    * 如果是使用公网接入点访问,则必须设置RpcHook,里面填写实例的用户名和密码。实例用户名和密码在控制台实例详情页面获取。
    * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
    * 如果是在阿里云ECS内网访问,无需初始化RpcHook,服务端会根据内网VPC信息智能获取。
    * 如果实例类型为Serverlesss实例,则不管是公网访问还是内网访问都必须设置实例的用户名密码。
    */   
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // 使用公网接入点时,需要配置RPCHook。
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // 使用VPC接入点时,无需配置RPCHook。
        // 如果实例类型为Serverlesss实例,则必须配置RPCHook。
        // DefaultMQProducer producer = new DefaultMQProducer();
        
        // 设置接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项;如果不开启消息轨迹功能,则不需要运行此项。
        producer.setAccessChannel(AccessChannel.CLOUD);

        // 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try { 
                // 设置为您在云消息队列 RocketMQ 版控制台创建的Topic。
                Message msg = new Message("YOUR TOPIC",
                    // 设置消息的Tag。
                    "YOUR MESSAGE TAG",
                    // 消息内容。
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 发送延时消息,需要设置延时时间,单位毫秒(ms),消息将在指定延时时间后投递,例如消息将在3秒后投递。
                longdelayTime=System.currentTimeMillis()+3000;
                msg.putUserProperty("__STARTDELIVERTIME",String.valueOf(delayTime));
      
                // 若需要发送定时消息,则需要设置定时时间,消息将在指定时间进行投递,例如消息将在2021-08-10 18:45:00投递。
                // 定时时间格式为:yyyy-MM-dd HH:mm:ss,若设置的时间戳在当前时间之前,则消息将被立即投递给Consumer。
                // longtimeStamp=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime();
                // msg.putUserProperty("__STARTDELIVERTIME",String.valueOf(timeStamp));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // 在应用退出前,销毁Producer对象。
        // 注意:销毁Producer对象可以节约系统内存,若您需要频繁发送消息,则无需销毁Producer对象。
        producer.shutdown();
    }
}

订阅定时/延时消息

订阅定时/延时消息的示例代码和订阅普通消息相同,请参见订阅普通消息

事务消息示例代码

发送事务消息

  1. 发送半事务消息(Half Message)及执行本地事务,示例代码如下。

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    public class RocketMQTransactionProducer {
    
        private static RPCHook getAclRPCHook() {
            /**
            * 如果是使用公网接入点访问,则必须设置RpcHook,里面填写实例的用户名和密码。实例用户名和密码在控制台实例详情页面获取。
            * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
            * 如果是在阿里云ECS内网访问,无需初始化RpcHook,服务端会根据内网VPC信息智能获取。
            * 如果实例类型为Serverlesss实例,则不管是公网访问还是内网访问都必须设置实例的用户名密码。
            */     
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // 使用公网接入点时,需要配置RPCHook。
            TransactionMQProducer transactionMQProducer = new TransactionMQProducer(getAclRPCHook());
            // 使用VPC接入点时,无需配置RPCHook。
            // 如果实例类型为Serverlesss实例,则必须配置RPCHook。
            // TransactionMQProducer transactionMQProducer = new TransactionMQProducer();
            
            // 设置接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项;如果不开启消息轨迹功能,则不需要运行此项。
            TransactionMQProducer.setAccessChannel(AccessChannel.CLOUD);
    
            // 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
            // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
      transactionMQProducer.setNamesrvAddr("YOUR ACCESS POINT");
            transactionMQProducer.setTransactionCheckListener(new LocalTransactionCheckerImpl());
            transactionMQProducer.start();
    
            for (int i = 0; i < 10; i++) {
                try {
                    Message message = new Message("YOUR TRANSACTION TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, new LocalTransactionExecuter() {
                        @Override public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
                            System.out.println("开始执行本地事务: " + msg);
                            return LocalTransactionState.UNKNOW;
                        }
                    }, null);
                    assert sendResult != null;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
  2. 提交事务消息状态,示例代码如下。

    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.TransactionCheckListener;
    import org.apache.rocketmq.common.message.MessageExt;
    
    // 云消息队列 RocketMQ 版发送事务消息本地Check接口实现类。
    public class LocalTransactionCheckerImpl implements TransactionCheckListener {
        // 本地事务Checker。更多信息,请参见事务消息。
        @Override public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
            System.out.println("收到事务消息的回查请求, MsgId: " + msg.getMsgId());
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    
    }

订阅事务消息

订阅事务消息的示例代码和订阅普通消息一样,请参见订阅普通消息

  • 本页导读 (1)
文档反馈