示例代码
更新时间:
云消息队列 RocketMQ 版5.x版本实例可兼容.NET 1.x/2.x SDK客户端接入,您可以使用1.x/2.x版本的SDK接入5.x实例进行消息收发。本文为您介绍1.x/2.x版本下的.NET SDK消息收发示例代码。
重要
- 推荐您使用最新的RocketMQ 5.x系列SDK,5.x系列SDK作为主力研发版本,和云消息队列 RocketMQ 版5.x服务端完全兼容,提供了更全面的功能并支持更多增强特性。更多信息,请参见5.x系列SDK。
- RocketMQ 4.x/3.x系列SDK和ONS系列SDK后续仅做功能维护,建议仅存量业务使用。
普通消息收发示例
发送普通消息
using System;
using ons;
public class ProducerExampleForEx
{
public ProducerExampleForEx()
{
}
static void Main(string[] args) {
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
/**
* 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
* 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
* 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
* 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
*/
// 实例用户名和密码在消息队列RocketMQ版控制台访问控制的智能身份识别页签中获取。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// 注意!!!访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。
// 设置为您在消息队列RocketMQ版控制台创建的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
// 您在消息队列RocketMQ版控制台创建的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
// 设置为您从消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
// 设置日志路径。
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// 创建生产者实例。
// 说明:生产者实例是线程安全的,可用于发送不同Topic的消息。基本上,您每一个线程只需要一个生产者实例。
Producer producer = ONSFactory.getInstance().createProducer(factoryInfo);
// 启动客户端实例。
producer.start();
// 创建消息对象。
Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
msg.setKey(Guid.NewGuid().ToString());
for (int i = 0; i < 32; i++) {
try
{
SendResultONS sendResult = producer.send(msg);
Console.WriteLine("send success {0}", sendResult.getMessageId());
}
catch (Exception ex)
{
Console.WriteLine("send failure{0}", ex.ToString());
}
}
// 在您的线程即将退出时,关闭生产者实例。
producer.shutdown();
}
}
订阅普通消息
using System;
using System.Threading;
using System.Text;
using ons;
// 从Broker拉取消息时要执行的回调函数。
public class MyMsgListener : MessageListener
{
public MyMsgListener()
{
}
~MyMsgListener()
{
}
public override ons.Action consume(Message value, ConsumeContext context)
{
Byte[] text = Encoding.Default.GetBytes(value.getBody());
Console.WriteLine(Encoding.UTF8.GetString(text));
return ons.Action.CommitMessage;
}
}
public class ConsumerExampleForEx
{
public ConsumerExampleForEx()
{
}
static void Main(string[] args) {
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
/**
* 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
* 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
* 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
* 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
*/
// 实例用户名和密码在消息队列RocketMQ版控制台访问控制的智能身份识别页签中获取。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// 注意!!!访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。
// 您在消息队列RocketMQ版控制台创建的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
// 您在消息队列RocketMQ版控制台创建的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
// 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
// 设置日志路径。
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// 集群消费。
// factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.CLUSTERING);
// 广播消费。
// factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.BROADCASTING);
// 创建消费者实例。
PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);
// 订阅Topic。
consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());
// 启动客户端实例。
consumer.start();
// 该设置仅供Demo使用,实际生产环境中请保证进程不退出。
Thread.Sleep(300000);
// 在进程即将退出时,关闭消费者实例。
consumer.shutdown();
}
}
顺序消息收发示例
发送顺序消息
using System;
using ons;
public class OrderProducerExampleForEx
{
public OrderProducerExampleForEx()
{
}
static void Main(string[] args) {
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
/**
* 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
* 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
* 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
* 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
*/
// 实例用户名和密码在消息队列RocketMQ版控制台访问控制的智能身份识别页签中获取。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// 注意!!!访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。
// 设置为您在消息队列RocketMQ版控制台创建的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
// 您在消息队列RocketMQ版控制台创建的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
// 设置为您从消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
// 设置日志路径。
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// 创建生产者实例。
// 说明:生产者实例是线程安全的,可用于发送不同Topic的消息。基本上,您每一个线程只需要一个生产者实例。
OrderProducer producer = ONSFactory.getInstance().createOrderProducer(factoryInfo);
// 启动客户端实例。
producer.start();
// 创建消息对象。
Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
string shardingKey = "App-Test";
for (int i = 0; i < 32; i++) {
try
{
SendResultONS sendResult = producer.send(msg, shardingKey);
Console.WriteLine("send success {0}", sendResult.getMessageId());
}
catch (Exception ex)
{
Console.WriteLine("send failure{0}", ex.ToString());
}
}
// 在您的线程即将退出时,关闭生产者实例。
producer.shutdown();
}
}
订阅顺序消息
using System;
using System.Text;
using System.Threading;
using ons;
namespace demo
{
public class MyMsgOrderListener : MessageOrderListener
{
public MyMsgOrderListener()
{
}
~MyMsgOrderListener()
{
}
public override ons.OrderAction consume(Message value, ConsumeOrderContext context)
{
Byte[] text = Encoding.Default.GetBytes(value.getBody());
Console.WriteLine(Encoding.UTF8.GetString(text));
return ons.OrderAction.Success;
}
}
class OrderConsumerExampleForEx
{
static void Main(string[] args)
{
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
/**
* 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
* 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
* 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
* 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
*/
// 实例用户名和密码在消息队列RocketMQ版控制台访问控制的智能身份识别页签中获取。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// 注意!!!访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。
// 设置为您在消息队列RocketMQ版控制台创建的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
// 您在消息队列RocketMQ版控制台创建的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
// 设置为您从消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
// 设置日志路径。
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// 创建消费者实例。
OrderConsumer consumer = ONSFactory.getInstance().createOrderConsumer(factoryInfo);
// 订阅Topic。
consumer.subscribe(factoryInfo.getPublishTopics(), "*",new MyMsgOrderListener());
// 启动消费者实例。
consumer.start();
// 让主线程睡眠一段时间。
Thread.Sleep(30000);
// 不再使用时,关闭消费者实例。
consumer.shutdown();
}
}
}
定时/延时消息收发示例
发送定时/延时消息
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.InteropServices;
using ons;
namespace ons
{
class onscsharp
{
static void Main(string[] args)
{
// Producer创建和正常工作的参数,必须输入。
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
// 设置为您在消息队列RocketMQ版控制台创建的Group ID。
factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "XXX ");
// 设置为您从消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT");
// 您在消息队列RocketMQ版控制台创建的Topic。
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "XXX");
//消息内容。
factoryInfo.setFactoryProperty(ONSFactoryProperty.MsgContent, "XXX");
/**
* 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。
* 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。
* 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。
* 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。
*/
// 实例用户名和密码在消息队列RocketMQ版控制台访问控制的智能身份识别页签中获取。
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME");
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" );
// 注意!!!访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。
// 创建Producer。
Producer pProducer = ONSFactory.getInstance().createProducer(factoryInfo);
// 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
pProducer.start();
Message msg = new Message(
// 消息主题。
factoryInfo.getPublishTopics(),
// 消息标签。
"TagA",
// 消息主体。
factoryInfo.getMessageContent()
);
// 设置代表消息的业务关键属性,请尽可能全局唯一。
// 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发。
// 注意:不设置也不会影响消息正常收发。
msg.setKey("ORDERID_100");
// deliver time,单位:ms。指定一个时刻,在这个时刻之后消息才能被消费,这个例子表示3s后才能被消费。
long deliverTime = System.currentTimeMillis() + 3000;
msg.setStartDeliverTime(deliverTime);
// 发送消息,只要不抛出异常,就代表发送成功。
try
{
SendResultONS sendResult = pProducer.send(msg);
}
catch(ONSClientException e)
{
// 发送失败处理。
}
// 在应用退出前,必须销毁Producer对象,否则会导致内存泄露等问题。
pProducer.shutdown();
}
}
}
订阅定时/延时消息
订阅定时/延时消息的示例代码和订阅普通消息一样,请参见订阅普通消息。
事务消息收发示例
发送事务消息
发送半事务消息(Half Message)及执行本地事务,示例代码如下。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Runtime.InteropServices; using ons; namespace ons { public class MyLocalTransactionExecuter : LocalTransactionExecuter { public MyLocalTransactionExecuter() { } ~MyLocalTransactionExecuter() { } public override TransactionStatus execute(Message value) { Console.WriteLine("execute topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}", value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperty("VincentNoUser")); // 消息ID(有可能消息体一样,但消息ID不一样。当前消息ID在消息队列RocketMQ版控制台无法查询)。 string msgId = value.getMsgID(); // 消息体内容进行crc32, 也可以使用其它的如MD5。 // 消息ID和crc32id主要是用来防止消息重复。 // 如果要求消息绝对不重复,推荐做法是对消息体body使用crc32或MD5来防止重复消息。 TransactionStatus transactionStatus = TransactionStatus.Unknow; try { boolean isCommit = 本地事务执行结果; if (isCommit) { // 本地事务成功则提交消息。 transactionStatus = TransactionStatus.CommitTransaction; } else { // 本地事务失败则回滚消息。 transactionStatus = TransactionStatus.RollbackTransaction; } } catch (Exception e) { // 处理异常。 } return transactionStatus; } } class onscsharp { static void Main(string[] args) { ONSFactoryProperty factoryInfo = new ONSFactoryProperty(); // 设置为您从消息队列RocketMQ版控制台获取的接入点信息,类似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。 // 注意!!!直接填写控制台提供的域名和端口即可,请勿添加http://或https://前缀标识,也不要用IP解析地址。 factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "ACCESS POINT"); // 您在消息队列RocketMQ版控制台创建的Topic。 factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, ""); // 消息内容。 factoryInfo.setFactoryProperty(ONSFactoryProperty.MsgContent, ""); /** * 如果是使用公网接入点访问,则必须设置AccessKey和SecretKey,里面填写实例的用户名和密码。实例用户名和密码在控制台访问控制的智能身份识别页签中获取。 * 注意!!!这里填写的不是阿里云账号的AccessKey ID和AccessKey Secret,请务必区分开。 * 如果是在阿里云ECS内网访问,则无需配置,服务端会根据内网VPC信息智能获取。 * 如果实例类型为Serverless实例,公网访问必须设置实例的用户名密码,当开启内网免身份识别时,内网访问可以不设置用户名和密码。 */ // 实例用户名和密码在消息队列RocketMQ版控制台访问控制的智能身份识别页签中获取。 factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "INSTANCE USER NAME"); factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "INSTANCE PASSWORD" ); // 注意!!!访问RocketMQ 5.x实例时,InstanceID属性不需要设置,否则会导致失败。 // 创建事务生产者。 LocalTransactionChecker myChecker = new MyLocalTransactionChecker(); TransactionProducer pProducer =ONSFactory.getInstance().createTransactionProducer(factoryInfo,ref myChecker); // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可,启动之后可以多线程并发发送消息。 pProducer.start(); Message msg = new Message( //Message Topic factoryInfo.getPublishTopics(), //Message Tag "TagA", //Message Body factoryInfo.getMessageContent() ); // 设置代表消息的业务关键属性,请尽可能全局唯一。 // 以方便您在无法正常收到消息情况下,可通消息队列RocketMQ版过控制台查询消息并补发。 // 注意:不设置也不会影响消息正常收发。 msg.setKey("ORDERID_100"); // 发送消息,只要不抛出异常,就代表发送成功。 try { LocalTransactionExecuter myExecuter = new MyLocalTransactionExecuter(); SendResultONS sendResult = pProducer.send(msg, ref myExecuter); } catch(ONSClientException e) { Console.WriteLine("\nexception of sendmsg:{0}",e.what() ); } // 在应用退出前,必须销毁Producer对象,否则会导致内存泄露等问题。 // shutdown之后不能重新start此Producer。 pProducer.shutdown(); } } }
提交事务消息状态,示例代码如下。
public class MyLocalTransactionChecker : LocalTransactionChecker { public MyLocalTransactionChecker() { } ~MyLocalTransactionChecker() { } public override TransactionStatus check(Message value) { Console.WriteLine("check topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}", value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperty("VincentNoUser")); // 消息ID(有可能消息体一样,但消息ID不一样。当前消息ID在消息队列RocketMQ版控制台无法查询)。 string msgId = value.getMsgID(); // 消息体内容进行crc32,也可以使用其它的如MD5。 // 消息ID和crc32id主要是用来防止消息重复。 // 如果业务本身是幂等的,可以忽略,否则需要利用msgId或crc32Id来做幂等。 // 如果要求消息绝对不重复,推荐做法是对消息体body使用crc32或MD5来防止重复消息。 TransactionStatus transactionStatus = TransactionStatus.Unknow; try { boolean isCommit = 本地事务执行结果; if (isCommit) { // 本地事务成功、提交消息。 transactionStatus = TransactionStatus.CommitTransaction; } else { // 本地事务失败、回滚消息。 transactionStatus = TransactionStatus.RollbackTransaction; } } catch (Exception e) { //exception handle } return transactionStatus; } }
订阅事务消息
订阅事务消息的示例代码和订阅普通消息一样,请参见订阅普通消息。
文档内容是否对您有帮助?