全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 钉钉智能硬件
消息队列 MQ

订阅消息

更新时间:2017-12-15 20:40:00

本文介绍如何通过 MQ SDK 使用.NET 语言进行消息订阅。

说明:

订阅方式

MQ 支持以下两种订阅方式:

  • 集群订阅:同一个 Consumer ID 所标识的所有 Consumer 平均分摊消费消息。例如某个 Topic 有 9 条消息,一个 Consumer ID 有 3 个 Consumer 实例,那么在集群消费模式下每个实例平均分摊,只消费其中的 3 条消息。

     // 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)
     factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
    
  • 广播订阅:同一个 Consumer ID 所标识的所有 Consumer 都会各自消费某条消息一次。例如某个 Topic 有 9 条消息,一个 Consumer ID 有 3 个 Consumer 实例,那么在广播消费模式下每个实例都会各自消费 9 条消息。

     // 广播订阅方式设置
     factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);
    

示例代码

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.InteropServices;
using ons;

namespace ons
{

    //pushConsumer 拉取到消息后,会主动调用该实例的 consumer 函数
    public class MyMsgListener : MessageListener
    {    
        public MyMsgListener()
        {
        }

        ~MyMsgListener()
        {
        }

        public override  Action  consume(Message value, ConsumeContext context)
        {
           // Message 包含了消费到的消息,通过 getBody 接口可以拿到消息体
           Console.WriteLine(value.getBody());
            /*
                   所有中文编码相关问题都在 SDK 压缩包包含的文档里做了说明,请仔细阅读
           */
            return ons.Action.CommitMessage;
        }
    }

    class onscsharp
    {
           static void Main(string[] args)
        {
            //pushConsumer 创建和工作需要的参数,必须输入
            ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
            factoryInfo.setFactoryProperty(factoryInfo.ConsumerId, "XXX");//您在 MQ 控制台申请的 Consumer ID
            factoryInfo.setFactoryProperty(factoryInfo.PublishTopics, "XXX");//您在 MQ 控制台申请的 Topic
            factoryInfo.setFactoryProperty(factoryInfo.AccessKey,"xx");//AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
            factoryInfo.setFactoryProperty(factoryInfo.SecretKey, "xxxx");//SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
            // 集群订阅方式 (默认)
            // factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
            // 广播订阅方式
            // factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);

            //创建 consumer 用于消费消息
            ONSFactory onsfactory = new ONSFactory();
            PushConsumer pConsumer = onsfactory.getInstance().createPushConsumer(factoryInfo);

            //给 consumer 注册消费回调,一旦有消息就会触发回调函数
            MessageListener msgListener = new MyMsgListener();
            pConsumer.subscribe(factoryInfo.getPublishTopics(), "*",  msgListener);

            //启动 consumer
            pConsumer.start();   
            //consumer 启动后,会异步拉取消息,拉取到消息后,回调 MyMsgListener 实例的 consumer 函数,将消息体通过参数传递给 consumer     

            // 这里可以继续做业务相关的逻辑处理,确定消费完成后,调用 shutdown 函数,释放资源。应用退出的时候也需要调用 shutdown 函数。
            pConsumer.shutdown();
        }
    }
}
本文导读目录