本文提供使用 TCP 协议下的开源 Python SDK 来收发事务消息的示例代码供您参考。

阿里云 RocketMQ 提供类似 X/Open XA 的分布式事务功能,通过阿里云 RocketMQ 事务消息,能达到分布式事务的最终一致。

交互流程

事务消息交互流程如下图所示。

process

详情请参见事务消息

前提条件

您已完成准备工作。详情请参见准备工作

发送事务消息

发送事务消息的示例代码如下。

# -*- coding: utf-8 -*-

from rocketmq.client import Producer, Message, TransactionMQProducer, TransactionStatus
import time
# 发送消息时请设置您在阿里云 RocketMQ 控制台上申请的 Topic,确保是事务消息类型
topic = 'XXXXXX'
# 您在阿里云 RocketMQ 控制台上申请的 GID
gid = 'GID_XXXXX'
# 设置 TCP 协议接入点,从阿里云 RocketMQ 控制台的实例详情页面获取
name_srv = 'http://XXXX.aliyuncs.com:80'
# 您在阿里云账号管理控制台中创建的 AccessKeyId,用于身份认证
ak = 'AK'
# 您在阿里云账号管理控制台中创建的 AccessKeySecret,用于身份认证
sk = 'SK'
# 用户渠道,默认值为:ALIYUN
channel = 'ALIYUN'


def create_message():
    msg = Message(topic)
    msg.set_keys('YourKey')
    msg.set_tags('YourTags')
    msg.set_body('Hello RocketMQ, This is a Python Transaction Message.')
    return msg


def check_callback(msg):
    # 消息 ID(有可能消息体一样,但消息 ID 不一样,当前消息 ID 在控制台无法查询)
    # 执行本地事务
    # 本地事务成功则提交消息,TransactionStatus.COMMIT
    # 本地事务失败则回滚消息,TransactionStatus.ROLLBACK
    # Python3 默认的编码方式是 Unicode,Python3 下请选择正确的编解码方式
    # eg: print ('check: ' + msg.id.encode('utf-8").decode('utf-8'))
    print ('check: ' + msg.id.decode('utf-8'))
    return TransactionStatus.COMMIT


def local_execute(msg, user_args):
    # 消息 ID(有可能消息体一样,但消息 ID 不一样,当前消息 ID 在控制台无法查询)
    # 回查本地事务状态
    # 本地事务成功则提交消息,TransactionStatus.COMMIT
    # 本地事务失败则回滚消息,TransactionStatus.ROLLBACK
    # Python3 默认的编码方式是 Unicode,Python3 下请选择正确的编解码方式
    # eg: print ('local: ' + msg.id.encode('utf-8").decode('utf-8'))
    print ('local:   ' + msg.id.decode('utf-8'))
    return TransactionStatus.UNKNOWN


def send_transaction_message(count):
    # 发送事务消息必须初始化事务消息的 Producer
    producer = TransactionMQProducer(gid, check_callback)
    producer.set_name_server_address(name_srv)
    producer.set_session_credentials(ak, sk, channel)
    producer.start()
    for n in range(count):
        msg = create_message()
        ret = producer.send_message_in_transaction(msg, local_execute, None)
        print ('send message status: ' + str(ret.status) + ' msgId: ' + ret.msg_id)
    print ('send transaction message done')

    # 事务消息发送后会有异步回查的操作,请保持发送者一直处于运行状态
    while True:
        time.sleep(3600)
    producer.shutdown()


if __name__ == '__main__':
    send_transaction_message(10)
  • 发送事务消息为什么必须要实现回查 Check 机制?

    当半事务消息发送完成,但本地事务返回状态为 TransactionStatus.UNKNOWN,或者应用退出导致本地事务未提交任何状态时,从 Broker 的角度看,这条 Half 状态的消息的状态是未知的。因此 Broker 会定期要求发送方能 Check 该 Half 状态消息,并上报其最终状态。

  • Check 被回调时,业务逻辑都需要做些什么?

    事务消息的 Check 方法里面,应该写一些检查事务一致性的逻辑。阿里云 RocketMQ 发送事务消息时需要实现 local_execute 接口,用来处理 Broker 主动发起的本地事务状态回查请求;因此在事务消息的 Check 方法中,需要完成两件事情:

    1. 检查该半事务消息对应的本地事务的状态(committed or rollback)。
    2. 向 Broker 提交该半事务消息本地事务的状态。

消费事务消息

消费事务消息的实例代码如下。

# -*- coding: utf-8 -*-

from rocketmq.client import PushConsumer, ConsumeStatus
import time

# 消费消息时请设置您在阿里云 RocketMQ 控制台上申请的 Topic
topic = 'XXXXXX'
# 您在阿里云 RocketMQ 控制台上申请的 GID
gid = 'GID_XXXXX'
# 设置 TCP 协议接入点,从阿里云 RocketMQ 控制台的实例详情页面获取
name_srv = 'http://XXXX.aliyuncs.com:80'
# 您在阿里云账号管理控制台中创建的 AccessKeyId,用于身份认证
ak = 'AK'
# 您在阿里云账号管理控制台中创建的 AccessKeySecret,用于身份认证
sk = 'SK'
# 用户渠道,默认值为:ALIYUN
channel = 'ALIYUN'


def callback(msg):
    print(msg.id, msg.body)
    # 消费成功回复 CONSUME_SUCCESS,消费失败回复 RECONSUME_LATER。此时会触发消费重试
    return ConsumeStatus.CONSUME_SUCCESS


def start_consume_message():
    consumer = PushConsumer(gid)
    consumer.set_name_server_address(name_srv)
    consumer.set_session_credentials(ak, sk, channel)
    consumer.subscribe(topic, callback)
    # ********************************************
    # 1. 确保订阅关系的设置在启动之前完成
    # 2. 确保相同 GID 下面的消费者的订阅关系一致
    # *********************************************
    print ('start consume message')
    consumer.start()

    # 请保持消费者一直处于运行状态
    while True:
        time.sleep(3600)


if __name__ == '__main__':
    start_consume_message()