本文提供使用TCP协议下的社区版Java SDK收发事务消息的示例代码供您参考。
交互流程
事务消息交互流程如下图所示。
更多信息,请参见事务消息。
前提条件
您已完成以下操作:
下载4.5.2或以上版本的社区版Java SDK。
准备工作。
获取阿里云访问密钥AccessKey ID和AccessKey Secret。更多信息,请参见创建AccessKey。
发送事务消息
发送事务消息包含以下两个步骤:
发送半事务消息(Half Message)及执行本地事务,示例代码如下。
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.LocalTransactionExecuter; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; public class RocketMQTransactionProducer { private static RPCHook getAclRPCHook() { /** * 替换为您阿里云账号的AccessKey ID和AccessKey Secret。 * 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。 */ return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))); } public static void main(String[] args) throws MQClientException { /** * 创建事务消息Producer,并开启消息轨迹。设置为您在消息队列RocketMQ版控制台创建的Group ID。 * 如果不想开启消息轨迹,可以按照如下方式创建: * TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID", getAclRPCHook()); */ TransactionMQProducer transactionMQProducer = new TransactionMQProducer(null, "YOUR TRANSACTION GROUP ID", getAclRPCHook(), true, null); /** * 设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“http://MQ_INST_XXXX.aliyuncs.com:80”。 */ transactionMQProducer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80"); //阿里云上消息轨迹需要设置为CLOUD方式,在使用云上消息轨迹的时候,需要设置此项,如果不开启消息轨迹功能,则运行不设置此项。 transactionMQProducer.setAccessChannel(AccessChannel.CLOUD); transactionMQProducer.setTransactionCheckListener(new LocalTransactionCheckerImpl()); transactionMQProducer.start(); for (int i = 0; i < 10; i++) { try { Message message = new Message("YOUR TRANSACTION TOPIC", "YOUR MESSAGE TAG", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, new LocalTransactionExecuter() { @Override public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { System.out.println("开始执行本地事务: " + msg); return LocalTransactionState.UNKNOW; } }, null); assert sendResult != null; } catch (Exception e) { e.printStackTrace(); } } } }
提交事务消息状态
当本地事务执行完成(执行成功或执行失败),需要通知服务器当前消息的事务状态。通知方式有以下两种:
执行本地事务完成后提交。
执行本地事务一直没提交状态,等待服务器回查消息的事务状态。
事务状态有以下三种:
LocalTransactionState.COMMIT_MESSAGE
:提交事务,允许订阅方消费该消息。LocalTransactionState.ROLLBACK_MESSAGE
:回滚事务,消息将被丢弃不允许消费。LocalTransactionState.UNKNOW
:无法判断状态,期待阿里云云消息队列 RocketMQ 版的Broker向发送方再次询问该消息对应的本地事务的状态。
import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionCheckListener; import org.apache.rocketmq.common.message.MessageExt; /** * 消息队列RocketMQ版发送事务消息本地Check接口实现类。 */ public class LocalTransactionCheckerImpl implements TransactionCheckListener { /** * 本地事务Checker。更多信息,请参见事务消息。 */ @Override public LocalTransactionState checkLocalTransactionState(MessageExt msg) { System.out.println("收到事务消息的回查请求, MsgId: " + msg.getMsgId()); return LocalTransactionState.COMMIT_MESSAGE; } }
事务回查机制说明
发送事务消息为什么必须要实现回查Check机制?
当步骤1中半事务消息发送完成,但本地事务返回状态为
LocalTransactionState.UNKNOW
,或者应用退出导致本地事务未提交任何状态时,从Broker的角度看,这条Half状态的消息的状态是未知的。因此Broker会定期要求发送方能Check该Half状态消息,并上报其最终状态。Check被回调时,业务逻辑都需要做些什么?
事务消息的Check方法里面,应该写一些检查事务一致性的逻辑。云消息队列 RocketMQ 版发送事务消息时需要实现
LocalTransactionChecker
接口,用来处理Broker主动发起的本地事务状态回查请求;因此在事务消息的Check方法中,需要完成两件事情:检查该半事务消息对应的本地事务的状态(committed or rollback)。
向Broker提交该半事务消息本地事务的状态。
订阅事务消息
事务消息的订阅与普通消息订阅一致,如下所示。
import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
public class RocketMQPushConsumer {
/**
* 替换为您阿里云账号的AccessKey ID和AccessKey Secret。
* 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
*/
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
}
public static void main(String[] args) throws MQClientException {
/**
* 创建Consumer,并开启消息轨迹。设置为您在阿里云消息队列RocketMQ版控制台创建的Group ID。
* 如果不想开启消息轨迹,可以按照如下方式创建:
* DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely());
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
//设置为阿里云消息队列RocketMQ版实例的接入点。
consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
//阿里云上消息轨迹需要设置为CLOUD方式,在使用云上消息轨迹的时候,需要设置此项,如果不开启消息轨迹功能,则运行不设置此项。
consumer.setAccessChannel(AccessChannel.CLOUD);
// 设置为您在阿里云消息队列RocketMQ版控制台上创建的Topic。
consumer.subscribe("YOUR TOPIC", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("Receive New Messages: %s %n", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}