本文介绍如何在 Spring 框架下用消息队列 RocketMQ 版收发消息。当前支持与 Spring 框架集成的消息类型有普通消息、事务消息以及顺序消息。

背景信息

本文包括以下三部分内容:
  • 普通消息生产者、消费者和 Spring 集成。
  • 事务消息生产者、消费者和 Spring 集成。
  • 顺序消息生产者、消费者和 Spring 集成。

参数说明

与 Spring 集成中所需配置的参数如下所示。

参数 说明
GROUP_ID 您在控制台创建的 Group ID,用于对消费者或生产者实例进行分类。详情请参见名词解释
AccessKey 阿里云身份验证 AccessKeyId,在阿里云用户信息管理控制台获取。
SecretKey 阿里云身份验证 AccessKeySecret,在阿里云用户信息管理控制台获取。
NAMESRV_ADDR 设置 TCP 接入域名,进入控制台的实例详情页面的 TCP 协议客户端接入点区域查看。
expression 消息过滤表达式,例如 “TagA||TagB”,说明消费者订阅了带有 TagA 和 Tag B 两种 Tag 的消息。详情请参见订阅消息

Spring 框架下支持的更多配置参数请参见 Java SDK 接口和参数说明

引入依赖

通过 Maven 方式引入以下依赖。

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>XXX</version>
    //设置为 Java SDK 的最新版本号
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>4.3.19.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>4.3.19.RELEASE</version>
</dependency>

客户端(ons-client)版本的详情请参见 Java SDK 版本说明

普通消息生产者与 Spring 集成

  1. producer.xml 中定义生产者 Bean 等信息。
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    
        <bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean" init-method="start"
              destroy-method="shutdown">
            <!--生产者配置信息-->
            <property name="properties">
                <props>
                    <!-- Note: 请替换 ${...} 为您的账户对应的资源信息 -->
                    <prop key="GROUP_ID">${GROUP_ID}</prop>
                    <prop key="AccessKey">${AccessKey}</prop>
                    <prop key="SecretKey">${SecretKey}</prop>
                    <prop key="NAMESRV_ADDR">${NAMESRV_ADDR}</prop>
                </props>
            </property>
        </bean>
    
    </beans>
  2. 通过已经与 Spring 集成好的生产者生产消息。
    package demo;
    
    import java.io.IOException;
    
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.Producer;
    import com.aliyun.openservices.ons.api.SendResult;
    import com.aliyun.openservices.ons.api.exception.ONSClientException;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class DemoProducer {
        public static void main(String[] args) throws IOException {
            /**
             * 生产者 Bean 配置在 producer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(例如具体的 Controller)中。
             */
            ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
            Producer producer = (Producer)context.getBean("producer");
    
            // 循环发送消息。
            for (int i = 0; i < 100; i++) {
                // Note: 根据需要设置 Topic、Tag、Message body、Key 等。
                Message msg = new Message(
                    // 消息所属的 Topic。
                    "DemoTopic",
                    // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 RocketMQ 服务器过滤。
                    "DemoTag",
                    // Message Body 可以是任何二进制形式的数据,RocketMQ 不做任何干预。
                    // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式。
                    "Hello MQ".getBytes());
    
                // 设置代表消息的业务关键属性,请尽可能全局唯一。
                // 以方便您在无法正常收到消息情况下,可通过 RocketMQ 控制台查询消息并补发。
                // 注意:不设置也不会影响消息正常收发。
                msg.setKey("DemoKey");
    
                // 发送消息,不抛异常就代表发送成功。
                try {
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                } catch (ONSClientException e) {
                    // 出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。
                    System.out.println("Failed!");
                }
            }
    
            producer.shutdown();
        }
    }

事务消息生产者与 Spring 集成

