顺序消息(FIFO 消息)是云消息队列 RocketMQ 版提供的一种严格按照顺序来发布和消费的消息类型。本文提供使用TCP协议下的.NET SDK收发顺序消息的示例代码。
顺序消息分类
顺序消息分为两类:
全局顺序:对于指定的一个Topic,所有消息按照严格的先入先出FIFO(First In First Out)的顺序进行发布和消费。
分区顺序:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Key是完全不同的概念。
更多信息,请参见顺序消息。
前提条件
下载.NET SDK。更多信息,请参见版本说明。
环境准备。更多信息,请参见环境准备。
创建资源。代码中涉及的资源信息,例如实例、Topic和Group ID等,需要在控制台上提前创建。更多信息,请参见创建资源。
获取阿里云访问密钥AccessKey ID和AccessKey Secret。更多信息,请参见创建AccessKey。
发送顺序消息
重要
云消息队列 RocketMQ 版服务端判定消息产生的顺序性是参照单一生产者、单一线程并发下消息发送的时序。如果发送方有多个生产者或者有多个线程并发发送消息,则此时只能以到达云消息队列 RocketMQ 版服务端的时序作为消息顺序的依据,和业务侧的发送顺序未必一致。
具体的示例代码,请以云消息队列 RocketMQ 版代码库为准。
发送顺序消息的示例代码如下。
using System;
using ons;
public class OrderProducerExampleForEx
{
public OrderProducerExampleForEx()
{
}
static void Main(string[] args) {
// 配置您的账号,以下设置均可从控制台获取。
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
//请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
//AccessKey ID,阿里云身份验证标识。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
//AccessKey Secret,阿里云身份验证密钥。
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// 您在消息队列RocketMQ版控制台创建的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
// 您在消息队列RocketMQ版控制台创建的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
// 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
// 设置日志路径。
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// 创建生产者实例。
// 说明:生产者实例是线程安全的,可用于发送不同Topic的消息。基本上,您每一个线程只需要一个生产者实例。
OrderProducer producer = ONSFactory.getInstance().createOrderProducer(factoryInfo);
// 启动客户端实例。
producer.start();
// 创建消息对象。
Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
string shardingKey = "App-Test";
for (int i = 0; i < 32; i++) {
try
{
SendResultONS sendResult = producer.send(msg, shardingKey);
Console.WriteLine("send success {0}", sendResult.getMessageId());
}
catch (Exception ex)
{
Console.WriteLine("send failure{0}", ex.ToString());
}
}
// 在您的线程即将退出时,关闭生产者实例。
producer.shutdown();
}
}
订阅顺序消息
订阅顺序消息的示例代码如下。
using System;
using System.Text;
using System.Threading;
using ons;
namespace demo
{
public class MyMsgOrderListener : MessageOrderListener
{
public MyMsgOrderListener()
{
}
~MyMsgOrderListener()
{
}
public override ons.OrderAction consume(Message value, ConsumeOrderContext context)
{
Byte[] text = Encoding.Default.GetBytes(value.getBody());
Console.WriteLine(Encoding.UTF8.GetString(text));
return ons.OrderAction.Success;
}
}
class OrderConsumerExampleForEx
{
static void Main(string[] args)
{
// 配置您的账号,以下设置均可从控制台获取。
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
// AccessKey ID,阿里云身份验证标识。
factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
// AccessKey Secret,阿里云身份验证密钥。
factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
// 您在消息队列RocketMQ版控制台创建的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
// 您在消息队列RocketMQ版控制台创建的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
// 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
// 设置日志路径。
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// 创建消费者实例。
OrderConsumer consumer = ONSFactory.getInstance().createOrderConsumer(factoryInfo);
// 订阅Topic。
consumer.subscribe(factoryInfo.getPublishTopics(), "*",new MyMsgOrderListener());
// 启动消费者实例。
consumer.start();
// 让主线程睡眠一段时间。
Thread.Sleep(30000);
// 不再使用时,关闭消费者实例。
consumer.shutdown();
}
}
}
文档内容是否对您有帮助?