全部产品
阿里云办公

主账号 - 快速入门

更新时间:2018-12-14 16:30:45

本文主要描述主账号从开通 MQ 服务、创建 MQ 资源,到使用 MQ SDK 进行消息收发的完整流程,旨在以最简单明了的方式引导使用主账号的您快速上手 MQ,为进一步使用和熟悉 MQ 的功能提供入门。

消息收发部分以 TCP 协议下调用 Java SDK 为例来演示。

说明:若您要在 TCP 协议下使用 C/C++ 和 .NET SDK 来收发消息,请参见 C/C++ 收发普通消息.NET 收发普通消息;若要使用 MQTT 协议,请使用微消息队列概述

MQ 快速接入流程图:

quickstart_process

步骤一:开通服务

请按照以下步骤开通 MQ 服务:

  1. 登录阿里云主页,将鼠标依次移动到产品 > 互联网中间件,单击消息队列进入 MQ 产品主页。

  2. 在 MQ 产品页上,单击立即开通进入 MQ 服务开通页面,根据提示完成开通服务。

如果您已经开通 MQ 服务,请直接登录MQ 控制台

步骤二:创建资源

在 MQ 消息系统中,消息生产者将消息发送到某个指定的消息主题(Topic) ,而消息消费者则通过订阅该指定的 Topic 来获取和消费消息。

一个新的应用接入 MQ 需要先创建相关的 MQ 资源,包括:

  • 主题(Topic)
  • 生产者(Producer ID)
  • 消费者(Consumer ID)
  • 阿里云 AccessKey

注意:当您删除资源,比如 Topic、消息生产者、消息消费者的时候,相关的资源也会在 10 分钟内进行清理。

创建消息主题(Topic)

消息主题(Topic)是 MQ 里对消息进行的一级归类,比如可以创建“Topic_Trade”这一主题用来识别交易类消息。 使用 MQ 的第一步需要先为您的应用创建 Topic。

您可以按照以下步骤创建 Topic:

  1. 登录MQ 控制台

  2. 在页面上方选择地域(比如公网域),即您要将资源创建在哪个地域。

    注意:

    • 如果只是测试,或者需要在本地(非阿里云 ECS 服务器)使用 MQ 服务,请将 Topic 创建在公网环境。 生产端和消费端可以部署在本地或者部署在任意地域的 ECS 上,前提是本地服务器或者相应的 ECS 需要能够访问公网。
    • 如果在生产环境使用 MQ 服务,需要将应用程序部署在阿里云 ECS 上,同时 Topic 也需要在应用程序所在的地域(即所部署的 ECS 地域)进行创建。
    • Topic 不能跨域使用。 比如 Topic 创建在“华北 2”这个域,那么消息生产端和消费端也必须运行在“华北 2”的 ECS 上。
    • 有关地域的详细介绍请参见 ECS 文档中的地域和可用区
  3. 在左侧导航栏,选择 Topic 管理

  4. 单击创建 Topic按钮。

  5. 创建 Topic对话框中的 Topic 一栏,输入 Topic 名称。

    注意:Topic 名称必须全局唯一。 如果名称已经被其他用户使用,您将无法创建相同名称的 Topic。

  6. 消息类型一栏,选择该 Topic 对应的消息类型,即该 Topic 用来收发何种类型的消息。

    消息类型说明:

    • 普通消息:无特性的消息,区分于事务消息、定时/延时消息和顺序消息。
    • 事务消息:提供类似 X/Open XA 的分布事务功能,能达到分布式事务的最终一致。
    • 定时/延时消息:可指定消息延迟投递,即在未来的某个特定时间点或一段特定的时间后进行投递。
    • 分区顺序消息:消息根据 sharding key 进行分区,提高整体并发度与使用性能。 同一个分区的消息严格按照 FIFO 的严格顺序进行生产和消费。
    • 全局顺序消息:所有消息严格按照 FIFO 的严格顺序进行生产和消费。

    注意:建议创建不同的 Topic 来发送不同类型的消息,例如用 Topic A 发送普通消息,Topic B 发送事务消息, Topic C 发送延时/定时消息。

  7. 描述一栏,输入该 Topic 的备注内容,然后单击确定。您创建的 Topic 将出现在 Topic 列表中。

创建生产者(Producer ID)

创建好 Topic 后,要为这个 Topic 创建消息生产端的资源,即创建 Producer ID。 一个 Topic 只能对应一个 Producer ID。

请按照以下步骤为您的 Topic 创建 Producer ID:

  1. 在 MQ 控制台的左侧导航栏,选择Topic 管理

  2. Topic 管理页面,找到您刚刚创建的 Topic,单击操作列的创建生产者

  3. 创建生产者对话框输入 Producer ID,单击确定

    注意:

    • Producer ID 必须全局唯一。 如果名称已存在,您将无法创建相同名称的 Producer ID。
    • Topic 对应的生产端必须和这个 Topic 在同一个域,比如您在“公网”域创建了“Topic_open”,那么和“Topic_open”对应的 Producer ID 也必须在同一个域。
    • Producer ID 和 Topic 的关系是 1:N,即一个 Topic 只能绑定一个 Producer ID, 但是同一个 Producer ID 可以对应多个 Topic。

