文档

示例代码

更新时间:

云消息队列 RocketMQ 版5.x版本实例支持C++ 1.x SDK客户端接入,您可以使用1.x版本的SDK接入5.x实例进行消息收发。本文为您介绍1.x版本下的C++ SDK消息收发示例代码。

重要
  • 推荐您使用最新的RocketMQ 5.x系列SDK,5.x系列SDK作为主力研发版本,和云消息队列 RocketMQ 版5.x服务端完全兼容,提供了更全面的功能并支持更多增强特性。更多信息,请参见5.x系列SDK
  • RocketMQ 4.x/3.x系列SDK和ONS系列SDK后续仅做功能维护,建议仅存量业务使用。

普通消息收发示例

发送普通消息

#include "ONSFactory.h"
#include "ONSClientException.h"

using namespace ons;

int main()
{

    // 创建Producer,并配置发送消息所必需的信息。
    ONSFactoryProperty factoryInfo;  
    // 设置为您在消息队列RocketMQ版控制台创建的Group ID。 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
    // 设置为您从消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
    // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT"); 
    // 您在消息队列RocketMQ版控制台创建的Topic。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
    // 消息内容。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
    /**
    * 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
    * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
    * 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
    * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
    */  
    // 实例用户名和密码在消息队列RocketMQ版控制台访问控制的智能身份识别页签中获取。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
    // 注意!!!访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。

    // 创建Producer;
    Producer *pProducer = ONSFactory::getInstance()->createProducer(factoryInfo);

    // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
    pProducer->start();

    Message msg(
            // Message Topic。
            factoryInfo.getPublishTopics(),
            // Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。       
            "TagA",
            // Message Body,不能为空,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
            factoryInfo.getMessageContent()
    );

    // 设置代表消息的业务关键属性,请尽可能全局唯一。
    // 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发。
    // 注意:不设置也不会影响消息正常收发。
    msg.setKey("ORDERID_100");

    // 发送消息,只要不抛出异常,就代表发送成功。     
    try
    {
        SendResultONS sendResult = pProducer->send(msg);
    }
    catch(ONSClientException & e)
    {
        // 自定义处理exception的细节。
    }
    // 在应用退出前,必须销毁Producer对象,否则会导致内存泄露等问题。
    pProducer->shutdown();

    return 0;
}

订阅普通消息

#include "ONSFactory.h"

#include <iostream>
#include <thread>
#include <mutex>

using namespace ons;

std::mutex console_mtx;

class ExampleMessageListener : public MessageListener {
public:
    Action consume(Message& message, ConsumeContext& context) {
        // 此处为具体的消息处理过程,确认消息被处理成功请返回CommitMessage。
        // 如果有消费异常,或者期望重新消费,可以返回ReconsumeLater,消息将会在一段时间后重新投递。
        std::lock_guard<std::mutex> lk(console_mtx);
        std::cout << "Received a message. Topic: " << message.getTopic() << ", MsgId: "
        << message.getMsgID() << std::endl;
        return CommitMessage;
    }
};

int main(int argc, char* argv[]) {
    std::cout << "=======Before consuming messages=======" << std::endl;
    ONSFactoryProperty factoryInfo;
    // 设置为您在消息队列RocketMQ版控制台创建的Group ID。从实例化的版本开始,ProducerId和CounsumerId已经统一,此处设置是为了接口保持向前兼容。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "GID_XXX");
    /**
    * 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
    * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
    * 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
    * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
    */
    // 实例用户名和密码在消息队列RocketMQ版控制台访问控制的智能身份识别页签中获取。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
    // 设置为您从消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
    // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
    // 注意!!!访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。

    PushConsumer *consumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);

    // 设置为您在消息队列RocketMQ版控制台上创建的Topic。
    const char* topic_1 = "topic-1";
    // 订阅topic-1中Tag消息属性为tag-1的所有消息。
    const char* tag_1 = "tag-1";

    const char* topic_2 = "topic-2";
    // 订阅topic-2的所有消息。
    const char* tag_2 = "*";


    // 请注册自定义侦听函数用来处理接收到的消息,并返回响应的处理结果。
    ExampleMessageListener * message_listener = new ExampleMessageListener();
    consumer->subscribe(topic_1, tag_1, message_listener);
    consumer->subscribe(topic_2, tag_2, message_listener);

    // 准备工作完成,必须调用启动函数,才可以正常工作。
    consumer->start();

    // 请保持线程常驻,不要执行shutdown操作。
    std::this_thread::sleep_for(std::chrono::milliseconds(60 * 1000));
    consumer->shutdown();
    delete message_listener;
    std::cout << "=======After consuming messages======" << std::endl;
    return 0;
}

顺序消息收发示例

发送顺序消息

#include "ONSFactory.h"
#include "ONSClientException.h"
#include <iostream>
using namespace ons;

