全部产品
云市场

订阅消息

更新时间:2019-09-13 21:59:54

本文介绍如何通过消息队列 MQ 的 SDK 使用.NET 语言进行消息订阅。

说明:请确保同一个 Group ID 下所有 Consumer 实例的订阅关系保持一致,详见订阅关系一致

订阅方式

消息队列 MQ 支持以下两种订阅方式:

  • 集群订阅:同一个 Group ID 所标识的所有 Consumer 平均分摊消费消息。 例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在集群消费模式下每个实例平均分摊,只消费其中的 3 条消息。

    1. // 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)
    2. factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
  • 广播订阅:同一个 Group ID 所标识的所有 Consumer 都会各自消费某条消息一次。 例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在广播消费模式下每个实例都会各自消费 9 条消息。

    1. // 广播订阅方式设置
    2. factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);

示例代码

  1. using System;
  2. using System.Threading;
  3. using System.Text;
  4. using ons;
  5. // 从 Broker 拉取消息时要执行的回调函数
  6. public class MyMsgListener : MessageListener
  7. {
  8. public MyMsgListener()
  9. {
  10. }
  11. ~MyMsgListener()
  12. {
  13. }
  14. public override ons.Action consume(Message value, ConsumeContext context)
  15. {
  16. Byte[] text = Encoding.Default.GetBytes(value.getBody());
  17. Console.WriteLine(Encoding.UTF8.GetString(text));
  18. return ons.Action.CommitMessage;
  19. }
  20. }
  21. public class ConsumerExampleForEx
  22. {
  23. public ConsumerExampleForEx()
  24. {
  25. }
  26. static void Main(string[] args) {
  27. // 配置您的账号,以下设置均可从控制台获取
  28. ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
  29. // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  30. factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
  31. // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  32. factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
  33. // 您在控制台创建的 Group ID
  34. factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
  35. // 您在控制台创建的 Topic
  36. factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
  37. // 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
  38. factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
  39. // 设置日志路径
  40. factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
  41. // 集群消费
  42. // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.CLUSTERING);
  43. // 广播消费
  44. // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.BROADCASTING);
  45. // 创建消费者实例
  46. PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);
  47. // 订阅 Topics
  48. consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());
  49. // 启动客户端实例
  50. consumer.start();
  51. //该设置仅供 demo 使用,实际生产中请保证进程不退出
  52. Thread.Sleep(300000);
  53. // 在进程即将退出时,关闭消费者实例
  54. consumer.shutdown();
  55. }
  56. }