全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 钉钉智能硬件
消息队列 MQ

Spring 集成

更新时间:2017-12-15 20:41:41

本文介绍如何在 Spring 框架下用 MQ 收发消息。主要包括三部分内容:普通消息生产者和 Spring 集成,事务消息生产者和 Spring 集成, 消息消费者与 Spring 集成。

请确保同一个 Consumer ID 下所有 Consumer 实例的订阅关系保持一致,具体请参考订阅关系一致文档。

Spring 框架下支持的配置参数和 TCP Java 一致,具体可以参考Java SDK 使用说明

生产者与 Spring 集成

  1. 在 producer.xml 中定义生产者 Bean 等信息。

     <?xml version="1.0" encoding="UTF-8"?>
     <beans xmlns="http://www.springframework.org/schema/beans"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    
         <bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean" init-method="start" destroy-method="shutdown">
         <!-- Spring 接入方式支持 Java SDK 支持的所有配置项 -->
             <property name="properties" > <!--生产者配置信息-->
                 <props>
                     <prop key="ProducerId">PID_DEMO</prop> <!--请替换 XXX-->
                     <prop key="AccessKey">XXX</prop>
                     <prop key="SecretKey">XXX</prop>
                 </props>
             </property>
         </bean>
    
     </beans>
    
  2. 通过已经与 Spring 集成好的生产者生产消息。

     package demo;
    
     import com.aliyun.openservices.ons.api.Message;
     import com.aliyun.openservices.ons.api.Producer;
     import com.aliyun.openservices.ons.api.SendResult;
     import com.aliyun.openservices.ons.api.exception.ONSClientException;
     import org.springframework.context.ApplicationContext;
     import org.springframework.context.support.ClassPathXmlApplicationContext;
    
     public class ProduceWithSpring {
         public static void main(String[] args) {
             /**
              * 生产者 Bean 配置在 producer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中.
              */
             ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
    
             Producer producer = (Producer) context.getBean("producer");
             //循环发送消息
             for (int i = 0; i < 100; i++) {
                 Message msg = new Message( //
                         // Message 所属的 Topic
                         "TopicTestMQ",
                         // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 MQ 服务器过滤
                         "TagA",
                         // Message Body 可以是任何二进制形式的数据, MQ 不做任何干预
                         // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
                         "Hello MQ".getBytes());
                 // 设置代表消息的业务关键属性,请尽可能全局唯一
                 // 以方便您在无法正常收到消息情况下,可通过 MQ 控制台查询消息并补发
                 // 注意:不设置也不会影响消息正常收发
                 msg.setKey("ORDERID_100");
                 // 发送消息,只要不抛异常就是成功
                 try {
                     SendResult sendResult = producer.send(msg);
                     assert sendResult != null;
                     System.out.println("send success: " + sendResult.getMessageId());
                 }catch (ONSClientException e) {
                     System.out.println("发送失败");
                 }
    
             }
         }
     }
    

事务消息生产者与 Spring 集成