int main()
{
    // Producer创建和正常工作的参数,必须输入。
    ONSFactoryProperty factoryInfo;
    // 设置为您在消息队列RocketMQ版控制台创建的Group ID。 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
    // 设置为您从消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
    // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT"); 
    // 您在消息队列RocketMQ版控制台创建的Topic。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
    // 消息内容。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
    /**
    * 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
    * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
    * 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
    * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
    */  
    // 实例用户名和密码在消息队列RocketMQ版控制台访问控制的智能身份识别页签中获取。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
    // 注意!!!访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。
    
    // 创建Producer。
    OrderProducer *pProducer = ONSFactory::getInstance()->createOrderProducer(factoryInfo);


    // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
    pProducer->start();

    Message msg(
                // Message Topic。
                factoryInfo.getPublishTopics(),
                // Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。
                "TagA",
                // Message Body,任何二进制形式的数据,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
                factoryInfo.getMessageContent()
    );

    // 设置代表消息的业务属性,请尽可能全局唯一。
    // 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发。
    // 注意:不设置也不会影响消息正常收发。
    msg.setKey("ORDERID_100");
    // 分区顺序消息中区分不同分区的关键字段。
    // 如果是全局顺序消息,该字段可以设置为任意非空字符串。
    std::string shardingKey = "abc";  
    // 带有同一Sharding Key的消息会按照顺序发送。
    try
    {
        // 发送消息,只要不抛异常就是成功。
        SendResultONS sendResult = pProducer->send(msg, shardingKey);
    std::cout << "send success" << std::endl;
    }
    catch(ONSClientException & e)
    {
        // 添加对exception的处理。
    }
    // 在应用退出前,必须销毁Producer对象,否则会导致内存泄露等问题。
    pProducer->shutdown();

    return 0;
}           

订阅顺序消息

#include "ONSFactory.h"
using namespace std;
using namespace ons;

// 创建消费消息的实例。
// pushConsumer拉取到消息后,会主动调用该实例的consumeMessage函数。
class ONSCLIENT_API MyMsgListener : public MessageOrderListener
{
public:
    MyMsgListener()
    {
    }
    virtual ~MyMsgListener()
    {
    }

    virtual OrderAction consume(Message &message, ConsumeOrderContext &context)
    {
        // 根据业务需求,消费消息。
        return Success; //CONSUME_SUCCESS;
    }
};


int main(int argc, char* argv[])
{
    // OrderConsumer创建和工作需要的参数,必须输入。
    ONSFactoryProperty factoryInfo;
    // 设置为您在消息队列RocketMQ版控制台创建的Group ID。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");
    // 设置为您在消息队列RocketMQ版控制台上创建的Topic。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
    /**
    * 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
    * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
    * 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
    * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
    */  
    // 实例用户名和密码在消息队列RocketMQ版控制台访问控制的智能身份识别页签中获取。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
    // 设置为您从消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
    // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
    // 注意!!!访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。

    // 创建orderConsumer。
    OrderConsumer* orderConsumer = ONSFactory::getInstance()->createOrderConsumer(factoryInfo);
    MyMsgListener  msglistener;
    // 指定orderConsumer订阅的消息Topic和消息Tag。
    orderConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );

    // 注册消息监听的处理实例,orderConsumer拉取到消息后,会调用该类的consumeMessage函数。

    // 启动orderConsumer。
    orderConsumer->start();

    for(volatile int i = 0; i < 10; ++i) {
        // wait
    }

    // 销毁orderConsumer, 在应用退出前,必须销毁Consumer对象,否则会导致内存泄露等问题。
    orderConsumer->shutdown();

   return 0;
}        

定时/延时消息收发示例

发送定时/延时消息

#include "ONSFactory.h"
#include "ONSClientException.h"

#include <windows.h>
using namespace ons;
int main()
{

    // 创建Producer,并配置发送消息所必需的信息。
    ONSFactoryProperty factoryInfo;
    // 设置为您在消息队列RocketMQ版控制台创建的Group ID。 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
    // 设置为您从消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
    // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT"); 
    // 您在消息队列RocketMQ版控制台创建的Topic。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
    // 消息内容。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
    /**
    * 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
    * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
    * 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
    * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
    */  
    // 实例用户名和密码在消息队列RocketMQ版控制台访问控制的智能身份识别页签中获取。
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
    // 注意!!!访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。

    // 创建Producer。
    Producer *pProducer = ONSFactory::getInstance()->createProducer(factoryInfo);

    // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
    pProducer->start();

    Message msg(
            // Message Topic。
            factoryInfo.getPublishTopics(),
            // Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤       
            "TagA",
            // Message Body,不能为空,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
            factoryInfo.getMessageContent()
    );

    // 设置代表消息的业务关键属性,请尽可能全局唯一。
    // 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发。
    // 注意:不设置也不会影响消息正常收发。
    msg.setKey("ORDERID_100");

    // deliver time单位ms,指定一个时刻,在这个时刻之后消息才能被消费,这个例子表示3s后才能被消费。
    long deliverTime = GetTickCount64() + 3000;
    msg.setStartDeliverTime(deliverTime);

    // 发送消息,只要不抛出异常,就代表发送成功。     
    try
    {
        SendResultONS sendResult = pProducer->send(msg);
    }
    catch(ONSClientException & e)
    {
        // 自定义处理exception的细节。
    }

    // 在应用退出前,必须销毁Producer对象,否则会导致内存泄露等问题。
    pProducer->shutdown();

    return 0;
}
            