事务消息的概念详情请参见事务消息

  1. 首先需要实现一个 LocalTransactionChecker 接口,如下所示。 一个消息生产者只能有一个 LocalTransactionChecker 接口。
    package demo;
    
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
    import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
    
    public class DemoLocalTransactionChecker implements LocalTransactionChecker {
        @Override
        public TransactionStatus check(Message msg) {
            System.out.println("开始回查本地事务状态");
    
            // 检查本地事务状态...
    
            // Note: 根据本地事务状态检查结果返回不同的 TransactionStatus。
            return TransactionStatus.CommitTransaction;
        }
    }
    						
  2. transactionProducer.xml 中定义事务消息生产者 Bean 等信息。
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    
        <bean id="localTransactionChecker" class="demo.DemoLocalTransactionChecker"/>
    
        <!--事务生产者 Bean 配置信息-->
        <bean id="transactionProducer" class="com.aliyun.openservices.ons.api.bean.TransactionProducerBean"
              init-method="start" destroy-method="shutdown">
            <property name="properties">
                <props>
                    <!-- Note: 请替换 ${...} 您账户对应的资源信息 -->
                    <prop key="GROUP_ID">${GROUP_ID}</prop>
                    <prop key="AccessKey">${AccessKey}</prop>
                    <prop key="SecretKey">${SecretKey}</prop>
                    <prop key="NAMESRV_ADDR">${NAMESRV_ADDR}</prop>
                </props>
            </property>
            <property name="localTransactionChecker" ref="localTransactionChecker"/>
        </bean>
    
    </beans>
  3. 通过已经与 Spring 集成好的生产者生产事务消息。
    package demo;
    
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.SendResult;
    import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
    import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
    import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class DemoTransactionProducer {
    
        public static void main(String[] args) {
            /**
             * 事务消息生产者 Bean 配置在 transaction_producer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(例如具体的 Controller)中。
             * 请结合文档 https://help.aliyun.com/document_detail/49323.html 中的“发送事务消息”部分参数说明。
             */
            ApplicationContext context = new ClassPathXmlApplicationContext("transaction_producer.xml");
    
            TransactionProducer transactionProducer = (TransactionProducer)context.getBean("transactionProducer");
    
                    // 循环发送消息
            for (int i = 0; i < 100; i++) {
                // Note: 根据需要设置 Topic、Tag、Message body、Key 等。
                Message msg = new Message(
                    // 消息所属的 Topic。
                    "DemoTopic",
                    // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 RocketMQ 服务器过滤。
                    "DemoTag",
                    // Message Body 可以是任何二进制形式的数据,RocketMQ 不做任何干预。
                    // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式。
                    "Hello MQ".getBytes());
                try {
                    SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
                        @Override
                        public TransactionSttus execute(Message msg, Object arg) {
                            System.out.println("执行本地事务");
    
                            // Note: 根据本地事务执行结果来返回不同的 TransactionStatus。
                            return TransactionStatus.CommitTransaction;
                        }
                    }, null);
                    System.out.println(sendResult);
                } catch (Exception e) {
                    // 出现异常意味着发送失败或者,为了避免消息丢失,建议缓存该消息然后进行重试。
                    System.out.println("Failed!");
                }
            }
    
            transactionProducer.shutdown();
        }
    }

普通与事务消息消费者与 Spring 集成

普通消息与事务消息的消费者示例代码相同。

  1. 实现 MessageListener 接口,如下所示。
    package demo;
    
    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.MessageListener;
    
    public class DemoMessageListener implements MessageListener {
    
        @Override
        public Action consume(Message message, ConsumeContext context) {
            System.out.println("Receive: " + message);
            try {
                // 消费逻辑...
                return Action.CommitMessage;
            } catch (Exception e) {
                // 消费失败
                return Action.ReconsumeLater;
            }
        }
    }
  2. consumer.xml 中定义消费者 Bean 等信息。
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    
        <!--Listener 配置-->
        <bean id="msgListener" class="demo.DemoMessageListener"/>
    
        <!--消费者 Bean 配置-->
        <bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start"
              destroy-method="shutdown">
            <!--消费者配置信息-->
            <property name="properties">
                <props>
                    <!-- Note: 请替换 ${...} 为自己账户资源对应的信息 -->
                    <prop key="GROUP_ID">${GROUP_ID}</prop>
                    <prop key="AccessKey">${AccessKey}</prop>
                    <prop key="SecretKey">${SecretKey}</prop>
                    <prop key="NAMESRV_ADDR">${NAMESRV_ADDR}</prop>
                </props>
            </property>
            <property name="subscriptionTable">
                <map>
                    <entry value-ref="msgListener">
                        <key>
                            <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                                <!-- Note: 请替换 ${...} 为自己账户资源对应的信息 -->
                                <property name="topic" value="${YourTopic}"/>
                                <property name="expression" value="${YourExpression}"/>
                                <!--expression 即 Tag,可以设置成具体的 Tag,如 taga||tagb||tagc,也可设置成 *。* 仅代表订阅所有 Tag。-->
                            </bean>
                        </key>
                    </entry>
                    <!--更多的订阅添加 entry 节点即可-->
                </map>
            </property>
        </bean>
    </beans>
  3. 运行已经与 Spring 集成好的消费者,如下所示。
    package demo;
    
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class DemoConsumer {
        public static void main(String[] args) {
    
            /**
             * 消费者 Bean 配置在 consumer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(例如具体的 Controller)中。
             */
            ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
            System.out.println("Consumer Started");
        }
    }