创建消费者(Consumer ID)

创建完消息生产端后,您需要为 Topic 申请相应的消息消费资源,即创建 Consumer ID。

请按以下步骤创建 Consumer ID:

  1. 在 MQ 控制台的左侧导航栏,选择 Topic 管理

  2. Topic 管理页面,找到您创建的 Topic,单击右侧操作列的创建消费者

  3. 创建消费者对话框输入 Consumer ID,单击确定

    注意:

    • Consumer ID 必须全局唯一。 如果名称已存在,您将无法创建相同名称的 Consumer ID。
    • Consumer ID 必须和对应的 Topic 在同一个域,比如“公网”域的“Topic_open”可绑定同在“公网”域的 Consumer ID “CID_123”,而“华北1”域内的“Topic_huabei1”则不能绑定该Consumer ID。
    • Consumer ID 和 Topic 的关系是 N:N。 同一个 Consumer ID 可以订阅多个 Topic,同一个 Topic 也可以对应多个 Consumer ID。

创建阿里云 AccessKey

在调用 SDK/API 进行消息发送和订阅的时候,除了需要指定创建的 Topic, Producer ID 以及 Consumer ID 以外,还需输入您在 RAM 控制台创建的身份验证信息,即 AccessKey。AccessKey 的信息包含 AccessKeyId 和 AcessKeySecret。

关于如何创建 AccessKey,请参见创建AccessKey

步骤三:获取接入域名

在控制台创建好资源后,您还需要通过控制台获取 Producer ID 的接入域名和 Consumer ID 的接入域名。

请按照以下步骤获取 Producer ID 的接入域名或 Consumer ID 的接入域名:

  1. 在 MQ 控制台的左侧导航栏,选择生产者管理消费者管理

  2. 找到您创建的 Producer ID 或 Consumer ID,单击右侧操作列中的获取接入点

  3. 获取接入点提示框,单击复制

完成以上准备工作后,您就可以运行示例代码,用 MQ 进行消息发送和订阅了。

步骤四:发送消息

您可以通过控制台发送消息或者调用 SDK/API 发送消息。

  • 控制台发送消息:用于快速验证 Topic 资源的可用性。

  • 调用 SDK/API 发送消息:用于生产环境下使用 MQ。

通过控制台发送消息

控制台发送消息步骤如下:

  1. 在 MQ 控制台的左侧导航栏,选择生产者管理

  2. 生产者管理页面,找到您刚刚创建的 Topic,单击右侧操作列的发送

  3. 发送消息对话框输入消息的具体内容,单击确定。控制台会返回消息发送成功通知以及相应的 Message ID。

调用 SDK/API 发送消息

在生产环境使用 MQ,建议调用 SDK/API 来进行消息发送。本文以 TCP 协议下调用 Java SDK 为例进行说明。如果需要使用其他协议或者开发语言,请参见相关帮助文档。

