顺序消息(FIFO 消息)是消息队列 RocketMQ 版提供的一种严格按照顺序来发布和消费的消息类型。本文提供使用 TCP 协议下的 .NET SDK 收发顺序消息的示例代码供您参考。

顺序消息分为两类:

  • 全局顺序:对于指定的一个 Topic,所有消息按照严格的先入先出(First In First Out,简称 FIFO)的顺序进行发布和消费。
  • 分区顺序:对于指定的一个 Topic,所有消息根据 Sharding Key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding Key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。

详情请参见顺序消息

前提条件

发送顺序消息

说明 具体的示例代码,请以消息队列 RocketMQ 版代码库为准。

发送顺序消息的示例代码如下:

using System;
using ons;

public class OrderProducerExampleForEx
{
    public OrderProducerExampleForEx()
    {
    }

    static void Main(string[] args) {
        // 配置您的账号,以下设置均可从控制台获取
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        // AccessKeyId 阿里云身份验证,在阿里云服务器管理控制台创建
        factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
        // AccessKeySecret 阿里云身份验证,在阿里云服务器管理控制台创建
        factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
        // 您在控制台创建的 Group ID
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
        // 您在控制台创建的 Topic
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // 设置日志路径
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

        // 创建生产者实例
        // 说明:生产者实例是线程安全的,可用于发送不同 Topic 的消息。基本上,您每一个线程
        // 只需要一个生产者实例
        OrderProducer producer = ONSFactory.getInstance().createOrderProducer(factoryInfo);

        // 启动客户端实例
        producer.start();

        // 创建消息对象
        Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
        string shardingKey = "App-Test";
        for (int i = 0; i < 32; i++) {
            try
            {
                SendResultONS sendResult = producer.send(msg, shardingKey);
                Console.WriteLine("send success {0}", sendResult.getMessageId());
            }
            catch (Exception ex)
            {
                Console.WriteLine("send failure{0}", ex.ToString());
            }
        }

        // 在您的线程即将退出时,关闭生产者实例
        producer.shutdown();

    }
}

订阅顺序消息

订阅顺序消息的示例代码如下:

using System;
using System.Text;
using System.Threading;
using ons;

namespace demo
{

    public class MyMsgOrderListener : MessageOrderListener
    {
        public MyMsgOrderListener()
        {

        }

        ~MyMsgOrderListener()
        {
        }

        public override ons.OrderAction consume(Message value, ConsumeOrderContext context)
        {
            Byte[] text = Encoding.Default.GetBytes(value.getBody());
            Console.WriteLine(Encoding.UTF8.GetString(text));
            return ons.OrderAction.Success;
        }
    }

    class OrderConsumerExampleForEx
    {
        static void Main(string[] args)
        {
            // 配置您的账号,以下设置均可从控制台获取
            ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
            // AccessKeyId 阿里云身份验证,在阿里云服务器管理控制台创建
            factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
            // AccessKeySecret 阿里云身份验证,在阿里云服务器管理控制台创建
            factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
            // 您在控制台创建的 Group ID
            factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
            // 您在控制台创建的 Topic
            factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
            // 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
            factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
            // 设置日志路径
            factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

            // 创建生产者实例
            OrderConsumer consumer = ONSFactory.getInstance().createOrderConsumer(factoryInfo);

            // 订阅 Topic
            consumer.subscribe(factoryInfo.getPublishTopics(), "*",new MyMsgOrderListener());

            // 启动消费者实例
            consumer.start();

            // 让主线程睡眠一段时间
            Thread.Sleep(30000);

            // 不再使用时,关闭消费者实例
            consumer.shutdown();
        }
    }
}