全部产品
阿里云办公

主账号 - 快速入门

更新时间:2019-01-30 14:54:49

本文主要描述主账号从开通服务、创建资源,到使用 SDK 进行消息收发的完整流程,旨在以最简单明了的方式引导使用主账号的您快速上手消息队列 RocketMQ,为进一步使用和熟悉产品功能提供入门。

消息收发部分以 TCP 协议下调用 Java SDK 为例来演示。

说明:若您要在 TCP 协议下使用 C/C++ 和 .NET SDK 来收发消息,请参见 C/C++ 收发普通消息.NET 收发普通消息;若要使用 MQTT 协议,请使用微消息队列 for IoT

消息队列 RocketMQ 快速接入流程图(主账号):

quickstart_process

步骤一:开通服务

请按照以下步骤开通消息队列 RocketMQ 服务:

  1. 登录阿里云主页,将鼠标依次移动到产品 > 企业应用 > 消息队列 MQ ,单击消息队列 RocketMQ进入消息队列 RocketMQ 的产品主页。

  2. 在消息队列 RocketMQ 的产品主页上,单击立即开通进入消息队列 RocketMQ 服务开通页面,根据提示完成开通服务。

如果您已经开通消息队列 RocketMQ 服务,请直接登录消息队列 RocketMQ 控制台

步骤二:创建资源

资源类型说明

一个新的应用接入消息队列 RocketMQ 需要先创建相关的消息队列 RocketMQ 资源,包括:

  • 实例:用于消息队列 RocketMQ 服务的虚拟机资源,会存储消息主题(Topic)和客户端 ID(Group ID)信息。

  • 消息主题(Topic):在消息队列 RocketMQ 的消息系统中,消息生产者将消息发送到某个指定的 Topic ,而消息消费者则通过订阅该指定的 Topic 来获取和消费消息。

  • Group ID:用于消息消费者(或生产者)的标识

  • 阿里云 AccessKey:用于收发消息时进行账户鉴权

注意:当您删除某实例时,该实例中的所有 Topic 和 Group ID 也会在 10 分钟内被清理;若单独删除 Topic 或 Group ID,则不会对其他资源造成影响。

网络访问说明

在使用消息队列 RocketMQ 时,请注意以下网络访问限制:

  • 只有在同一个地域下的同一个实例中的 Topic 和 Group ID 才能互通,即某 Topic 是在哪个地域的哪个实例中创建的,它就只能被同样在该地域下的该实例中的创建的 Group ID 对应的生产端和消费端访问。

    例如,当某 Topic 是创建在华北 2下的实例 A 中,那么该 Topic 只能被在华北 2下的实例 A 中创建的 Group ID 对应的生产端和消费端访问。

  • 如果只是测试,或者需要在本地(非阿里云 ECS 服务器)使用消息队列 RocketMQ 的服务,请将 Topic 和 Group ID 都创建在“公网”地域下的实例中。 生产端和消费端可以部署在本地或者部署在任意地域的 ECS 上,前提是本地服务器或者相应的 ECS 需要能够访问公网。但注意遵循上一条原则,Topic 不能跨实例使用。

有关地域的详细介绍请参见 ECS 文档中的地域和可用区

选择地域

  1. 登录消息队列 RocketMQ 控制台

  2. 在页面上方选择地域(比如公网地域),即您要将资源创建在哪个地域。

创建实例

  1. 在控制台左侧导航栏选择实例管理

  2. 实例管理页面,单击创建实例按钮。

  3. 创建实例对话框,选择实例类型,并输入实例名和描述,然后单击确定

创建消息主题(Topic)

消息主题(Topic)是消息队列 RocketMQ 里对消息进行的一级归类,比如可以创建“Topic_Trade”这一主题用来识别交易类消息。

  1. 在控制台左侧导航栏选择Topic管理

  2. Topic管理页面上方选择刚创建的实例。

  3. 单击创建 Topic按钮。

  4. 创建 Topic对话框中的 Topic 一栏,输入 Topic 名称。

    注意:Topic 名称必须在同一实例中是唯一的。

  5. 消息类型一栏,选择该 Topic 对应的消息类型,即该 Topic 用来收发何种类型的消息。

    消息类型说明:

    • 普通消息:无特性的消息,区分于事务消息、定时/延时消息和顺序消息。
    • 事务消息:提供类似 X/Open XA 的分布事务功能,能达到分布式事务的最终一致。
    • 定时/延时消息:可指定消息延迟投递,即在未来的某个特定时间点或一段特定的时间后进行投递。
    • 分区顺序消息:消息根据 sharding key 进行分区,提高整体并发度与使用性能。 同一个分区的消息严格按照 FIFO 的严格顺序进行生产和消费。
    • 全局顺序消息:所有消息严格按照 FIFO 的严格顺序进行生产和消费。

    注意:建议创建不同的 Topic 来发送不同类型的消息,例如用 Topic A 发送普通消息,Topic B 发送事务消息, Topic C 发送延时/定时消息。

  6. 描述一栏,输入该 Topic 的备注内容,然后单击确定。您创建的 Topic 将出现在 Topic 列表中。

