全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网

Sample-txc-mq 样例工程

更新时间:2017-07-17 16:03:30

业务逻辑说明

本案例和 sample-txc-simple 案例类似,业务逻辑是 A 转账给 B 了 10 次,前五次成功,后五次失败。和 sample-txc-simple 案例不同的是,在 A 转账给 B 成功后,增加了 MQ 的一个通知,告知转账成功。使用 GTS 事务保证了 A 和 B 钱的总数始终不变,同时保证了只有转账成功的情况下 MQ 通知才可见。

前提条件

  1. 从 GTS 控制台申请事务分组,并申请一个 MQ 服务。

  2. 需要两台 RDS,一台 ECS 用于部署本应用。

  3. 在两个 RDS 实例中分别执行 txc_sample1.sql 、 txc_sample2.sql 和 txc_undo_log.sql 完成建表。

案例搭建方法

  1. 把该工程拷贝到 ECS 服务器上,找到 sample-txc-mq/src/main/resources 目录下的 txc-mq-client-context.xml 文件,对该文件中的两个数据源进行修改,分别替换为两个 RDS 的数据源。

  2. 将该文件中 <constructor-arg value="xxxxx"/> 的 xxxxx 替换为在步骤 1 中申请的 GTS 组 ID。

  3. 将自己申请的 MQ 相关信息配置到该文件中,即将 <bean id="txc_mq_producer" class="com.taobao.txc.rm.mq.TxcMQProducerImpl"> 下的参数替换为自己的 MQ 配置。

  4. 将该文件中 <property name=" accessKey" value="xxxxx"/><property name=" secretKey" value="xxxxx"/> 的 xxxxx 分别替换为您阿里云账户的 Access Key ID 和 Access Key Secret。

  5. 在 sample-txc-mq 目录下执行 build.sh 编译本工程。编译完成后在 sample-txc-mq/client/bin 目录下执行 run.sh 可以看到 MQ 的 provider 运行结果。

  6. 将 sample-mq-consumer 工程拷贝到 ECS 服务器中,在 sample-mq-consumer/src/main/java/com/taobao/txc/tests 中找到 SMSListener.java,修改其中的 xxxxx 为申请的 MQ 配置。在 sample-mq-consumer 目录下执行 build.sh 编译该工程,编译完成后 在sample-mq-consumer/client/bin 目录下执行 run.sh 可以 consumer 掉刚刚 sample-txc-mq 工程生产出来的 MQ 消息。

  7. sample-txc-mq 工程的 Java 源代码在 /sample-txc-mq/src/main/java/com/taobao/txc/tests 目录下,可以根据业务需求修改。

GTS + MQ 使用说明

  1. 在部署 MQ 下的 GTS 应用时,需要在代码中配置 MQ 资源,包括 ProducerId、consumerId 和AccessKeys,如下图所示。如果您还没有 MQ 资源,需要申请一个 MQ 资源,MQ 资源申请参考 MQ Quick Start

  2. MQ 的 producer 工程的配置与使用。

    1. 在配置文档 xml 中配置好 MQ 资源,例如:

      1. <bean class="com.ta obao.txc.client.aop.MTRelationShipManager">
      2. <property name="beanNames" ref="mtServicesClassList" />
      3. <property name="interceptorNames">
      4. <list>
      5. <value>mtBranchInterceptor</value>
      6. </list>
      7. </property>
      8. <property name="order" value="1"></property>
      9. <property name="proxyTargetClass" value="false"></property>
      10. </bean>
      11. <bean id="mtBranchInterceptor" class="com.taobao.txc.resourcemanager.mt.MtBranchInterceptor"/>
      12. <bean id="mtServicesClassList" class="org.springframework.beans.factory.config.ListFactoryBean">
      13. <property name="sourceList">
      14. <list>
      15. <value>com.taobao.txc.rm.mq.TxcMQProducer</value>
      16. </list>
      17. </property>
      18. </bean>
      19. <bean id="txc_mq_producer" class="com.taobao.txc.rm.mq.TxcMQProducerImpl">
      20. <constructor-arg name="ProducerId" value="PID_txc_mq_prod_test"/>
      21. <constructor-arg name="AccessKey" value="XXX"/>
      22. <constructor-arg name="SecretKey" value="XXX"/>
      23. </bean>
    2. 在代码中启动一个 MQ 资源,例如:

      1. TxcMQProducer txcMQProducer = (TxcMQProducer) context.getBean("txc_mq_producer");
      2. txcMQProducer.start();
      3. System.out.println("Producer started!");
    3. 要实现 MQ 事务,需要在 GTS 注解函数范围内发用一个 MQ 消息,例如:

      1. @TxcTransaction(appName = "myapp")
      2. public void update(Connection connection1, Connection connection2) throws SQLException, MQClientException {
      3. //数据源操作
      4. update1(connection1);
      5. update2(connection2);
      6. //创建一个消息
      7. Message msg = new Message(/*topic*/"txc_mq_test",
      8. /*tag*/"onePay",
      9. String.format("onePay message:success\n").getBytes());
      10. /* 发送一个消息,事务完成消息可见 */
      11. SendResult sendResult = txcMQProducer.send(null, 0, msg);
      12. }
    4. MQ 的 consumer 工程的配置与使用可以参考下面这个例子实现:

      1. Properties properties = new Properties();
      2. properties.put(PropertyKeyConst.ConsumerId, "CID_txc_mq_prod_test");
      3. properties.put(PropertyKeyConst.AccessKey, "XXX");
      4. properties.put(PropertyKeyConst.SecretKey, "XXX");
      5. Consumer consumer = ONSFactory.createConsumer(properties);
      6. consumer.subscribe("txc_mq_test", "*", new MessageListener() {
      7. @Override
      8. public Action consume(Message message, ConsumeContext context) {
      9. log.info(String.format("SMSListener got message:%s", message));
      10. System.out.println(String.format("SMSListener got message:%s", message));
      11. System.out.println(new String(message.getBody()));
      12. return Action.CommitMessage;
      13. }
      14. });
      15. consumer.start();
本文导读目录