文档

收发顺序消息

更新时间:

顺序消息(FIFO 消息)是云消息队列 RocketMQ 版提供的一种严格按照顺序来发布和消费的消息类型。本文提供使用TCP协议下的.NET SDK收发顺序消息的示例代码。

顺序消息分类

顺序消息分为两类:

  • 全局顺序:对于指定的一个Topic,所有消息按照严格的先入先出FIFO(First In First Out)的顺序进行发布和消费。

  • 分区顺序:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Key是完全不同的概念。

更多信息,请参见顺序消息

前提条件

  • 下载.NET SDK。更多信息,请参见版本说明

  • 环境准备。更多信息,请参见环境准备

  • 创建资源。代码中涉及的资源信息,例如实例、Topic和Group ID等,需要在控制台上提前创建。更多信息,请参见创建资源

  • 获取阿里云访问密钥AccessKey ID和AccessKey Secret。更多信息,请参见创建AccessKey

发送顺序消息

重要

云消息队列 RocketMQ 版服务端判定消息产生的顺序性是参照单一生产者、单一线程并发下消息发送的时序。如果发送方有多个生产者或者有多个线程并发发送消息,则此时只能以到达云消息队列 RocketMQ 版服务端的时序作为消息顺序的依据,和业务侧的发送顺序未必一致。

具体的示例代码,请以云消息队列 RocketMQ 版代码库为准。

发送顺序消息的示例代码如下。

using System;
using ons;

public class OrderProducerExampleForEx
{
    public OrderProducerExampleForEx()
    {
    }

    static void Main(string[] args) {
        // 配置您的账号,以下设置均可从控制台获取。
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        //请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
        //AccessKey ID,阿里云身份验证标识。
        factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
	      //AccessKey Secret,阿里云身份验证密钥。
        factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // 您在消息队列RocketMQ版控制台创建的Group ID。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
        // 您在消息队列RocketMQ版控制台创建的Topic。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // 设置日志路径。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

        // 创建生产者实例。
        // 说明:生产者实例是线程安全的,可用于发送不同Topic的消息。基本上,您每一个线程只需要一个生产者实例。
        OrderProducer producer = ONSFactory.getInstance().createOrderProducer(factoryInfo);

        // 启动客户端实例。
        producer.start();

        // 创建消息对象。
        Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
        string shardingKey = "App-Test";
        for (int i = 0; i < 32; i++) {
            try
            {
                SendResultONS sendResult = producer.send(msg, shardingKey);
                Console.WriteLine("send success {0}", sendResult.getMessageId());
            }
            catch (Exception ex)
            {
                Console.WriteLine("send failure{0}", ex.ToString());
            }
        }

        // 在您的线程即将退出时,关闭生产者实例。
        producer.shutdown();

    }
}

订阅顺序消息

订阅顺序消息的示例代码如下。

using System;
using System.Text;
using System.Threading;
using ons;

namespace demo
{

    public class MyMsgOrderListener : MessageOrderListener
    {
        public MyMsgOrderListener()
        {

        }

        ~MyMsgOrderListener()
        {
        }

        public override ons.OrderAction consume(Message value, ConsumeOrderContext context)
        {
            Byte[] text = Encoding.Default.GetBytes(value.getBody());
            Console.WriteLine(Encoding.UTF8.GetString(text));
            return ons.OrderAction.Success;
        }
    }

    class OrderConsumerExampleForEx
    {
        static void Main(string[] args)
        {
            // 配置您的账号,以下设置均可从控制台获取。
            ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
            // AccessKey ID,阿里云身份验证标识。
            factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
            // AccessKey Secret,阿里云身份验证密钥。
            factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
            // 您在消息队列RocketMQ版控制台创建的Group ID。
            factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
            // 您在消息队列RocketMQ版控制台创建的Topic。
            factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
            // 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
            factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
            // 设置日志路径。
            factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

            // 创建消费者实例。
            OrderConsumer consumer = ONSFactory.getInstance().createOrderConsumer(factoryInfo);

            // 订阅Topic。
            consumer.subscribe(factoryInfo.getPublishTopics(), "*",new MyMsgOrderListener());

            // 启动消费者实例。
            consumer.start();

            // 让主线程睡眠一段时间。
            Thread.Sleep(30000);

            // 不再使用时,关闭消费者实例。
            consumer.shutdown();
        }
    }
}