订阅消息

本文介绍如何通过云消息队列 RocketMQ 版的SDK使用.NET语言进行消息订阅。

说明

请确保同一个Group ID下所有Consumer实例的订阅关系保持一致。更多信息,请参见订阅关系一致

云消息队列 RocketMQ 版支持以下两种订阅方式:

  • 集群订阅

    同一个Group ID所标识的所有Consumer平均分摊消费消息。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在集群消费模式下每个实例平均分摊,只消费其中的3条消息。

        // 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)
        factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);              
  • 广播订阅

    同一个Group ID所标识的所有Consumer都会各自消费某条消息一次。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在广播消费模式下每个实例都会各自消费9条消息。

        // 广播订阅方式设置
        factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);                

示例代码

using System;
using System.Threading;
using System.Text;
using ons;

// 从Broker拉取消息时要执行的回调函数。
public class MyMsgListener : MessageListener
{
    public MyMsgListener()
    {
    }

    ~MyMsgListener()
    {
    }

    public override ons.Action consume(Message value, ConsumeContext context)
    {
        Byte[] text = Encoding.Default.GetBytes(value.getBody());
        Console.WriteLine(Encoding.UTF8.GetString(text));
        return ons.Action.CommitMessage;
    }
}

public class ConsumerExampleForEx
{
    public ConsumerExampleForEx()
    {
    }

    static void Main(string[] args) {
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        //请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
        //AccessKey ID,阿里云身份验证标识。
        factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
	      //AccessKey Secret,阿里云身份验证密钥。
        factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // 您在消息队列RocketMQ版控制台创建的Group ID。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
        // 您在消息队列RocketMQ版控制台创建的Topic。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // 设置日志路径。
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
        // 集群消费。
        // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.CLUSTERING);
        // 广播消费。
        // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.BROADCASTING);

        // 创建消费者实例。
        PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);

        // 订阅Topic。
        consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());

        // 启动客户端实例。
        consumer.start();

        // 该设置仅供Demo使用,实际生产环境中请保证进程不退出。
        Thread.Sleep(300000);

        // 在进程即将退出时,关闭消费者实例。
        consumer.shutdown();
    }
}