创建 Group ID

创建完实例和 Topic 后,您需要为消息的消费者(或生产者)创建客户端 ID ,即 Group ID。

说明:消费者必须有对应的 Group ID,生产者不做强制要求。

  1. 在控制台左侧导航栏选择Group 管理

  2. Group 管理页面上方选择刚创建的实例。

  3. 单击创建 Group ID

  4. 创建 Group ID对话框中,输入 Group ID 和描述,然后单击确定

    注意:

    • Group ID 必须在同一实例中是唯一的。

    • Group ID 和 Topic 的关系是 N:N,即一个消费者可以订阅多个 Topic,同一个 Topic 也可以被多个消费者订阅;一个生产者可以向多个 Topic 发送消息,同一个 Topic 也可以接收来自多个生产者的消息。

创建阿里云 AccessKey

在调用 SDK/API 进行消息发送和订阅的时候,除了需要指定创建的 Topic 和 Group ID 以外,还需输入您在 RAM 控制台创建的身份验证信息,即 AccessKey。AccessKey 的信息包含 AccessKeyId 和 AcessKeySecret。

关于如何创建 AccessKey,请参见创建AccessKey

步骤三:获取接入域名

在控制台创建好资源后,您还需要通过控制台获取生产者和消费者的接入域名。

  1. 在控制台左侧导航栏选择实例管理

  2. 实例管理页面上方选择刚创建的实例。

  3. 在默认显示的实例信息页签的获取接入点信息区域,您可以在TCP 接入点一栏看到接入点域名。

  4. TCP接入点一栏,单击复制

    您还可以单击示例代码,查看在各种开发语言的程序中如何设置接入点。

完成以上准备工作后,您就可以运行示例代码,用消息队列 RocketMQ 进行消息发送和订阅了。

步骤四:发送消息

您可以通过控制台发送消息或者调用 SDK/API 发送消息。

  • 控制台发送消息:用于快速验证 Topic 资源的可用性。

  • 调用 SDK/API 发送消息:用于生产环境下使用消息队列 RocketMQ。

通过控制台发送消息

  1. 在控制台左侧导航栏,选择Topic 管理

  2. Topic 管理页面,找到您刚刚创建的 Topic,单击右侧操作列的发送

  3. 发送消息对话框中的Message Body一栏,输入消息的具体内容,单击确定

    控制台会返回消息发送成功通知以及相应的 Message ID。

调用 SDK/API 发送消息

在生产环境使用消息队列 RocketMQ,建议调用 SDK/API 来进行消息发送。本文以 TCP 协议下调用 Java SDK 为例进行说明。如果需要使用其他协议或者开发语言,请参见相关帮助文档。

调用 TCP Java SDK 发送消息

  1. 通过下面两种方式可以引入依赖(任选一种):

    • Maven 方式引入依赖:

      1. <dependency>
      2. <groupId>com.aliyun.openservices</groupId>
      3. <artifactId>ons-client</artifactId>
      4. <version>"XXX"</version>
      5. //设置为 Java SDK 的最新版本号
      6. </dependency>

      关于 Java SDK 的最新版本号,请查看版本说明

    • 下载依赖 JAR 包:

      关于 Java SDK 最新版本的下载链接,请查看版本说明

  2. 根据以下说明设置相关参数,运行示例代码:

    1. import com.aliyun.openservices.ons.api.Message;
    2. import com.aliyun.openservices.ons.api.Producer;
    3. import com.aliyun.openservices.ons.api.SendResult;
    4. import com.aliyun.openservices.ons.api.ONSFactory;
    5. import com.aliyun.openservices.ons.api.PropertyKeyConst;
    6. import java.util.Properties;
    7. public class ProducerTest {
    8. public static void main(String[] args) {
    9. Properties properties = new Properties();
    10. // 您在控制台创建的 Group ID
    11. properties.put(PropertyKeyConst.GROUP_ID, "XXX");
    12. // 鉴权用 AccessKey,在阿里云服务器管理控制台创建
    13. properties.put(PropertyKeyConst.AccessKey,"XXX");
    14. // 鉴权用 SecretKey,在阿里云服务器管理控制台创建
    15. properties.put(PropertyKeyConst.SecretKey, "XXX");
    16. // 设置 TCP 接入域名,进入控制台的实例管理页面,在页面上方选择实例后,在实例信息中的“获取接入点信息”区域查看
    17. properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
    18. Producer producer = ONSFactory.createProducer(properties);
    19. // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
    20. producer.start();
    21. //循环发送消息
    22. while(true){
    23. Message msg = new Message( //
    24. // 在控制台创建的 Topic,即该消息所属的 Topic 名称
    25. "TopicTestMQ",
    26. // Message Tag,
    27. // 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 MQ 服务器过滤
    28. "TagA",
    29. // Message Body
    30. // 任何二进制形式的数据, 消息队列 RocketMQ 不做任何干预,
    31. // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
    32. "Hello MQ".getBytes());
    33. // 设置代表消息的业务关键属性,请尽可能全局唯一,以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发
    34. // 注意:不设置也不会影响消息正常收发
    35. msg.setKey("ORDERID_100");
    36. // 发送消息,只要不抛异常就是成功
    37. // 打印 Message ID,以便用于消息发送状态查询
    38. SendResult sendResult = producer.send(msg);
    39. System.out.println("Send Message success. Message ID is: " + sendResult.getMessageId());
    40. }
    41. // 在应用退出前,可以销毁 Producer 对象
    42. // 注意:如果不销毁也没有问题
    43. producer.shutdown();
    44. }
    45. }

