全部产品
阿里云办公

步骤四:订阅消息

更新时间:2018-08-28 16:32:31

消息发送成功后,需要启动订阅方进行消息订阅。本文以 TCP Java SDK 为例,介绍如何通过调用相关协议及开发语言的 SDK/API 来完成消息订阅。

说明:关于 TCP 接入点域名,请参见TCP 接入说明

调用 TCP Java SDK 订阅消息

您可以运行以下示例代码来启动订阅端,并测试订阅消息的功能。请按照说明正确设置相关参数。目前控制台提供了 Java,C++, .NET的示例代码。

  1. import com.aliyun.openservices.ons.api.Action;
  2. import com.aliyun.openservices.ons.api.ConsumeContext;
  3. import com.aliyun.openservices.ons.api.Consumer;
  4. import com.aliyun.openservices.ons.api.Message;
  5. import com.aliyun.openservices.ons.api.MessageListener;
  6. import com.aliyun.openservices.ons.api.ONSFactory;
  7. import com.aliyun.openservices.ons.api.PropertyKeyConst;
  8. import java.util.Properties;
  9. public class ConsumerTest {
  10. public static void main(String[] args) {
  11. Properties properties = new Properties();
  12. // 您在 MQ 控制台创建的 Consumer ID
  13. properties.put(PropertyKeyConst.ConsumerId, "XXX");
  14. // 鉴权用 AccessKey,在阿里云服务器管理控制台创建
  15. properties.put(PropertyKeyConst.AccessKey, "XXX");
  16. // 鉴权用 SecretKey,在阿里云服务器管理控制台创建
  17. properties.put(PropertyKeyConst.SecretKey, "XXX");
  18. // 设置 TCP 接入域名(此处以公共云公网环境接入为例)
  19. properties.put(PropertyKeyConst.ONSAddr,
  20. "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
  21. Consumer consumer = ONSFactory.createConsumer(properties);
  22. consumer.subscribe("TopicTestMQ", "*", new MessageListener() {
  23. public Action consume(Message message, ConsumeContext context) {
  24. System.out.println("Receive: " + message);
  25. return Action.CommitMessage;
  26. }
  27. });
  28. consumer.start();
  29. System.out.println("Consumer Started");
  30. }
  31. }

查看消息订阅是否成功

完成上述步骤后,您可以在 MQ 控制台查看订阅端是否启动成功,即消息订阅是否成功。目前控制台查看消费者状态仅支持 TCP 客户端,暂不支持 HTTP 以及 MQTT 客户端。

  1. 在 MQ 控制台左侧菜单栏单击消费者管理

  2. 找到要查看的 Topic,单击右侧操作选项里的消费者状态
    如果是否在线显示为,则说明订阅端已成功启动。如果消费者状态是否在线显示为,说明消费端没有启动或者启动失败。

完成以上所有步骤后,您就成功接入了 MQ 服务,可以用 MQ 进行消息发送和订阅了。