快速体验阿里云云消息队列RocketMQ版

更新时间:

本实验将带您快速体验使用云消息队列RocketMQServerless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。

场景简介

本实验将带您快速体验使用云消息队列RocketMQServerless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。

image

背景知识

本场景主要涉及以下云产品和服务:

  • 云消息队列RocketMQ

    云消息队列RocketMQ版是阿里云基于Apache RocketMQ构建的低延迟、高并发、高可用、高可靠的分布式“消息、事件、流”统一处理平台。RocketMQ自诞生以来一直服务阿里集团十余年,历经多次双十一万亿级数据洪峰稳定性验证。

前提条件

云起实验室将在您的账号下开通本次实操资源,资源按量付费,需要您自行承担本次实操的云资源费用。

重要

本实验使用云消息队列RocketMQServerless系列实例,实验总费用不超过0.1元。如果您调整了资源规格、使用时长,或执行了本方案以外的操作,可能导致费用发生变化,请以控制台显示的实际价格和最终账单为准。

进入实操前,请确保阿里云账号满足以下条件:

  • 已通过实名认证并且账户余额充足。

  • 云资源产生的费用需您自行承担,云起实验室不会向您征收额外费用。

  • 所有实验操作将保留至您的账号,请谨慎操作。

  • 实操结束后,您可以选择继续付费保留资源,或参考手册自动/手动释放资源。

创建实验资源

  1. 在实验页面,勾我已阅读并同意《阿里云云起实践平台服务协议》我已授权阿里云云起实践平台创建、读取及释放实操相关资源后,单击开始实操

  2. 创建资源需要5分钟左右的时间,请您耐心等待。

  3. 云产品资源列表,您可以查看本场景涉及的云产品资源信息。

    image

获取接入点

  1. 云产品资源列表的消息队列RocketMQ区域,单击管理

    image

  2. 实例详情页面的TCP协议接入点区域,可以查看实例的接入点信息。

    • VPC专有网络接入点:使用 VPC 专有网络访问云消息队列RocketMQ版时使用。云消息队列RocketMQ版默认提供的接入点。

    • 公网接入点:使用公网访问云消息队列RocketMQ版时使用该接入点。仅当开启公网访问时显示。

    说明

    本实验以查看公网接入点为例,后续内容会使用到公网接入点。

    image

获取账号密码

客户端接入云消息队列RocketMQ版服务端时,需要根据接入方式配置实例用户名密码。

  • 使用公网访问云消息队列RocketMQ版服务端:需要配置实例的用户名密码。

  • 使用VPC网络访问云消息队列 RocketMQ 版服务端:无需配置实例的用户名密码,系统会根据VPC接入点智能识别用户身份(使用VPC内网访问时系统会按照产生的内网流量计算,云消息队列RocketMQ版不收取费用,在私网连接(PrivateLink)侧进行计费)。

说明

本实验以公网访问为例,查看如何获取RocketMQServerless系列实例的账号密码。

  1. 在左侧导航栏中,单击访问控制

    image

  2. IP白名单页签,单击智能身份识别

    image

  3. 智能身份识别页签,可以查看到实例的用户名密码

    说明

    本实验后续内容需要使用到实例的用户名和密码。

    image

创建Topic

现在我们在RocketMQ实例中创建一个Topic资源。

  1. 在左侧导航栏中,单击Topic管理

    image

  2. Topic管理页面,单击创建Topic

    image

  3. 创建Topic面板,填写Topic名称描述,在本实验中将名称和描述设置为YUNQI-RMQTopic消息类型选择为普通消息,然后单击确定,一个Topic便创建完成了。

    image

创建订阅组(Group)

拥有一个Topic后,我们再创建一个订阅组(Group)。订阅组将被用于消息消费过程。

  1. 在左侧导航栏中,单击Group管理

    image

  2. Group管理页面,单击创建Group

    image

  3. 创建Group面板,填写Group ID,在本实验中将Group ID设置为test-group。其他参数保持默认即可,然后单击确定。此时,一个订阅组便创建完成了。

    image

收发消息

