全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 智能硬件
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 更多
全局事务服务 GTS

配置 MQ 加入事务

更新时间:2017-07-17 19:45:04

配置 MQ 加入事务,可以保证数据库操作与消息发送的一致性。

  • 数据库操作提交,则消息一定发送成功。
  • 数据库操作回滚,则消息一定不会被发送出去。

本文档提供了两种将 MQ 加入事务的方法,详细步骤请参见 操作步骤 部分。

应用场景

有些系统在使用数据库保证系统内数据一致的同时, 也会使用消息队列(MQ) 作为和其他系统间的消息传递, 完成不同系统间的数据一致。

业务逻辑

A 转账给 B 了 10 次,前五次成功,后五次失败。

在 A 转账给 B 成功后,增加了 MQ 的一个通知,告知转账成功。

使用 GTS 事务保证了 A 和 B 钱的总数始终不变,同时保证了只有转账成功的情况下 MQ 通知才可见。

前提条件

  1. 已申请了事务分组,并申请了一个 MQ 服务。

  2. 已准备好两台 RDS 和一台 ECS。 ECS 用于部署本应用。

  3. 在两个 RDS 实例中分别执行 txc_sample1.sql 、 txc_sample2.sql 和 txc_undo_log.sql 完成建表。

操作步骤

GTS 提供了两种方法将 MQ 加入事务。

通过代码将 MQ 加入事务

  1. 在配置文档 xml 中配置好 MQ 资源,例如:

    1. <bean class="com.ta obao.txc.client.aop.MTRelationShipManager">
    2. <property name="beanNames" ref="mtServicesClassList" />
    3. <property name="interceptorNames">
    4. <list>
    5. <value>mtBranchInterceptor</value>
    6. </list>
    7. </property>
    8. <property name="order" value="1"></property>
    9. <property name="proxyTargetClass" value="false"></property>
    10. </bean>
    11. <bean id="mtBranchInterceptor" class="com.taobao.txc.resourcemanager.mt.MtBranchInterceptor"/>
    12. <bean id="mtServicesClassList" class="org.springframework.beans.factory.config.ListFactoryBean">
    13. <property name="sourceList">
    14. <list>
    15. <value>com.taobao.txc.rm.mq.TxcMQProducer</value>
    16. </list>
    17. </property>
    18. </bean>
    19. <bean id="txc_mq_producer" class="com.taobao.txc.rm.mq.TxcMQProducerImpl">
    20. <constructor-arg name="ProducerId" value="PID_txc_mq_prod_test"/>
    21. <constructor-arg name="AccessKey" value="XXX"/>
    22. <constructor-arg name="SecretKey" value="XXX"/>
    23. </bean>
  2. 在代码中启动一个 MQ 资源,例如:

    1. TxcMQProducer txcMQProducer = (TxcMQProducer) context.getBean("txc_mq_producer");
    2. txcMQProducer.start();
    3. System.out.println("Producer started!");
  3. 要实现 MQ 事务,需要在 GTS 注解函数范围内发用一个 MQ 消息,例如:

    1. @TxcTransaction(appName = "myapp")
    2. public void update(Connection connection1, Connection connection2) throws SQLException, MQClientException {
    3. //数据源操作
    4. update1(connection1);
    5. update2(connection2);
    6. //创建一个消息
    7. Message msg = new Message(/*topic*/"txc_mq_test",
    8. /*tag*/"onePay",
    9. String.format("onePay message:success\n").getBytes());
    10. /* 发送一个消息,事务完成消息可见 */
    11. SendResult sendResult = txcMQProducer.send(null, 0, msg);
    12. }
  4. MQ 的 consumer 工程的配置与使用可以参考下面这个例子实现:

    1. Properties properties = new Properties();
    2. properties.put(PropertyKeyConst.ConsumerId, "CID_txc_mq_prod_test");
    3. properties.put(PropertyKeyConst.AccessKey, "XXX");
    4. properties.put(PropertyKeyConst.SecretKey, "XXX");
    5. Consumer consumer = ONSFactory.createConsumer(properties);
    6. consumer.subscribe("txc_mq_test", "*", new MessageListener() {
    7. @Override
    8. public Action consume(Message message, ConsumeContext context) {
    9. log.info(String.format("SMSListener got message:%s", message));
    10. System.out.println(String.format("SMSListener got message:%s", message));
    11. System.out.println(new String(message.getBody()));
    12. return Action.CommitMessage;
    13. }
    14. });
    15. consumer.start();

通过样例将 MQ 加入事务

  1. 下载样例工程,解压,得到 sample-txc-mq 文件夹。

  2. 把该工程文件夹到 ECS 服务器上,找到 sample-txc-mq/src/main/resources 目录下的 txc-mq-client-context.xml 文件,对该文件中的两个数据源进行修改,分别替换为两个 RDS 的数据源。

  3. 将该文件中 <constructor-arg value="xxxxx"/> 的 xxxxx 替换为在步骤 1 中申请的 GTS 组 id。

  4. 将自己申请的 MQ 相关信息配置到该文件中,即将 <bean id="txc_mq_producer" class="com.taobao.txc.rm.mq.TxcMQProducerImpl"> 下的参数替换为自己的 MQ 配置。

  5. 将该文件中 <property name=" accessKey" value="xxxxx"/><property name=" secretKey" value="xxxxx"/> 的 xxxxx 分别替换为您阿里云账户的 Access Key ID 和 Access Key Secret。

  6. 在 sample-txc-mq 目录下执行 build.sh 编译本工程。编译完成后在 sample-txc-mq/client/bin 目录下执行 run.sh 可以看到 MQ 的 provider 运行结果。

  7. 将 sample-mq-consumer 工程拷贝到 ECS 服务器中,在 sample-mq-consumer/src/main/java/com/taobao/txc/tests 中找到 SMSListener.java,修改其中的 xxxxx 为申请的 MQ 配置。在 sample-mq-consumer 目录下执行 build.sh 编译该工程,编译完成后 在sample-mq-consumer/client/bin 目录下执行 run.sh 可以 consumer 掉刚刚 sample-txc-mq 工程生产出来的 MQ 消息。

  8. sample-txc-mq 工程的 Java 源代码在 /sample-txc-mq/src/main/java/com/taobao/txc/tests 目录下,可以根据业务需求修改。

注意事项

  1. 在部署 MQ 下的 GTS 应用时,需要在代码中配置 MQ 资源,包括 ProducerId、consumerId 和AccessKeys。

    如果您还没有 MQ 资源,需要申请一个 MQ 资源,MQ 资源申请参考 步骤二:申请资源

  2. com.taobao.txc.client.aop.MTRelationShipManager 一定要先于com.taobao.txc.client.aop.TxcTransactionScaner 进行声明。

结果验证

通过上述 GTS 配置可以将 MQ 消息和多个数据源加入一个事务中,MQ 消息和多个数据源会保持强一致,一旦有异常返回给带 @TxcTransaction 注解的方法,都会导致这个全局事务数据回滚到之前的状态,同时 MQ 消息也不会发送出去。

本文导读目录