顺序消息生产者与 Spring 集成

  1. orderProducer.xml 中定义生产者 Bean 等信息。
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    
        <!--顺序生产者 Bean 配置信息-->
        <bean id="producer" class="com.aliyun.openservices.ons.api.bean.OrderProducerBean" init-method="start"
              destroy-method="shutdown">
            <property name="properties">
                <props>
                    <!-- Note: 请替换 ${...} 为您账户对应的资源信息 -->
                    <prop key="GROUP_ID">${GROUP_ID}</prop>
                    <prop key="AccessKey">${AccessKey}</prop>
                    <prop key="SecretKey">${SecretKey}</prop>
                    <prop key="NAMESRV_ADDR">${NAMESRV_ADDR}</prop>
                </props>
            </property>
        </bean>
    
    </beans>
  2. 通过已经与 Spring 集成好的生产者生产消息。
    package demo;
    
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.SendResult;
    import com.aliyun.openservices.ons.api.exception.ONSClientException;
    import com.aliyun.openservices.ons.api.order.OrderProducer;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class DemoOrderProducer {
        public static void main(String[] args) {
            /**
             * 生产者 Bean 配置在 producer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(例如具体的 Controller)中。
             */
            ApplicationContext context = new ClassPathXmlApplicationContext("order_producer.xml");
            OrderProducer orderProducer = (OrderProducer) context.getBean("producer");
    
            // Note: 不同的消息请按照业务需求设置相应的 sharding key。
            String shardingKey = "OrderedKey";
    
            // 循环发送消息。
            for (int i = 0; i < 100; i++) {
                // Note: 根据需要设置 Topic、Tag、Message body、Key 等。
                Message msg = new Message(
                    // Message所属的Topic
                    "DemoTopic",
                    // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 RocketMQ 服务器过滤。
                    "DemoTag",
                    // Message Body 可以是任何二进制形式的数据,RocketMQ 不做任何干预。
                    // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式。
                    "Hello MQ".getBytes());
    
                // 设置代表消息的业务关键属性,请尽可能全局唯一。
                // 以方便您在无法正常收到消息情况下,可通过RocketMQ 控制台查询消息并补发。
                // 注意:不设置也不会影响消息正常收发。
                msg.setKey("DemoKey");
    
                // 发送消息,不抛异常就代表发送成功。
                try {
                    // Note: 不同的消息请按照业务需求设置相应的 sharding key。
                    SendResult sendResult = orderProducer.send(msg, shardingKey);
                    System.out.println(sendResult);
                } catch (ONSClientException e) {
                    // 出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。
                    System.out.println("Failed!");
                }
            }
    
            orderProducer.shutdown();
        }
    }

顺序消息消费者与 Spring 集成

  1. 实现 MessageOrderListener 接口,如下所示。
    package demo;
    
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
    import com.aliyun.openservices.ons.api.order.MessageOrderListener;
    import com.aliyun.openservices.ons.api.order.OrderAction;
    
    public class DemoOrderMessageListener implements MessageOrderListener {
        @Override
        public OrderAction consume(final Message message, final ConsumeOrderContext context) {
            System.out.println("Receive: " + message);
            try {
                // 消费逻辑...
                return OrderAction.Success;
            } catch (Exception e) {
                // 消费失败,挂起当前队列。
                return OrderAction.Suspend;
            }
        }
    }
    						
  2. orderConsumer.xml 中定义消费者 Bean 等信息。
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    
        <!--Listener 配置-->
        <bean id="msgListener" class="demo.DemoOrderMessageListener"/>
    
        <!--顺序消费者 Bean 配置-->
        <bean id="consumer" class="com.aliyun.openservices.ons.api.bean.OrderConsumerBean" init-method="start"
              destroy-method="shutdown">
            <!--消费者配置信息-->
            <property name="properties">
                <props>
                    <!-- Note: 请替换 ${...} 为您账户对应的资源信息 -->
                    <prop key="GROUP_ID">${GROUP_ID}</prop>
                    <prop key="AccessKey">${AccessKey}</prop>
                    <prop key="SecretKey">${SecretKey}</prop>
                    <prop key="NAMESRV_ADDR">${NAMESRV_ADDR}</prop>
                </props>
            </property>
            <property name="subscriptionTable">
                <map>
                    <entry value-ref="msgListener">
                        <key>
                            <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                                <!-- Note: 请替换 ${...} 为您账户对应的资源信息 -->
                                <property name="topic" value="${YourTopic}"/>
                                <property name="expression" value="${YourExpression}"/>
                                <!--expression 即 Tag,可以设置成具体的 Tag,如 taga||tagb||tagc,也可设置成 *。* 仅代表订阅所有 Tag。-->
                            </bean>
                        </key>
                    </entry>
                    <!--更多的订阅添加 entry 节点即可-->
                </map>
            </property>
        </bean>
    </beans>
  3. 运行已经与 Spring 集成好的消费者,如下所示。
    package demo;
    
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class DemoOrderConsumer {
        public static void main(String[] args) {
            /**
             * 消费者 Bean 配置在 order_consumer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(例如具体的 Controller)中。
             */
            ApplicationContext context = new ClassPathXmlApplicationContext("order_consumer.xml");
            System.out.println("OrderConsumer Started");
        }
    }