全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 智能硬件
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 更多
消息队列 MQ

订阅消息

更新时间:2018-06-19 15:39:57

本文介绍如何通过 MQ SDK 使用.NET 语言进行消息订阅。

说明:

订阅方式

MQ 支持以下两种订阅方式:

  • 集群订阅:同一个 Consumer ID 所标识的所有 Consumer 平均分摊消费消息。 例如某个 Topic 有 9 条消息,一个 Consumer ID 有 3 个 Consumer 实例,那么在集群消费模式下每个实例平均分摊,只消费其中的 3 条消息。

    1. // 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)
    2. factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
  • 广播订阅:同一个 Consumer ID 所标识的所有 Consumer 都会各自消费某条消息一次。 例如某个 Topic 有 9 条消息,一个 Consumer ID 有 3 个 Consumer 实例,那么在广播消费模式下每个实例都会各自消费 9 条消息。

    1. // 广播订阅方式设置
    2. factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);

示例代码

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Runtime.InteropServices;
  6. using ons;
  7. namespace ons
  8. {
  9. //pushConsumer 拉取到消息后,会主动调用该实例的 consumer 函数
  10. public class MyMsgListener : MessageListener
  11. {
  12. public MyMsgListener()
  13. {
  14. }
  15. ~MyMsgListener()
  16. {
  17. }
  18. public override Action consume(Message value, ConsumeContext context)
  19. {
  20. // Message 包含了消费到的消息,通过 getBody 接口可以拿到消息体
  21. Console.WriteLine(value.getBody());
  22. /*
  23. 所有中文编码相关问题都在 SDK 压缩包包含的文档里做了说明,请仔细阅读
  24. */
  25. return ons.Action.CommitMessage;
  26. }
  27. }
  28. class onscsharp
  29. {
  30. static void Main(string[] args)
  31. {
  32. //pushConsumer 创建和工作需要的参数,必须输入
  33. ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
  34. factoryInfo.setFactoryProperty(factoryInfo.ConsumerId, "XXX");//您在 MQ 控制台创建的 Consumer ID
  35. factoryInfo.setFactoryProperty(factoryInfo.PublishTopics, "XXX");//您在 MQ 控制台创建的 Topic
  36. factoryInfo.setFactoryProperty(factoryInfo.AccessKey,"xx");//AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  37. factoryInfo.setFactoryProperty(factoryInfo.SecretKey, "xxxx");//SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  38. // 集群订阅方式 (默认)
  39. // factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
  40. // 广播订阅方式
  41. // factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);
  42. //创建 consumer 用于消费消息
  43. ONSFactory onsfactory = new ONSFactory();
  44. PushConsumer pConsumer = onsfactory.getInstance().createPushConsumer(factoryInfo);
  45. //给 consumer 注册消费回调,一旦有消息就会触发回调函数
  46. MessageListener msgListener = new MyMsgListener();
  47. pConsumer.subscribe(factoryInfo.getPublishTopics(), "*", msgListener);
  48. //启动 consumer
  49. pConsumer.start();
  50. //consumer 启动后,会异步拉取消息。拉取到消息后,回调 MyMsgListener 实例的 consumer 函数,将消息体通过参数传递给 consumer
  51. // 这里可以继续做业务相关的逻辑处理,确定消费完成后,调用 shutdown 函数,释放资源。 应用退出的时候也需要调用 shutdown 函数。
  52. pConsumer.shutdown();
  53. }
  54. }
  55. }
本文导读目录