订阅定时/延时消息

订阅定时/延时消息的示例代码和订阅普通消息一样,请参见订阅普通消息

事务消息收发示例

发送事务消息

  1. 发送半事务消息(Half Message)及执行本地事务,示例代码如下。

    #include "ONSFactory.h"
    #include "ONSClientException.h"
    using namespace ons;
    
        class MyLocalTransactionExecuter : LocalTransactionExecuter
        {
            MyLocalTransactionExecuter()
            {
            }
    
            ~MyLocalTransactionExecuter()
            {
            }
            virtual TransactionStatus execute(Message &value)
            {
                    // 消息ID(有可能消息体一样,但消息ID不一样,当前消息ID在消息队列RocketMQ版控制台无法查询。)
                    string msgId = value.getMsgID();
                    // 消息体内容进行crc32,也可以使用其它的如MD5。
                    // 消息ID和crc32id主要是用来防止消息重复。
                    // 如果业务本身是幂等的,可以忽略,否则需要利用msgId或crc32Id来做幂等。
                    // 如果要求消息绝对不重复,推荐做法是对消息体body使用crc32或MD5来防止重复消息。
                    TransactionStatus transactionStatus = Unknow;
                    try {
                        boolean isCommit = 本地事务执行结果;
                        if (isCommit) {
                            // 本地事务成功、提交消息。
                            transactionStatus = CommitTransaction;
                        } else {
                            // 本地事务失败、回滚消息。
                            transactionStatus = RollbackTransaction;
                        }
                    } catch (...) {
                        //exception handle
                    }
                    return transactionStatus;
            }
        }
    
        int main(int argc, char* argv[])
        {
            //创建Producer和发送消息所必需的信息。
            ONSFactoryProperty factoryInfo;
            // 设置为您在消息队列RocketMQ版控制台创建的Group ID。 
            factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
            // 设置为您从消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
            // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT"); 
            // 您在消息队列RocketMQ版控制台创建的Topic。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );
            // 消息内容。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
            /**
            * 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
            * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
            * 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
            * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
            */  
            // 实例用户名和密码在消息队列RocketMQ版控制台访问控制的智能身份识别页签中获取。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
            factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
            // 注意!!!访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。
    
            // 创建producer,消息队列RocketMQ版不负责pChecker的释放,需要业务方自行释放资源。
            MyLocalTransactionChecker *pChecker = new MyLocalTransactionChecker();
            g_producer = ONSFactory::getInstance()->createTransactionProducer(factoryInfo,pChecker);
    
            // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
            pProducer->start();
    
            Message msg(
                // Message Topic。
                factoryInfo.getPublishTopics(),
                // Message Tag,可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列RocketMQ版的服务器过滤。       
                "TagA",
                // Message Body,不能为空,消息队列RocketMQ版不做任何干预,需要Producer与Consumer协商好一致的序列化和反序列化方式。
                factoryInfo.getMessageContent()
            );
    
            // 设置代表消息的业务关键属性,请尽可能全局唯一。
            // 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发。
            // 注意:不设置也不会影响消息正常收发。
            msg.setKey("ORDERID_100");
    
            // 发送消息,只要不抛出异常,就代表发送成功。   
            try
            {
                //消息队列RocketMQ版不负责pExecuter的释放,需要业务方自行释放资源。
                MyLocalTransactionExecuter pExecuter = new MyLocalTransactionExecuter();
                SendResultONS sendResult = pProducer->send(msg,pExecuter);
            }
            catch(ONSClientException & e)
            {
                // 自定义处理exception的细节。
            }
            // 在应用退出前,必须销毁Producer对象,否则会导致内存泄露等问题。
            pProducer->shutdown();
    
            return 0;
    
        }        
  2. 提交事务消息状态,示例代码如下。

     class MyLocalTransactionChecker : LocalTransactionChecker
     {
         MyLocalTransactionChecker()
         {
         }
    
         ~MyLocalTransactionChecker()
         {
         }
    
         virtual TransactionStatus check(Message &value)
         {
             // 消息ID(有可能消息体一样,但消息ID不一样,当前消息ID在消息队列RocketMQ版控制台无法查询。)
             string msgId = value.getMsgID();
             // 消息体内容进行crc32,也可以使用其它的如MD5。
             // 消息ID和crc32id主要是用来防止消息重复。
             // 如果业务本身是幂等的,可以忽略,否则需要利用msgId或crc32Id来做幂等。
             // 如果要求消息绝对不重复,推荐做法是对消息体body使用crc32或MD5来防止重复消息。
             TransactionStatus transactionStatus = Unknow;
             try {
                 boolean isCommit = 本地事务执行结果;
                 if (isCommit) {
                     // 本地事务成功、提交消息。
                     transactionStatus = CommitTransaction;
                 } else {
                     // 本地事务失败、回滚消息。
                     transactionStatus = RollbackTransaction;
                 }
             } catch(...) {
                 //exception error
             }
             return transactionStatus;
         }
     }               

订阅事务消息

订阅事务消息的示例代码和订阅普通消息一样,请参见订阅普通消息