全部产品
云市场

快速入门

更新时间:2020-01-14 16:40:23

本文将引导您快速体验 SOFAStack 消息队列,从创建资源、配置接入点到使用 SDK 收发消息。具体操作步骤如下:

  1. 创建资源
    1. 创建工作空间
    2. 创建 Topic
    3. 创建 Group ID
    4. 创建 AccessKey
  2. 获取接入配置
  3. 发送消息
  4. 订阅消息

创建资源

注意事项

在使用 SOFAStack 消息队列时,请注意以下网络访问限制:

  • Topic 和 Group ID 需创建在同一个地域(Region)下的同一个工作空间中才能互通。例如,当某 Topic 创建在华东 1(杭州)下的工作空间 A 中,那么该 Topic 只能被在华东 1(杭州)下的工作空间 A 中创建的 Group ID 对应的生产端和消费端访问。
  • 目前不支持公网访问,生产端和消费端需要部署在相同地域的 ECS 上,或者保证网络联通。

创建工作空间

要使用消息队列,您需要确保 SOFAStack 控制台已创建至少一个工作空间。如 SOFAStack 未创建工作空间或您需要创建一个新的工作空间,可参见 管理工作空间 > 添加工作空间。创建好工作空间后,将为您自动创建一个消息队列实例。

创建 Topic

Topic 是消息队列里对消息的一级归类。消息生产者将消息发送到一个 Topic,而消息消费者则通过订阅该 Topic 来获取和消费消息。

  1. 在控制台左侧导航栏,点击 Topic 管理
  2. 在 Topic 管理页面上方,点击 创建 Topic 按钮。
  3. 创建 Topic 对话框中,输入 Topic 名称,选择该 Topic 对应的消息类型,填写该 Topic 的备注内容。
  4. 点击 确定,完成创建,您创建的 Topic 将出现在 Topic 列表中。

消息类型的更多信息,请参见 消息类型

创建 Group ID

创建完 Topic 后,您需要为消息的消费者(或生产者)创建客户端 ID ,即 Group ID 作为标识。

Group ID 和 Topic 的关系是 N:N,即一个消费者可以订阅多个 Topic,同一个 Topic 也可以被多个消费者订阅;一个生产者可以向多个 Topic 发送消息,同一个 Topic 也可以接收来自多个生产者的消息。

说明:消费者必须有对应的 Group ID,生产者不做强制要求。

  1. 在控制台左侧导航栏,点击 Group 管理
  2. 在 Group 管理页面上方,点击 创建 Group ID
  3. 在创建 Group ID 对话框中,输入 Group ID 和描述,然后点击 确认

创建 AccessKey

阿里云 AccessKey 用于收发消息时进行账户鉴权。

在调用 SDK 发送和订阅消息的时候,除了需要指定创建的 Topic 和 Group ID 以外,还需输入您在 RAM 控制台创建的身份验证信息,即 AccessKey。AccessKey 的信息包含 AccessKeyId 和 AcessKeySecret。

创建 AccessKey 的具体步骤,参见 创建 AccessKey

获取接入配置

在控制台创建好资源后,您需通过控制台获取工作空间的接入点。在收发消息时,您需要为生产端和消费端配置该接入点,以此接入某个具体工作空间或地域的服务。

  1. 进入消息队列控制台页面,选择地域和工作空间。
  2. 在概览页底部的 接入配置 中,即可找到 TCP 协议内网接入点实例 ID
  3. 将该 TCP 协议内网接入点 配置到您客户端的 SDK 代码的 ENDPOINT 参数。
  4. 实例 ID 配置到您客户端的SDK代码的 INSTANCEID 参数。

接入点设置

发送消息

您可以通过控制台发送测试消息或通过调用 TCP Java SDK 发送消息。

发送测试消息

用于快速验证 Topic 资源的可用性,主要用作测试。

  1. 在控制台左侧导航栏,点击 Topic 管理
  2. 在 Topic 管理页面,找到您刚刚创建的 Topic,单击右侧操作列的 发送测试消息
  3. 发送测试消息 对话框中的 消息体 一栏,输入消息的具体内容,点击 确定。控制台即会返回消息发送成功通知以及相应的 Message ID。