为方便体验,我们选择在控制台进行消息的发送,编写消费者代码并运行,以消费控制台发送的那条消息。

  1. 发送消息。

    1. 在左侧导航栏中,单击Topic管理

      image

    2. Topic管理页面,找到您创建的Topic,单击其右侧操作列下的快速体验

      image

    3. 快速体验的消息生产和消费面板中,发送方式选择控制台,填入消息内容,单击确定

      image

    4. 发送成功后,这条消息便已进入您实例所在的存储中,您可单击这里查看其消息轨迹。

      image

      image

  2. 接收消息。

    编写消费者代码,本实验将说明如何在IntelliJ IDEA中完成消费者的启动。本实验将从0开始教您从零开始构建一个Java项目。若您已有一定开发经验,请您根据真实情况选择性跳过。

    1. 首先,安装IntelliJ IDEA,选择社区版(Community)进行下载并安装。

    2. 打开IntelliJ IDEA,单击New Project

      image

    3. 新建一个Java工程。

      image

    4. 在运行代码前,请在您的工程中添加pom依赖。

      <dependencies>
        <dependency>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-client-java</artifactId>
          <version>5.0.7</version>
        </dependency>
      </dependencies>

      添加完成后,pom.xml文件如下图所示。

      image

    5. 创建一个名为PushConsumerExample的类,复制并粘贴下方代码,将代码中endpoints、topic、consumerGroup、instanceId、userName、passWord六个成员变量的值修改为您的RocketMQ实例相关信息,然后并运行。

      import org.apache.rocketmq.client.apis.*;
      import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
      import org.apache.rocketmq.client.apis.consumer.FilterExpression;
      import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
      import org.apache.rocketmq.client.apis.consumer.PushConsumer;
      import org.apache.rocketmq.shaded.org.slf4j.Logger;
      import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
      
      import java.io.IOException;
      import java.util.Collections;
      
      public class PushConsumerExample {
          private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);
      
          private PushConsumerExample() {
          }
      
          public static void main(String[] args) throws ClientException, IOException, InterruptedException {
              /**
               * 实例接入点,从控制台实例详情页的接入点页签中获取。
               * 如果是在阿里云ECS内网访问,建议填写VPC接入点。
               * 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。
               */
              String endpoints = "{实例接入点,本实验需要填写公网接入点,如rmq-cn-xxx.cn-zhangjiakou.rmq.aliyuncs.com:8080}";
              //指定需要订阅哪个目标Topic,Topic需要提前在控制台创建,如果不创建直接使用会返回报错。
              String topic = "{Topic名称,如YUNQI-RMQTopic}";
              //为消费者指定所属的消费者分组,Group需要提前在控制台创建,如果不创建直接使用会返回报错。
              String consumerGroup = "{Group ID, 如test-group}";
              String instanceId = "{实例id,如rmq-cn-xxx}";
              String userName = "{账号名}";
              String passWord = "{密码}";
      
              final ClientServiceProvider provider = ClientServiceProvider.loadService();
              ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                      .setEndpoints(endpoints)
                      .setNamespace(instanceId)
                      .setCredentialProvider(new StaticSessionCredentialsProvider(userName, passWord))
                      .build();
      
              //订阅消息的过滤规则,表示订阅所有Tag的消息。
              String tag = "*";
              FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
              //初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
              PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                      .setClientConfiguration(clientConfiguration)
                      //设置消费者分组。
                      .setConsumerGroup(consumerGroup)
                      //设置预绑定的订阅关系。
                      .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                      //设置消费监听器。
                      .setMessageListener(messageView -> {
                          //处理消息并返回消费结果。
                          // LOGGER.info("Consume message={}", messageView);
                          System.out.println("Consume Message: " + messageView);
                          return ConsumeResult.SUCCESS;
                      })
                      .build();
              Thread.sleep(Long.MAX_VALUE);
              //如果不需要再使用PushConsumer,可关闭该进程。
              //pushConsumer.close();
          }
      }

      image

      启动后,消费成功即可拿到之前在控制台发送的消息。

      image

可观测能力

阿里云云消息队列RocketMQ版的可观测能力多样,细粒度的有消息级别的查询、轨迹查询。粗粒度的有仪表盘,能够在实例维度查看消息的生产、发送、堆积等情况。

  1. 消息查询&轨迹

    针对我们刚刚发送的消息,可以在消息查询功能,查询该消息的具体内容、查看消息轨迹,并可指定消费者进行消费能力验证等。

    1. 在左侧导航栏中,单击消息查询

      image

    2. 消息查询页面,查询方式选择Topic查询Topic选择YUNQI-RMQTopic,单击查询

      image

    3. 消息查询页面,找到目标消息,单击其右侧操作列下的详情,即可查询该消息的具体内容。

      image

      image

    4. 消息查询页面,找到目标消息,单击其右侧操作列下的消息轨迹,即可查询该消息的具体轨迹。

      消息轨迹功能,能够支持对特定消息进行全生命周期的展示,包括其生产者、存储时间、存储 ID、投递事件、消费者等信息。通过该可观测能力,我们能够十分清晰地了解消息收发的细节。

      image

      image

    5. 消息查询页面,找到目标消息,选择其右侧操作列下的更多 > 消费验证,即可指定消费者进行消费能力验证。

      image

      image

    6. 消息查询页面,找到目标消息,选择其右侧操作列下的更多 > 下载消息,即可下载消息内容。

      image

  2. 仪表盘

    相对于消息查询功能,仪表盘属于粗粒度的可观测能力。该能力可以展现实例维度、Topic维度、Group维度的整体情况,包括但不限于收发速率、堆积情况等数据。且依托于Grafana的可视化能力,这些指标的展示都是十分直观且灵活的。

    1. 在左侧导航栏中,单击仪表盘

      image

    2. 服务管理角色对话框中,单击授权

      说明

      如果您的阿里云账号已授权,请您跳过此步骤。

      image

    3. 仪表盘页面,可以查看到实例维度、Topic维度、Group维度的整体情况。在消费者区域,我们可以看到刚刚测试的消息在何时进入实例,消费延迟时间等信息。

      image

其它拓展能力以及参考文档

开源RocketMQGitHub社区中不断迭代成长,定期发布版本,您可以在社区内查看最新特性、提出Bug,甚至参与Bug的修复。

清理资源

  • 在完成实验后,如果无需继续使用资源,选择不保留资源,单击结束实操。在结束实操对话框中,单击确定

    image

  • 在完成实验后,如果需要继续使用资源,选择付费保留资源,单击结束实操。在结束实操对话框中,单击确定。请随时关注账户扣费情况,避免发生欠费。

    image