本文提供使用 TCP 协议下的开源 Java SDK 收发事务消息的示例代码供您参考。

交互流程

事务消息交互流程如下图所示。

process

详情请参见事务消息

前提条件

您已完成以下操作:

发送事务消息

发送事务消息包含以下两个步骤:
  1. 发送半事务消息(Half Message)及执行本地事务,示例代码如下。

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    public class RocketMQTransactionProducer {
    
        //设置为您在阿里云 RocketMQ 控制台上创建的 GID, 以及替换为您阿里云账号的 AccessKeyId 和 AccessKeySecret
        private static RPCHook getAclRPCHook() {
            return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
        }
    
        public static void main(String[] args) throws MQClientException {
            /**
             * 创建事务消息 Producer
             */
            TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID", getAclRPCHook());
            /**
             * 设置为您从阿里云控制台获取的接入点信息,类似 “http://MQ_INST_XXXX.aliyuncs.com:80”
             */
            consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
            transactionMQProducer.setTransactionCheckListener(new LocalTransactionCheckerImpl());
            transactionMQProducer.start();
    
            for (int i = 0; i < 10; i++) {
                try {
                    Message message = new Message("YOUR TRANSACTION TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, new LocalTransactionExecuter() {
                        @Override public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
                            System.out.println("开始执行本地事务: " + msg);
                            return LocalTransactionState.UNKNOW;
                        }
                    }, null);
                    assert sendResult != null;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
  2. 提交事务消息状态

    当本地事务执行完成(执行成功或执行失败),需要通知服务器当前消息的事务状态。通知方式有以下两种:

    • 执行本地事务完成后提交。
    • 执行本地事务一直没提交状态,等待服务器回查消息的事务状态。

    事务状态有以下三种:

    • LocalTransactionState.COMMIT_MESSAGE:提交事务,允许订阅方消费该消息。
    • LocalTransactionState.ROLLBACK_MESSAGE:回滚事务,消息将被丢弃不允许消费。
    • LocalTransactionState.UNKNOW:无法判断状态,期待阿里云 RocketMQ 的 Broker 向发送方再次询问该消息对应的本地事务的状态。
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.TransactionCheckListener;
    import org.apache.rocketmq.common.message.MessageExt;
    
    /**
     * RocketMQ 发送事务消息本地 Check 接口实现类
     */
    public class LocalTransactionCheckerImpl implements TransactionCheckListener {
        /**
         * 本地事务 Checker,详见: https://help.aliyun.com/document_detail/29548.html?spm=5176.doc35104.6.133.pJkthu
         */
        @Override public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
            System.out.println("收到事务消息的回查请求, MsgId: " + msg.getMsgId());
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    
    }

事务回查机制说明

  • 发送事务消息为什么必须要实现回查 Check 机制?

    当步骤 1 中半事务消息发送完成,但本地事务返回状态为 LocalTransactionState.UNKNOW,或者应用退出导致本地事务未提交任何状态时,从 Broker 的角度看,这条 Half 状态的消息的状态是未知的。因此 Broker 会定期要求发送方能 Check 该 Half 状态消息,并上报其最终状态。

  • Check 被回调时,业务逻辑都需要做些什么?

    事务消息的 Check 方法里面,应该写一些检查事务一致性的逻辑。消息队列 RocketMQ 版发送事务消息时需要实现 LocalTransactionChecker 接口,用来处理 Broker 主动发起的本地事务状态回查请求;因此在事务消息的 Check 方法中,需要完成两件事情:

    1. 检查该半事务消息对应的本地事务的状态(committed or rollback)。
    2. 向 Broker 提交该半事务消息本地事务的状态。

订阅事务消息

事务消息的订阅与普通消息订阅一致,如下所示。
import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

public class RocketMQPushConsumer {
    //设置为您在阿里云 RocketMQ 控制台上创建的 GID, 以及替换为您的 AccessKeyId 和 AccessKeySecret
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("YOUR ACCESS KEY", "YOUR SECRET KEY"));
    }
    public static void main(String[] args) throws MQClientException {
        //设置为您在阿里云 RocketMQ 控制台上创建的 GID, 以及替换为您阿里云账号的 AccessKeyId 和 AccessKeySecret
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely());
        //设置为阿里云 RocketMQ 实例的接入点
        consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
        //阿里云上消息轨迹需要设置为 CLOUD 方式
        consumer.setAccessChannel(AccessChannel.CLOUD);
        // 设置为您在阿里云 RocketMQ 控制台上创建的 Topic
        consumer.subscribe("YOUR TOPIC", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("Receive New Messages: %s %n", msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}