有关事务消息的概念,请参见收发事务消息

  1. 首先需要实现一个 LocalTransactionChecker,如下所示。一个消息生产者只能有一个 LocalTransactionChecker。

     package demo;
    
     import com.aliyun.openservices.ons.api.Message;
     import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
     import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
    
     public class DemoLocalTransactionChecker implements LocalTransactionChecker {
         public TransactionStatus check(Message msg) {
             System.out.println("开始回查本地事务状态");
             return TransactionStatus.CommitTransaction; //根据本地事务状态检查结果返回不同的 TransactionStatus
         }
     }
    
  2. 其次,在 transactionProducer.xml 中定义事务消息生产者 Bean 等信息。

     <?xml version="1.0" encoding="UTF-8"?>
     <beans xmlns="http://www.springframework.org/schema/beans"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    
         <bean id="localTransactionChecker" class="demo.DemoLocalTransactionChecker"></bean>
    
         <bean id="transactionProducer" class="com.aliyun.openservices.ons.api.bean.TransactionProducerBean" init-method="start" destroy-method="shutdown">
             <property name="properties" > <!--事务消息生产者配置信息-->
                 <props>
                     <prop key="ProducerId">PID_DEMO</prop> <!--请替换 XXX-->
                     <prop key="AccessKey">AKDEMO</prop>
                     <prop key="SecretKey">SKDEMO</prop>
                 </props>
             </property>
             <property name="localTransactionChecker" ref="localTransactionChecker"></property>
         </bean>
    
     </beans>
    
  3. 通过已经与 Spring 集成好的生产者生产事务消息。

     package demo;
    
     import com.aliyun.openservices.ons.api.Message;
     import com.aliyun.openservices.ons.api.SendResult;
     import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
     import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
     import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
     import org.springframework.context.ApplicationContext;
     import org.springframework.context.support.ClassPathXmlApplicationContext;
    
     public class ProduceTransMsgWithSpring {
    
         public static void main(String[] args) {
             /**
              * 事务消息生产者 Bean 配置在 transactionProducer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中.
              * 请结合例子"发送事务消息"
              */
             ApplicationContext context = new ClassPathXmlApplicationContext("transactionProducer.xml");
    
             TransactionProducer transactionProducer = (TransactionProducer) context.getBean("transactionProducer");
    
             Message msg = new Message("XXX", "TagA", "Hello MQ transaction===".getBytes());
    
             SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
                 @Override
                 public TransactionStatus execute(Message msg, Object arg) {
                     System.out.println("执行本地事务");
                     return TransactionStatus.CommitTransaction; //根据本地事务执行结果来返回不同的 TransactionStatus
                 }
             }, null);
         }
     }
    

消费者与 Spring 集成

  1. 创建 MessageListener,如下所示。

     package demo;
    
     import com.aliyun.openservices.ons.api.Action;
     import com.aliyun.openservices.ons.api.ConsumeContext;
     import com.aliyun.openservices.ons.api.Message;
     import com.aliyun.openservices.ons.api.MessageListener;
    
     public class DemoMessageListener implements MessageListener {
    
         public Action consume(Message message, ConsumeContext context) {
    
             System.out.println("Receive: " + message.getMsgID());
             try {
                 //do something..
                 return Action.CommitMessage;
             }catch (Exception e) {
                 //消费失败
                 return Action.ReconsumeLater;
             }
         }
     }
    
  2. 在 consumer.xml 中定义消费者 Bean 等信息。

     <?xml version="1.0" encoding="UTF-8"?>
     <beans xmlns="http://www.springframework.org/schema/beans"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    
         <bean id="msgListener" class="demo.DemoMessageListener"></bean> <!--Listener 配置-->
     <!-- 多 CID 订阅同一个 Topic,可以创建多个 ConsumerBean-->
         <bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
             <property name="properties" > <!--消费者配置信息-->
                 <props>
                     <prop key="ConsumerId">CID_DEMO</prop> <!--请替换 XXX-->
                     <prop key="AccessKey">AKDEMO</prop>
                     <prop key="SecretKey">SKDEMO</prop>
                     <!--将消费者线程数固定为50个
                     <prop key="ConsumeThreadNums">50</prop>
                     -->
                 </props>
             </property>
             <property name="subscriptionTable">
                 <map>
                     <entry value-ref="msgListener">
                         <key>
                             <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                                 <property name="topic" value="TopicTestMQ"/>
                                 <property name="expression" value="*"/><!--expression 即 Tag,可以设置成具体的 Tag,如 taga||tagb||tagc,也可设置成*。 *仅代表订阅所有 Tag,不支持通配-->
                             </bean>
                         </key>
                     </entry>
                     <!--更多的订阅添加 entry 节点即可,如下所示-->
                     <entry value-ref="msgListener">
                         <key>
                             <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                                 <property name="topic" value="TopicTestMQ-Other"/> <!--订阅另外一个 Topic -->
                                 <property name="expression" value="taga||tagb"/> <!-- 订阅多个 Tag -->
                             </bean>
                         </key>
                     </entry>
                 </map>
             </property>
         </bean>
    
     </beans>
    
  3. 运行已经与 Spring 集成好的消费者,如下所示。

     package demo;
    
     import org.springframework.context.ApplicationContext;
     import org.springframework.context.support.ClassPathXmlApplicationContext;
    
     public class ConsumeWithSpring {
         public static void main(String[] args) {
    
             /**
              * 消费者 Bean 配置在 consumer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中
              */
             ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
             System.out.println("Consumer Started");
         }
     }
    
本文导读目录