全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网

配置 MQ 加入事务

更新时间:2017-07-17 19:45:04

配置 MQ 加入事务,可以保证数据库操作与消息发送的一致性。

  • 数据库操作提交,则消息一定发送成功。
  • 数据库操作回滚,则消息一定不会被发送出去。

本文档提供了两种将 MQ 加入事务的方法,详细步骤请参见 操作步骤 部分。

应用场景

有些系统在使用数据库保证系统内数据一致的同时, 也会使用消息队列(MQ) 作为和其他系统间的消息传递, 完成不同系统间的数据一致。

业务逻辑

A 转账给 B 了 10 次,前五次成功,后五次失败。

在 A 转账给 B 成功后,增加了 MQ 的一个通知,告知转账成功。

使用 GTS 事务保证了 A 和 B 钱的总数始终不变,同时保证了只有转账成功的情况下 MQ 通知才可见。

前提条件

  1. 已申请了事务分组,并申请了一个 MQ 服务。

  2. 已准备好两台 RDS 和一台 ECS。 ECS 用于部署本应用。

  3. 在两个 RDS 实例中分别执行 txc_sample1.sql 、 txc_sample2.sql 和 txc_undo_log.sql 完成建表。

操作步骤

GTS 提供了两种方法将 MQ 加入事务。

通过代码将 MQ 加入事务

  1. 在配置文档 xml 中配置好 MQ 资源,例如:

    1. <bean class="com.ta obao.txc.client.aop.MTRelationShipManager">
    2. <property name="beanNames" ref="mtServicesClassList" />
    3. <property name="interceptorNames">
    4. <list>
    5. <value>mtBranchInterceptor</value>
    6. </list>
    7. </property>
    8. <property name="order" value="1"></property>
    9. <property name="proxyTargetClass" value="false"></property>
    10. </bean>
    11. <bean id="mtBranchInterceptor" class="com.taobao.txc.resourcemanager.mt.MtBranchInterceptor"/>
    12. <bean id="mtServicesClassList" class="org.springframework.beans.factory.config.ListFactoryBean">
    13. <property name="sourceList">
    14. <list>
    15. <value>com.taobao.txc.rm.mq.TxcMQProducer</value>
    16. </list>
    17. </property>
    18. </bean>
    19. <bean id="txc_mq_producer" class="com.taobao.txc.rm.mq.TxcMQProducerImpl">
    20. <constructor-arg name="ProducerId" value="PID_txc_mq_prod_test"/>
    21. <constructor-arg name="AccessKey" value="XXX"/>
    22. <constructor-arg name="SecretKey" value="XXX"/>
    23. </bean>
  2. 在代码中启动一个 MQ 资源,例如:

    1. TxcMQProducer txcMQProducer = (TxcMQProducer) context.getBean("txc_mq_producer");
    2. txcMQProducer.start();
    3. System.out.println("Producer started!");
  3. 要实现 MQ 事务,需要在 GTS 注解函数范围内发用一个 MQ 消息,例如:

    1. @TxcTransaction(appName = "myapp")
    2. public void update(Connection connection1, Connection connection2) throws SQLException, MQClientException {
    3. //数据源操作
    4. update1(connection1);
    5. update2(connection2);
    6. //创建一个消息
    7. Message msg = new Message(/*topic*/"txc_mq_test",
    8. /*tag*/"onePay",
    9. String.format("onePay message:success\n").getBytes());
    10. /* 发送一个消息,事务完成消息可见 */
    11. SendResult sendResult = txcMQProducer.send(null, 0, msg);
    12. }
  4. MQ 的 consumer 工程的配置与使用可以参考下面这个例子实现:

    1. Properties properties = new Properties();
    2. properties.put(PropertyKeyConst.ConsumerId, "CID_txc_mq_prod_test");
    3. properties.put(PropertyKeyConst.AccessKey, "XXX");
    4. properties.put(PropertyKeyConst.SecretKey, "XXX");
    5. Consumer consumer = ONSFactory.createConsumer(properties);
    6. consumer.subscribe("txc_mq_test", "*", new MessageListener() {
    7. @Override
    8. public Action consume(Message message, ConsumeContext context) {
    9. log.info(String.format("SMSListener got message:%s", message));
    10. System.out.println(String.format("SMSListener got message:%s", message));
    11. System.out.println(new String(message.getBody()));
    12. return Action.CommitMessage;
    13. }
    14. });
    15. consumer.start();

通过样例将 MQ 加入事务

  1. 下载样例工程,解压,得到 sample-txc-mq 文件夹。

  2. 把该工程文件夹到 ECS 服务器上,找到 sample-txc-mq/src/main/resources 目录下的 txc-mq-client-context.xml 文件,对该文件中的两个数据源进行修改,分别替换为两个 RDS 的数据源。

  3. 将该文件中 <constructor-arg value="xxxxx"/> 的 xxxxx 替换为在步骤 1 中申请的 GTS 组 id。

  4. 将自己申请的 MQ 相关信息配置到该文件中,即将 <bean id="txc_mq_producer" class="com.taobao.txc.rm.mq.TxcMQProducerImpl"> 下的参数替换为自己的 MQ 配置。

  5. 将该文件中 <property name=" accessKey" value="xxxxx"/><property name=" secretKey" value="xxxxx"/> 的 xxxxx 分别替换为您阿里云账户的 Access Key ID 和 Access Key Secret。

  6. 在 sample-txc-mq 目录下执行 build.sh 编译本工程。编译完成后在 sample-txc-mq/client/bin 目录下执行 run.sh 可以看到 MQ 的 provider 运行结果。

  7. 将 sample-mq-consumer 工程拷贝到 ECS 服务器中,在 sample-mq-consumer/src/main/java/com/taobao/txc/tests 中找到 SMSListener.java,修改其中的 xxxxx 为申请的 MQ 配置。在 sample-mq-consumer 目录下执行 build.sh 编译该工程,编译完成后 在sample-mq-consumer/client/bin 目录下执行 run.sh 可以 consumer 掉刚刚 sample-txc-mq 工程生产出来的 MQ 消息。

  8. sample-txc-mq 工程的 Java 源代码在 /sample-txc-mq/src/main/java/com/taobao/txc/tests 目录下,可以根据业务需求修改。

注意事项

  1. 在部署 MQ 下的 GTS 应用时,需要在代码中配置 MQ 资源,包括 ProducerId、consumerId 和AccessKeys。

    如果您还没有 MQ 资源,需要申请一个 MQ 资源,MQ 资源申请参考 步骤二:申请资源

  2. com.taobao.txc.client.aop.MTRelationShipManager 一定要先于com.taobao.txc.client.aop.TxcTransactionScaner 进行声明。

结果验证

通过上述 GTS 配置可以将 MQ 消息和多个数据源加入一个事务中,MQ 消息和多个数据源会保持强一致,一旦有异常返回给带 @TxcTransaction 注解的方法,都会导致这个全局事务数据回滚到之前的状态,同时 MQ 消息也不会发送出去。

本文导读目录