全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 阿里云办公 培训与认证 物联网
消息队列 MQ

集群方式订阅消息

更新时间:2017-08-24 17:13:15

集群订阅即某个消费者集群只消费指定的 Topic,而不是消费所有 Topic。

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Runtime.InteropServices;
  6. using ons;
  7. namespace ons
  8. {
  9. //pushConsumer拉取到消息后,会主动调用该实例的consumer函数
  10. public class MyMsgListener : MessageListener
  11. {
  12. public MyMsgListener()
  13. {
  14. }
  15. ~MyMsgListener()
  16. {
  17. }
  18. public override Action consume(Message value, ConsumeContext context)
  19. {
  20. /*
  21. 所有中文编码相关问题都在SDK压缩包包含的文档里做了说明,请仔细阅读
  22. */
  23. return ons.Action.CommitMessage;
  24. }
  25. }
  26. class onscsharp
  27. {
  28. static void Main(string[] args)
  29. {
  30. //pushConsumer创建和工作需要的参数,必须输入
  31. ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
  32. factoryInfo.setFactoryProperty(factoryInfo.ConsumerId, "XXX");//您在MQ控制台申请的Consumer ID
  33. factoryInfo.setFactoryProperty(factoryInfo.PublishTopics, "XXX");//您在MQ控制台申请的Topic
  34. factoryInfo.setFactoryProperty(factoryInfo.AccessKey,"xx");//AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  35. factoryInfo.setFactoryProperty(factoryInfo.SecretKey, "xxxx");//SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  36. //create consumer
  37. ONSFactory onsfactory = new ONSFactory();
  38. PushConsumer pConsumer = onsfactory.getInstance().createPushConsumer(factoryInfo);
  39. //register msg listener and subscribe msg topic
  40. MessageListener msgListener = new MyMsgListener();
  41. pConsumer.subscribe(factoryInfo.getPublishTopics(), "*", msgListener);
  42. //start consumer
  43. pConsumer.start();
  44. //consumer启动后,会自动拉取消息,拉取到消息后,会自动调用MyMsgListener实例的consume函数;
  45. //确定消费完成后,调用shutdown函数;在应用退出前,必须销毁Consumer 对象,否则会导致内存泄露等问题
  46. pConsumer.shutdown();
  47. }
  48. }
  49. }
本文导读目录