文档

步骤三:调用SDK收发消息

更新时间:
一键部署

云消息队列 RocketMQ 版提供多种语言的SDK用于收发不同类型的消息,本文以Java SDK为例,说明如何调用SDK连接云消息队列 RocketMQ 版服务端,完成普通消息的收发流程。

前提条件

安装Java依赖库

  1. 在IDEA中创建一个Java工程。

  2. pom.xml文件中添加以下依赖引入Java依赖库。

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client-java</artifactId>
        <version>5.0.4</version>
    </dependency>

生产消息

在已创建的Java工程中,创建发送普通消息程序并运行,示例代码如下:

package doc;

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;


public class ProducerExample {
    public static void main(String[] args) throws ClientException {
        /**
         * 实例接入点,从控制台实例详情页的接入点页签中获取。
         * 如果是在阿里云ECS内网访问,建议填写VPC接入点。
         * 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        //消息发送的目标Topic名称,需要提前在控制台创建,如果不创建直接使用会返回报错。
        String topic = "Your Topic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * 如果是使用公网接入点访问,configuration还需要设置实例的用户名和密码。用户名和密码在控制台实例详情页获取。
         * 如果是在阿里云ECS内网访问,无需填写该配置,服务端会根据内网VPC信息智能获取。
         * 如果实例类型为Serverlesss实例,则不管是公网访问还是内网访问都必须设置实例的用户名密码。
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
        ClientConfiguration configuration = builder.build();
        /**
         * 初始化Producer时直接配置需要使用的Topic列表(这个参数可以配置多个Topic),实现提前检查错误配置、拦截非法配置启动。
         * 针对非事务消息 Topic,也可以不配置,服务端会动态检查消息的Topic是否合法。
         * 注意!!!事务消息Topic必须提前配置,以免事务消息回查接口失败,具体原理请参见事务消息。
         */
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();
        //普通消息发送。
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                //设置消息索引键,可根据关键字精确查找某条消息。
                .setKeys("messageKey")
                //设置消息Tag,用于消费端根据指定Tag过滤消息。
                .setTag("messageTag")
                //消息体。
                .setBody("messageBody".getBytes())
                .build();
        try {
            //发送消息,需要关注发送结果,并捕获失败等异常。
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        }
    }
}

消费消息

在已创建的Java工程中,创建订阅普通消息程序并运行。云消息队列 RocketMQ 版支持SimpleConsumerPushConsumer两种消费者类型,您可以选择任意一种方式订阅消息,具体的消费者类型的差异如下:

对比项

PushConsumer

SimpleConsumer

接口方式

使用监听器回调接口返回消费结果,消费者仅允许在监听器范围内处理消费逻辑。

业务方自行实现消息处理,并主动调用接口返回消费结果。

消费并发度管理

由SDK管理消费并发度。

由业务方消费逻辑自行管理消费线程。

接口灵活度

高度封装,不够灵活。

原子接口,可灵活自定义。

适用场景

适用于无自定义流程的开发场景。

适用于需要高度自定义业务流程的开发场景。

PushConsumer

package doc;

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;

public class PushConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
     /**
         * 实例接入点,从控制台实例详情页的接入点页签中获取。
         * 如果是在阿里云ECS内网访问,建议填写VPC接入点。
         * 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        //指定需要订阅哪个目标Topic,Topic需要提前在控制台创建,如果不创建直接使用会返回报错。
        String topic = "Your Topic";
        //为消费者指定所属的消费者分组,Group需要提前在控制台创建,如果不创建直接使用会返回报错。
        String consumerGroup = "Your ConsumerGroup";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
    /**
         * 如果是使用公网接入点访问,configuration还需要设置实例的用户名和密码。用户名和密码在控制台实例详情页获取。
         * 如果是在阿里云ECS内网访问,无需填写该配置,服务端会根据内网VPC信息智能获取。
         * 如果实例类型为Serverlesss实例,则不管是公网访问还是内网访问都必须设置实例的用户名密码。
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));        
        ClientConfiguration clientConfiguration = builder.build();
        //订阅消息的过滤规则,表示订阅所有Tag的消息。
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        //初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                //设置消费者分组。
                .setConsumerGroup(consumerGroup)
                //设置预绑定的订阅关系。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                //设置消费监听器。
                .setMessageListener(messageView -> {
                    //处理消息并返回消费结果。
                    // LOGGER.info("Consume message={}", messageView);
                    System.out.println("Consume Message: " + messageView);
                    return ConsumeResult.SUCCESS;
                })
                .build();
        Thread.sleep(Long.MAX_VALUE);
        //如果不需要再使用PushConsumer,可关闭该进程。
        //pushConsumer.close();
    }
}                                                 

SimpleConsumer

package doc;

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

public class SimpleConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);

    private SimpleConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException {
    /**
         * 实例接入点,从控制台实例详情页的接入点页签中获取。
         * 如果是在阿里云ECS内网访问,建议填写VPC接入点。
         * 如果是在本地公网访问,或者是线下IDC环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        //指定需要订阅哪个目标Topic,Topic需要提前在控制台创建,如果不创建直接使用会返回报错。
        String topic = "Your Topic";
        //为消费者指定所属的消费者分组,Group需要提前在控制台创建,如果不创建直接使用会返回报错。
        String consumerGroup = "Your ConsumerGroup";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
    /**
         * 如果是使用公网接入点访问,configuration还需要设置实例的用户名和密码。用户名和密码在控制台实例详情页获取。
         * 如果是在阿里云ECS内网访问,无需填写该配置,服务端会根据内网VPC信息智能获取。
         * 如果实例类型为Serverlesss实例,则不管是公网访问还是内网访问都必须设置实例的用户名密码。
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));        
        ClientConfiguration clientConfiguration = builder.build();

        Duration awaitDuration = Duration.ofSeconds(10);
        //订阅消息的过滤规则,表示订阅所有Tag的消息。
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        //初始化SimpleConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
        SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                //设置消费者分组。
                .setConsumerGroup(consumerGroup)
                //设置长轮询超时时间。
                .setAwaitDuration(awaitDuration)
                //设置预绑定的订阅关系。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .build();
        //设置本次拉取的最大消息条数。
        int maxMessageNum = 16;
        //设置消息的不可见时间。
        Duration invisibleDuration = Duration.ofSeconds(10);
        //SimpleConsumer需要客户端一直主动循环获取消息,并进行消费处理。
        //如果需要提高消费实时性,建议多线程并发拉取。
        while (true) {
            final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
            messages.forEach(messageView -> {
                // LOGGER.info("Received message: {}", messageView);
                System.out.println("Received message: " + messageView);
            });
            for (MessageView message : messages) {
                final MessageId messageId = message.getMessageId();
                try {
                    //消费处理完成后,需要主动调用ACK向服务端提交消费结果。
                    consumer.ack(message);
                    System.out.println("Message is acknowledged successfully, messageId= " + messageId);
                    //LOGGER.info("Message is acknowledged successfully, messageId={}", messageId);
                } catch (Throwable t) {
                    t.printStackTrace();
                    //LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t);
                }
            }
        }
        // 如果不需要再使用SimpleConsumer,可关闭该进程。
        // consumer.close();
    }
}                                           

验证消息

消息收发完成后,您可以通过控制台查看消息消费情况。

  1. 登录控制台,在实例列表页面选择目标实例。

  2. 在左侧导航栏单击消息轨迹

SDK参考

本文以Java SDK为例介绍收发普通消息流程,其他语言SDK和其他类型消息的示例代码,请参见SDK参考概述

  • 本页导读 (1)
文档反馈