本文介绍如何通过消息队列RocketMQ版的SDK使用.NET语言进行消息订阅。

说明 请确保同一个Group ID下所有Consumer实例的订阅关系保持一致。更多信息,请参见订阅关系一致

消息队列RocketMQ版支持以下两种订阅方式:

  • 集群订阅

    同一个Group ID所标识的所有Consumer平均分摊消费消息。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在集群消费模式下每个实例平均分摊,只消费其中的3条消息。

        // 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)
        factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);              
  • 广播订阅

    同一个Group ID所标识的所有Consumer都会各自消费某条消息一次。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在广播消费模式下每个实例都会各自消费9条消息。

        // 广播订阅方式设置
        factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);                

示例代码

using System;
using System.Threading;
using System.Text;
using ons;

// 从Broker拉取消息时要执行的回调函数。
public class MyMsgListener : MessageListener
{
    public MyMsgListener()
    {
    }

    ~MyMsgListener()
    {
    }

    public override ons.Action consume(Message value, ConsumeContext context)
    {
        Byte[] text = Encoding.Default.GetBytes(value.getBody());
        Console.WriteLine(Encoding.UTF8.GetString(text));
        return ons.Action.CommitMessage;
    }
}

public class ConsumerExampleForEx
{
    public ConsumerExampleForEx()
    {
    }

    static void Main(string[] args) {
        // 配置您的账号,以下设置均可从控制台获取。
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        // AccessKey阿里云身份验证,在阿里云服务器管理控制台创建。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
        // SecretKey阿里云身份验证,在阿里云服务器管理控制台创建。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
        // 您在控制台创建的Group ID。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
        // 您在控制台创建的Topic。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // 设置TCP接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // 设置日志路径。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
        // 集群消费。
        // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.CLUSTERING);
        // 广播消费。
        // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.BROADCASTING);

        // 创建消费者实例。
        PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);

        // 订阅Topics。
        consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());

        // 启动客户端实例。
        consumer.start();

        //该设置仅供demo使用,实际生产中请保证进程不退出。
        Thread.Sleep(300000);

        // 在进程即将退出时,关闭消费者实例。
        consumer.shutdown();
    }
}