本文介绍如何使用消息服务MNS的延时消息功能,实现本地操作和消息发送的事务一致性。
前提条件
您已创建以下队列,具体操作,请参见创建队列。
- 事务消息队列
消息存活时间小于消息延时时间。
- 当消息发送成功、事务操作成功时,生产者修改消息延迟时间,消息对消费者可见。
- 当消息发送成功、事务操作失败时,生产者不修改消息延迟时间,消息对消费者不可见。
- 操作日志队列
记录事务消息的操作记录信息。消息延时时间为事务消息操作超时时间。日志队列中的消息确认后将对消费者不可见。
原理介绍
一些业务场景需要保证本地操作和消息发送的事务一致性,即消息发送成功,本地操作成功。如果消息发送成功,本地操作失败,那么发送成功的消息需要回滚。操作流程如下图所示。
消息发送成功,事务操作成功时操作步骤如下所示:
- 生产者发送一条事务准备消息到事务消息队列。
- 生产者发送操作日志消息到操作日志队列,日志中包含步骤1消息的消息句柄。
- 生产者执行本地事务操作成功。
- 生产者请求修改消息延迟时间,使消息对消费者可见。
- 生产者向操作日志队列确认操作日志,删除日志消息。
- 消费者从事务消息队列中接收事务消息。
- 消费者处理事务消息。
- 消费者请求删除事务消息。
消息发送成功,事务操作失败时操作步骤如下所示:
- 生产者发送一条事务准备消息到事务消息队列。
- 生产者发送操作日志信息到操作日志队列,日志中包含步骤1消息的消息句柄。
- 生产者执行本地事务操作失败。
- (对应上图中步骤A)操作日志队列向生产者发送消息,请求读取超时未确认操作日志。
- (对应上图中步骤B)生产者检查事务结果,发现操作失败。
- (对应上图中步骤C)生产者提交回滚消息请求,不修改消息延迟时间,消息对消费者不可见。
- (对应上图中步骤D)生产者向操作日志队列确认操作日志,删除日志消息。
示例代码
消息服务MNS最新的Java SDK(1.1.8)中的TransactionQueue支持上述事务消息方案。在TransactionOperations和TransactionChecker两个接口添加业务操作和检查逻辑,您就可以方便地实现事务消息。
示例代码如下:
public class TransactionMessageDemo {
public class MyTransactionChecker implements TransactionChecker {
@Override
public boolean checkTransactionStatus(Message message) {
boolean checkResult = false;
String messageHandler = message.getReceiptHandle();
try {
//检查messageHandler事务是否成功。
checkResult = true;
} catch (Exception e) {
checkResult = false;
}
return checkResult;
}
}
public class MyTransactionOperations implements TransactionOperations {
@Override
public boolean doTransaction(Message message) {
boolean transactionResult = false;
String messageHandler = message.getReceiptHandle();
String messageBody = message.getMessageBody();
try {
//根据messageHandler和messageBody执行本地事务。
transactionResult = true;
} catch (Exception e) {
transactionResult = false;
}
return transactionResult;
}
}
public static void main(String[] args) {
System.out.println("Start TransactionMessageDemo");
String transQueueName = "transQueueName";
String accessKeyId = ServiceSettings.getMNSAccessKeyId();
String accessKeySecret = ServiceSettings.getMNSAccessKeySecret();
String endpoint = ServiceSettings.getMNSAccountEndpoint();
CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, endpoint);
MNSClient client = account.getMNSClient(); //初始化客户端。
//为事务队列创建队列。
QueueMeta queueMeta = new QueueMeta();
queueMeta.setQueueName(transQueueName);
queueMeta.setPollingWaitSeconds(15);
TransactionMessageDemo demo = new TransactionMessageDemo();
TransactionChecker transChecker = demo.new MyTransactionChecker();
TransactionOperations transOperations = demo.new MyTransactionOperations();
TransactionQueue transQueue = client.createTransQueue(queueMeta, transChecker);
//执行事务。
Message msg = new Message();
String messageBody = "TransactionMessageDemo";
msg.setMessageBody(messageBody);
transQueue.sendTransMessage(msg, transOperations);
//如果不使用队列,请删除队列并关闭客户端。
transQueue.delete();
//关闭客户端。
client.close();
System.out.println("End TransactionMessageDemo");
}
}
异常分析
- 生产者异常(例如进程重启)
- 读取操作日志队列中超时未确认日志。
- 检查事务结果。
- 如果检查到事务执行成功,则提交消息。
说明 重复提交不会对流程产生影响,同一句柄的消息只能成功提交一次。
- 确认操作日志。
- 消费者异常(例如进程重启)
消息服务MNS提供至少保证消费一次的特性,如果当前消费者没有成功消费并删除消息,消息在不可见时间后将继续可见,被当前消费者或者其他消费者处理。
- 消息服务MNS服务不可达(例如断网)
消息发送和接收处理状态及操作日志都在消息服务MNS服务端,消息服务MNS本身具备高可靠和高可用的特点,所以只要网络恢复正常,事务就可以继续进行。只要生产者操作成功,消费者就能收到消息并成功处理;如果生产者操作失败,则消费者无法收到消息。