调用 SDK 发送消息

  1. 通过 Maven 方式引入依赖。Java SDK 的最新版本号,可参见 SDK 版本说明

    1. <dependency>
    2. <groupId>com.alipay.sofa</groupId>
    3. <artifactId>sofamq-client-all</artifactId>
    4. <version>"XXX"</version>
    5. //设置为 Java SDK 的最新版本号
    6. </dependency>
    7. <repositories>
    8. <repository>
    9. <id>antcloudrelease</id>
    10. <name>Ant Cloud</name>
    11. <url>http://mvn.cloud.alipay.com/nexus/content/groups/open</url>
    12. </repository>
    13. </repositories>
  2. 根据以下说明设置相关参数,运行示例代码:

    1. import java.util.Properties;
    2. import com.alipay.sofa.sofamq.client.PropertyKeyConst;
    3. import io.openmessaging.api.Message;
    4. import io.openmessaging.api.MessagingAccessPoint;
    5. import io.openmessaging.api.OMS;
    6. import io.openmessaging.api.OMSBuiltinKeys;
    7. import io.openmessaging.api.Producer;
    8. import io.openmessaging.api.SendResult;
    9. public class Main {
    10. public static void main(String... args) {
    11. Properties credentials = new Properties();
    12. // 鉴权用 AccessKeyId,在阿里云服务器管理控制台创建
    13. credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "$accessKey");
    14. // 鉴权用 AccessKeySecret,在阿里云服务器管理控制台创建
    15. credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "xxxxx");
    16. // 设置 TCP 接入域名,进入控制台的概览页面查看接入点配置
    17. MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint")
    18. .withCredentials(credentials).build();
    19. Properties properties = new Properties();
    20. // 设置用户实例,进入控制台的概览页面查看接入点配置
    21. properties.setProperty(PropertyKeyConst.INSTANCE_ID, "$instanceId");
    22. // 您在控制台创建的 Group ID
    23. properties.setProperty(PropertyKeyConst.GROUP_ID, "YOUR_GROUP");
    24. Producer producer = accessPoint.createProducer(properties);
    25. producer.start();
    26. Message message = new Message("$topic", "YOUR_TAG", "hello world".getBytes());
    27. SendResult sendResult = producer.send(message);
    28. System.out.println(sendResult);
    29. }
    30. }
  3. 消息发送后,您可以在控制台查看消息发送状态,步骤如下:
    1. 在控制台左侧导航栏,选择 消息查询 > 按 Message ID 查询
    2. 在搜索框中输入发送消息后返回的 Message ID,点击 搜索 查询消息发送状态。
      储存时间 表示消息队列服务端存储这条消息的时间。如果查询到此消息,表示消息已经成功发送到服务端。

      注意:此步骤演示的是第一次使用消息队列的场景,此时消费者从未启动过,所以消息状态显示暂无消费数据。要启动消费者并进行消息订阅请继续下一步操作订阅消息。更多消息状态请参见 消息查询查询消息轨迹

订阅消息

消息发送成功后,需要启动消费者来订阅消息。

  1. 调用 TCP Java SDK 订阅消息。
    您可以运行以下示例代码来启动消费者,并测试订阅消息的功能。请按照说明正确设置相关参数。

    1. import java.util.Properties;
    2. import com.alipay.sofa.sofamq.client.PropertyKeyConst;
    3. import io.openmessaging.api.Action;
    4. import io.openmessaging.api.ConsumeContext;
    5. import io.openmessaging.api.Consumer;
    6. import io.openmessaging.api.Message;
    7. import io.openmessaging.api.MessageListener;
    8. import io.openmessaging.api.MessagingAccessPoint;
    9. import io.openmessaging.api.OMS;
    10. import io.openmessaging.api.OMSBuiltinKeys;
    11. public class Main {
    12. public static void main(String... args) {
    13. Properties credentials = new Properties();
    14. // 鉴权用 AccessKeyId,在阿里云服务器管理控制台创建
    15. credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "$accessKey");
    16. // 鉴权用 AccessKeySecret,在阿里云服务器管理控制台创建
    17. credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "xxxxx");
    18. // 设置 TCP 接入域名,进入控制台的概览页面查看接入点配置
    19. MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint")
    20. .withCredentials(credentials).build();
    21. Properties properties = new Properties();
    22. // 设置用户实例,进入控制台的概览页面查看接入点配置
    23. properties.setProperty(PropertyKeyConst.INSTANCE_ID, "$instanceId");
    24. // 您在控制台创建的 Group ID
    25. properties.setProperty(PropertyKeyConst.GROUP_ID, "YOUR_GROUP");
    26. Consumer consumer = accessPoint.createConsumer(properties);
    27. consumer.subscribe("YOUR_TOPIC", "YOUR_TAG", new MessageListener() {
    28. @Override
    29. public Action consume(Message message, ConsumeContext context) {
    30. System.out.println(new String(message.getBody()));
    31. return Action.CommitMessage;
    32. }
    33. });
    34. consumer.start();
    35. }
    36. }
  2. 完成上述步骤后,您可以在控制台查看消费者是否启动成功,即消息订阅是否成功。
    1. 在消息队列控制台左侧导航栏,点击 Group 管理
    2. 找到要查看的 Group ID,点击该 Group ID 所在行操作列的 订阅关系。如果 是否在线 显示为 ,且订阅关系一致,则说明订阅成功。否则说明订阅失败。