调用 TCP Java SDK 发送消息

  1. 通过下面两种方式可以引入依赖(任选一种):

    • Maven 方式引入依赖:

      1. <dependency>
      2. <groupId>com.aliyun.openservices</groupId>
      3. <artifactId>ons-client</artifactId>
      4. <version>"XXX"</version>
      5. //设置为 Java SDK 的最新版本号
      6. </dependency>

      关于 Java SDK 的最新版本号,请查看版本说明

    • 下载依赖 JAR 包:

      关于 Java SDK 最新版本的下载链接,请查看版本说明

  2. 根据以下说明设置相关参数,运行示例代码:

    说明:关于 TCP 接入点域名,请进入 MQ 控制台的生产者管理页面,在 PID 右侧操作列单击获取接入点按钮获取。

    1. import com.aliyun.openservices.ons.api.Message;
    2. import com.aliyun.openservices.ons.api.Producer;
    3. import com.aliyun.openservices.ons.api.SendResult;
    4. import com.aliyun.openservices.ons.api.ONSFactory;
    5. import com.aliyun.openservices.ons.api.PropertyKeyConst;
    6. import java.util.Properties;
    7. public class ProducerTest {
    8. public static void main(String[] args) {
    9. Properties properties = new Properties();
    10. // 您在 MQ 控制台创建的 Producer ID
    11. properties.put(PropertyKeyConst.ProducerId, "XXX");
    12. // 鉴权用 AccessKey,在阿里云服务器管理控制台创建
    13. properties.put(PropertyKeyConst.AccessKey,"XXX");
    14. // 鉴权用 SecretKey,在阿里云服务器管理控制台创建
    15. properties.put(PropertyKeyConst.SecretKey, "XXX");
    16. // 设置 TCP 接入域名,进入 MQ 控制台的生产者管理页面,在右侧操作列单击获取接入点获取
    17. // 此处以公有云公网地域接入点为例
    18. properties.put(PropertyKeyConst.ONSAddr,
    19. "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
    20. Producer producer = ONSFactory.createProducer(properties);
    21. // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
    22. producer.start();
    23. //循环发送消息
    24. while(true){
    25. Message msg = new Message( //
    26. // 在控制台创建的 Topic,即该消息所属的 Topic 名称
    27. "TopicTestMQ",
    28. // Message Tag,
    29. // 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 MQ 服务器过滤
    30. "TagA",
    31. // Message Body
    32. // 任何二进制形式的数据, MQ 不做任何干预,
    33. // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
    34. "Hello MQ".getBytes());
    35. // 设置代表消息的业务关键属性,请尽可能全局唯一,以方便您在无法正常收到消息情况下,可通过 MQ 控制台查询消息并补发
    36. // 注意:不设置也不会影响消息正常收发
    37. msg.setKey("ORDERID_100");
    38. // 发送消息,只要不抛异常就是成功
    39. // 打印 Message ID,以便用于消息发送状态查询
    40. SendResult sendResult = producer.send(msg);
    41. System.out.println("Send Message success. Message ID is: " + sendResult.getMessageId());
    42. }
    43. // 在应用退出前,可以销毁 Producer 对象
    44. // 注意:如果不销毁也没有问题
    45. producer.shutdown();
    46. }
    47. }

查看消息是否发送成功

消息发送后,您可以在控制台查看消息发送状态,步骤如下:

  1. 在 MQ 控制台左侧菜单栏中单击消息查询

  2. 消息查询页面,选择按 Message ID 查询标签页。

  3. 在搜索框中输入发送消息后返回的 Message ID,单击搜索查询消息发送状态。

    “储存时间”表示 MQ 服务端存储这条消息的时间。如果查询到此消息,表示消息已经成功发送到服务端。

注意:此步骤演示的是第一次使用 MQ 的场景,此时订阅端从未启动过,所以消息状态显示暂无消费数据。要启动订阅端并进行消息订阅请继续下一步操作订阅消息。更多消息状态请参见消息查询

步骤五:订阅消息

消息发送成功后,需要启动订阅方进行消息订阅。本文以 TCP Java SDK 为例,介绍如何通过调用相关协议及开发语言的 SDK/API 来完成消息订阅。

调用 TCP Java SDK 订阅消息

您可以运行以下示例代码来启动订阅端,并测试订阅消息的功能。请按照说明正确设置相关参数。目前控制台提供了 Java,C++, .NET的示例代码。

说明:关于 TCP 接入点域名,请进入 MQ 控制台的消费者管理页面,在 CID 右侧操作列单击获取接入点按钮获取。

  1. import com.aliyun.openservices.ons.api.Action;
  2. import com.aliyun.openservices.ons.api.ConsumeContext;
  3. import com.aliyun.openservices.ons.api.Consumer;
  4. import com.aliyun.openservices.ons.api.Message;
  5. import com.aliyun.openservices.ons.api.MessageListener;
  6. import com.aliyun.openservices.ons.api.ONSFactory;
  7. import com.aliyun.openservices.ons.api.PropertyKeyConst;
  8. import java.util.Properties;
  9. public class ConsumerTest {
  10. public static void main(String[] args) {
  11. Properties properties = new Properties();
  12. // 您在 MQ 控制台创建的 Consumer ID
  13. properties.put(PropertyKeyConst.ConsumerId, "XXX");
  14. // 鉴权用 AccessKey,在阿里云服务器管理控制台创建
  15. properties.put(PropertyKeyConst.AccessKey, "XXX");
  16. // 鉴权用 SecretKey,在阿里云服务器管理控制台创建
  17. properties.put(PropertyKeyConst.SecretKey, "XXX");
  18. // 设置 TCP 接入域名,进入 MQ 控制台的消费者管理页面,在右侧操作列单击获取接入点获取
  19. // 此处以公有云公网地域接入点为例
  20. properties.put(PropertyKeyConst.ONSAddr,
  21. "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
  22. Consumer consumer = ONSFactory.createConsumer(properties);
  23. consumer.subscribe("TopicTestMQ", "*", new MessageListener() {
  24. public Action consume(Message message, ConsumeContext context) {
  25. System.out.println("Receive: " + message);
  26. return Action.CommitMessage;
  27. }
  28. });
  29. consumer.start();
  30. System.out.println("Consumer Started");
  31. }
  32. }

查看消息订阅是否成功

完成上述步骤后,您可以在 MQ 控制台查看订阅端是否启动成功,即消息订阅是否成功。目前控制台查看消费者状态仅支持 TCP 客户端,暂不支持 HTTP 以及 MQTT 客户端。

  1. 在 MQ 控制台左侧导航栏,选择消费者管理

  2. 找到要查看的 Topic,单击右侧操作选项里的消费者状态

    如果是否在线显示为,则说明订阅端已成功启动。如果消费者状态是否在线显示为,说明消费端没有启动或者启动失败。

完成以上所有步骤后,您就成功接入了 MQ 服务,可以用 MQ 进行消息发送和订阅了。