本文提供使用 TCP 协议下的开源 Go SDK 来收发事务消息的示例代码供您参考。

阿里云 RocketMQ 提供类似 X/Open XA 的分布式事务功能,通过阿里云 RocketMQ 事务消息,能达到分布式事务的最终一致。

交互流程

事务消息交互流程如下图所示。

process

详情请参见事务消息

前提条件

您已完成准备工作。详情请参见准备工作

发送事务消息

发送事务消息的示例代码如下。
package main

import (
    "fmt"
    "github.com/apache/rocketmq-client-go/core"
    "time"
)

func main() {
    pConfig := &rocketmq.ProducerConfig{
        ClientConfig: rocketmq.ClientConfig{
            //您在阿里云 RocketMQ 的控制台上申请的 GID。
            GroupID: "GID_XXX",
            //设置 TCP 协议接入点,从阿里云 RocketMQ 控制台的实例详情页面获取。
            NameServer: "http://XXXXXX:80",
            Credentials: &rocketmq.SessionCredentials{
                //您在阿里云账号管理控制台中创建的 AccessKeyId,用于身份认证。
                AccessKey: "XXX",
                //您在阿里云账号管理控制台中创建的 AccessKeySecret,用于身份认证。
                SecretKey: "XXX",
                //用户渠道,默认值为:ALIYUN。
                Channel: "ALIYUN",
            },
        },
        //主动设置该实例用于发送事务消息。
        ProducerModel: rocketmq.TransProducer,
    }
    sendTransactionMessage(pConfig)
}

type MyTransactionLocalListener struct {
}
// 执行本地事务。
func (l *MyTransactionLocalListener) Execute(m *rocketmq.Message, arg interface{}) rocketmq.TransactionStatus {
    fmt.Printf("Execute local:Topic %s ,Body: %s, return UnknownTransaction\n", m.Topic, m.Body)
    // 消息 ID(有可能消息体一样,但消息 ID 不一样,当前消息 ID 在控制台无法查询)。
    // 执行本地事务。

    // 本地事务成功则提交消息,rocketmq.CommitTransaction。
    // 本地事务失败则回滚消息,rocketmq.RollbackTransaction。
    return rocketmq.CommitTransaction
}
//回查本地事务执行结果。
func (l *MyTransactionLocalListener) Check(m *rocketmq.MessageExt, arg interface{}) rocketmq.TransactionStatus {
    fmt.Printf("Check local:MessageID: %s ,Body: %s return CommitTransaction\n", m.MessageID, m.Body)
    // 消息 ID(有可能消息体一样,但消息 ID 不一样,当前消息 ID 在控制台无法查询)。
    // 执行本地事务的结果回查。

    // 本地事务成功则提交消息,rocketmq.CommitTransaction。
    // 本地事务失败则回滚消息,rocketmq.RollbackTransaction。
    return rocketmq.CommitTransaction
}

func sendTransactionMessage(config *rocketmq.ProducerConfig) {
    listener := &MyTransactionLocalListener{}
    // 开源版本上云时不支持自定义数据参数,请将第三个参数设置为 nil。
    producer, err := rocketmq.NewTransactionProducer(config, listener, nil)

    if err != nil {
        fmt.Println("create Transaction producer failed, error:", err)
        return
    }

    err = producer.Start()
    if err != nil {
        fmt.Println("start Transaction producer error", err)
        return
    }
    defer producer.Shutdown()

    fmt.Printf("Transaction producer: %s started... \n", producer)
    for i := 0; i < 10; i++ {
        key := time.Now().String()
        msg := fmt.Sprintf("%s-[%s]--%d", "Hello,Transaction MQ Message-", key, i)
        // 发送消息时请设置您在阿里云 RocketMQ 控制台上申请的 Topic。
        // 开源版本上云时不支持自定义数据参数,请将第二个参数设置为 nil。
        result, err := producer.SendMessageTransaction(&rocketmq.Message{Topic: "Your transaction topic", Body: msg}, nil)
        if err != nil {
            fmt.Println("Error:", err)
        }
        fmt.Printf("send message: %s result: %s\n", msg, result)
    }
    time.Sleep(time.Duration(1) * time.Minute)
    fmt.Println("shutdown Transaction producer.")
}

事务回查机制说明:

  • 发送事务消息为什么必须要实现回查 Check 机制?

    当半事务消息发送完成,但本地事务返回状态为 TransactionStatus.Unknow,或者应用退出导致本地事务未提交任何状态时,从 Broker 的角度看,这条 Half 状态的消息的状态是未知的。因此 Broker 会定期要求发送方能 Check 该 Half 状态消息,并上报其最终状态。

  • Check 被回调时,业务逻辑都需要做些什么?

    事务消息的 Check 方法里面,应该写一些检查事务一致性的逻辑。阿里云 RocketMQ 发送事务消息时需要实现 LocalTransactionChecker 接口,用来处理 Broker 主动发起的本地事务状态回查请求;因此在事务消息的 Check 方法中,需要完成两件事情:

    1. 检查该半事务消息对应的本地事务的状态(committed or rollback)。
    2. 向 Broker 提交该半事务消息本地事务的状态。

消费事务消息

消费事务消息的实例代码如下。

package main
import (
  "fmt"
  "github.com/apache/rocketmq-client-go/core"
  "sync/atomic"
)
func main() {
  pConfig := &rocketmq.PushConsumerConfig{
    ClientConfig: rocketmq.ClientConfig{
      //您在阿里云 RocketMQ 控制台上申请的 GID。
      GroupID:    "GID_XXXXXXXXXXXX",
      //设置 TCP 协议接入点,从阿里云 RocketMQ 控制台的实例详情页面获取。
      NameServer: "http://XXXXXXXXXXXXXXXXXX:80",
      Credentials: &rocketmq.SessionCredentials{
        //您在阿里云账号管理控制台中创建的 AccessKeyId,用于身份认证。
        AccessKey: "Your Access Key",
        //您在阿里云账号管理控制台中创建的 AccessKeySecret,用于身份认证。
        SecretKey: "Your Secret Key",
        //用户渠道,默认值为:ALIYUN。
        Channel:   "ALIYUN",
      },
    },
    //设置使用集群模式。
    Model:         rocketmq.Clustering,
    //设置该消费者为普通消息消费。
    ConsumerModel: rocketmq.CoCurrently,
  }
  ConsumeWithPush(pConfig)
}
func ConsumeWithPush(config *rocketmq.PushConsumerConfig) {
  consumer, err := rocketmq.NewPushConsumer(config)
  if err != nil {
    println("create Consumer failed, error:", err)
    return
  }
  ch := make(chan interface{})
  var count = (int64)(1000000)
  // ********************************************
  // 1. 确保订阅关系的设置在启动之前完成。
  // 2. 确保相同 GID 下面的消费者的订阅关系一致。
  // *********************************************
  consumer.Subscribe("YourTopicXXXXXXXX", "*", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {
    fmt.Printf("A message received, MessageID:%s, Body:%s \n", msg.MessageID, msg.Body)
    if atomic.AddInt64(&count, -1) <= 0 {
      ch <- "quit"
    }
    //消费成功回复 ConsumeSuccess,消费失败回复 ReConsumeLater。此时会触发消费重试。
    return rocketmq.ConsumeSuccess
  })
  err = consumer.Start()
  if err != nil {
    println("consumer start failed,", err)
    return
  }
  fmt.Printf("consumer: %s started...\n", consumer)
  <-ch
  //请保持消费者一直处于运行状态。
  err = consumer.Shutdown()
  if err != nil {
    println("consumer shutdown failed")
    return
  }
  println("consumer has shutdown.")
}