查看消息是否发送成功

消息发送后,您可以在控制台查看消息发送状态,步骤如下:

  1. 在控制台左侧导航栏中选择消息查询

  2. 消息查询页面,选择按 Message ID 查询页签。

  3. 在搜索框中输入发送消息后返回的 Message ID,单击搜索查询消息发送状态。

    “储存时间”表示消息队列 RocketMQ 服务端存储这条消息的时间。如果查询到此消息,表示消息已经成功发送到服务端。

注意:此步骤演示的是第一次使用消息队列 RocketMQ 的场景,此时消费者从未启动过,所以消息状态显示暂无消费数据。要启动消费者并进行消息订阅请继续下一步操作订阅消息。更多消息状态请参见消息查询

步骤五:订阅消息

消息发送成功后,需要启动消费者进行消息订阅。本文以 TCP Java SDK 为例,介绍如何通过调用相关协议及开发语言的 SDK/API 来完成消息订阅。

调用 TCP Java SDK 订阅消息

您可以运行以下示例代码来启动消费者,并测试订阅消息的功能。请按照说明正确设置相关参数。目前控制台提供了 Java,C++, .NET的示例代码。

  1. import com.aliyun.openservices.ons.api.Action;
  2. import com.aliyun.openservices.ons.api.ConsumeContext;
  3. import com.aliyun.openservices.ons.api.Consumer;
  4. import com.aliyun.openservices.ons.api.Message;
  5. import com.aliyun.openservices.ons.api.MessageListener;
  6. import com.aliyun.openservices.ons.api.ONSFactory;
  7. import com.aliyun.openservices.ons.api.PropertyKeyConst;
  8. import java.util.Properties;
  9. public class ConsumerTest {
  10. public static void main(String[] args) {
  11. Properties properties = new Properties();
  12. // 您在控制台创建的 Group ID
  13. properties.put(PropertyKeyConst.GROUP_ID, "XXX");
  14. // 鉴权用 AccessKey,在阿里云服务器管理控制台创建
  15. properties.put(PropertyKeyConst.AccessKey, "XXX");
  16. // 鉴权用 SecretKey,在阿里云服务器管理控制台创建
  17. properties.put(PropertyKeyConst.SecretKey, "XXX");
  18. // 设置 TCP 接入域名,进入控制台的实例管理页面,在页面上方选择实例后,在实例信息中的“获取接入点信息”区域查看
  19. properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
  20. Consumer consumer = ONSFactory.createConsumer(properties);
  21. consumer.subscribe("TopicTestMQ", "*", new MessageListener() {
  22. public Action consume(Message message, ConsumeContext context) {
  23. System.out.println("Receive: " + message);
  24. return Action.CommitMessage;
  25. }
  26. });
  27. consumer.start();
  28. System.out.println("Consumer Started");
  29. }
  30. }

查看消息订阅是否成功

完成上述步骤后,您可以在控制台查看消费者是否启动成功,即消息订阅是否成功。

  1. 在控制台左侧导航栏选择Group 管理

  2. 找到要查看的消费者的 Group ID,单击右侧操作列里的消费者状态

    如果状态为在线,则说明消费者已成功启动。如果状态为离线,说明消费者没有启动或者启动失败。

完成以上所有步骤后,您就成功接入了消息队列 RocketMQ 服务,可以用消息队列 RocketMQ 进行消息发送和订阅了。