本文介绍如何通过消息队列 RocketMQ 版的 SDK 使用 .NET 语言进行消息订阅。

说明 请确保同一个 Group ID 下所有 Consumer 实例的订阅关系保持一致,详见订阅关系一致

消息队列 RocketMQ 版支持以下两种订阅方式:

  • 集群订阅

    同一个 Group ID 所标识的所有 Consumer 平均分摊消费消息。例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在集群消费模式下每个实例平均分摊,只消费其中的 3 条消息。

        // 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)
        factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);              
  • 广播订阅

    同一个 Group ID 所标识的所有 Consumer 都会各自消费某条消息一次。例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在广播消费模式下每个实例都会各自消费 9 条消息。

        // 广播订阅方式设置
        factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);                

示例代码

using System;
using System.Threading;
using System.Text;
using ons;

// 从 Broker 拉取消息时要执行的回调函数
public class MyMsgListener : MessageListener
{
    public MyMsgListener()
    {
    }

    ~MyMsgListener()
    {
    }

    public override ons.Action consume(Message value, ConsumeContext context)
    {
        Byte[] text = Encoding.Default.GetBytes(value.getBody());
        Console.WriteLine(Encoding.UTF8.GetString(text));
        return ons.Action.CommitMessage;
    }
}

public class ConsumerExampleForEx
{
    public ConsumerExampleForEx()
    {
    }

    static void Main(string[] args) {
        // 配置您的账号,以下设置均可从控制台获取
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
        factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
        // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
        factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
        // 您在控制台创建的 Group ID
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
        // 您在控制台创建的 Topic
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // 设置日志路径
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
        // 集群消费
        // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.CLUSTERING);
        // 广播消费
        // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.BROADCASTING);

        // 创建消费者实例
        PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);

        // 订阅 Topics
        consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());

        // 启动客户端实例
        consumer.start();

        //该设置仅供 demo 使用,实际生产中请保证进程不退出
        Thread.Sleep(300000);

        // 在进程即将退出时,关闭消费者实例
        consumer.shutdown();
    }
}