全部产品
云市场

Spring 集成

更新时间:2019-09-14 00:16:50

本文介绍如何在 Spring 框架下用消息队列 MQ 收发消息。主要包括以下三部分内容:

  • 普通消息生产者和 Spring 集成

  • 事务消息生产者和 Spring 集成

  • 消息消费者和 Spring 集成

请确保同一个 Group ID 下所有 Consumer 实例的订阅关系保持一致。详情请参见订阅关系一致

Spring 框架下支持的配置参数和 TCP Java 一致。详情请参见 Java SDK 使用说明

客户端(ons-client)版本的详细请参见 Java SDK 版本说明

生产者与 Spring 集成

  1. 在 producer.xml 中定义生产者 Bean 等信息。

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <beans xmlns="http://www.springframework.org/schema/beans"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    5. <bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean" init-method="start" destroy-method="shutdown">
    6. <!-- Spring 接入方式支持 Java SDK 支持的所有配置项 -->
    7. <property name="properties" > <!--生产者配置信息-->
    8. <props>
    9. <prop key="AccessKey">XXX</prop>
    10. <prop key="SecretKey">XXX</prop>
    11. <!-- ons-client 版本是 1.8.3.Final(版本说明请参见本页顶部 Java SDK 版本说明),需要配置(从实例详情页面复制 TCP 协议接入点)
    12. <prop key="NAMESRV_ADDR">XXX</prop>
    13. -->
    14. </props>
    15. </property>
    16. </bean>
    17. </beans>
  2. 通过已经与 Spring 集成好的生产者生产消息。

    1. package demo;
    2. import com.aliyun.openservices.ons.api.Message;
    3. import com.aliyun.openservices.ons.api.Producer;
    4. import com.aliyun.openservices.ons.api.SendResult;
    5. import com.aliyun.openservices.ons.api.exception.ONSClientException;
    6. import org.springframework.context.ApplicationContext;
    7. import org.springframework.context.support.ClassPathXmlApplicationContext;
    8. public class ProduceWithSpring {
    9. public static void main(String[] args) {
    10. /**
    11. * 生产者 Bean 配置在 producer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中
    12. */
    13. ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
    14. Producer producer = (Producer) context.getBean("producer");
    15. //循环发送消息
    16. for (int i = 0; i < 100; i++) {
    17. Message msg = new Message( //
    18. // Message 所属的 Topic
    19. "TopicTestMQ",
    20. // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列 MQ 的服务器过滤
    21. "TagA",
    22. // Message Body 可以是任何二进制形式的数据, 消息队列 MQ 不做任何干预
    23. // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
    24. "Hello MQ".getBytes());
    25. // 设置代表消息的业务关键属性,请尽可能全局唯一
    26. // 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发
    27. // 注意:不设置也不会影响消息正常收发
    28. msg.setKey("ORDERID_100");
    29. // 发送消息,只要不抛异常就是成功
    30. try {
    31. SendResult sendResult = producer.send(msg);
    32. assert sendResult != null;
    33. System.out.println("send success: " + sendResult.getMessageId());
    34. }catch (ONSClientException e) {
    35. System.out.println("发送失败");
    36. }
    37. }
    38. }
    39. }

事务消息生产者与 Spring 集成

事务消息的概念详情请参见收发事务消息

  1. 首先需要实现一个 LocalTransactionChecker,如下所示。 一个消息生产者只能有一个 LocalTransactionChecker。

    1. package demo;
    2. import com.aliyun.openservices.ons.api.Message;
    3. import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
    4. import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
    5. public class DemoLocalTransactionChecker implements LocalTransactionChecker {
    6. public TransactionStatus check(Message msg) {
    7. System.out.println("开始回查本地事务状态");
    8. return TransactionStatus.CommitTransaction; //根据本地事务状态检查结果返回不同的 TransactionStatus
    9. }
    10. }
  2. 在 transactionProducer.xml 中定义事务消息生产者 Bean 等信息。

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <beans xmlns="http://www.springframework.org/schema/beans"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    5. <bean id="localTransactionChecker" class="demo.DemoLocalTransactionChecker"></bean>
    6. <bean id="transactionProducer" class="com.aliyun.openservices.ons.api.bean.TransactionProducerBean" init-method="start" destroy-method="shutdown">
    7. <property name="properties" > <!--事务消息生产者配置信息-->
    8. <props>
    9. <prop key="AccessKey">AKDEMO</prop>
    10. <prop key="SecretKey">SKDEMO</prop>
    11. <prop key="GROUP_ID">GID_DEMO</prop>
    12. <!-- ons-client 版本是 1.8.3.Final(版本说明请参见本页顶部 Java SDK 版本说明),需要配置(从实例详情页面复制 TCP 协议接入点)
    13. <prop key="NAMESRV_ADDR">XXX</prop>
    14. -->
    15. </props>
    16. </property>
    17. <property name="localTransactionChecker" ref="localTransactionChecker"></property>
    18. </bean>
    19. </beans>
  3. 通过已经与 Spring 集成好的生产者生产事务消息。

    1. package demo;
    2. import com.aliyun.openservices.ons.api.Message;
    3. import com.aliyun.openservices.ons.api.SendResult;
    4. import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
    5. import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
    6. import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
    7. import org.springframework.context.ApplicationContext;
    8. import org.springframework.context.support.ClassPathXmlApplicationContext;
    9. public class ProduceTransMsgWithSpring {
    10. public static void main(String[] args) {
    11. /**
    12. * 事务消息生产者 Bean 配置在 transactionProducer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中
    13. * 请结合例子"发送事务消息"
    14. */
    15. ApplicationContext context = new ClassPathXmlApplicationContext("transactionProducer.xml");
    16. TransactionProducer transactionProducer = (TransactionProducer) context.getBean("transactionProducer");
    17. Message msg = new Message("XXX", "TagA", "Hello MQ transaction===".getBytes());
    18. SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
    19. @Override
    20. public TransactionStatus execute(Message msg, Object arg) {
    21. System.out.println("执行本地事务");
    22. return TransactionStatus.CommitTransaction; //根据本地事务执行结果来返回不同的 TransactionStatus
    23. }
    24. }, null);
    25. }
    26. }

消费者与 Spring 集成

  1. 创建 MessageListener,如下所示。

    1. package demo;
    2. import com.aliyun.openservices.ons.api.Action;
    3. import com.aliyun.openservices.ons.api.ConsumeContext;
    4. import com.aliyun.openservices.ons.api.Message;
    5. import com.aliyun.openservices.ons.api.MessageListener;
    6. public class DemoMessageListener implements MessageListener {
    7. public Action consume(Message message, ConsumeContext context) {
    8. System.out.println("Receive: " + message.getMsgID());
    9. try {
    10. //do something..
    11. return Action.CommitMessage;
    12. }catch (Exception e) {
    13. //消费失败
    14. return Action.ReconsumeLater;
    15. }
    16. }
    17. }
  2. 在 consumer.xml 中定义消费者 Bean 等信息。

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <beans xmlns="http://www.springframework.org/schema/beans"
    3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    4. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    5. <bean id="msgListener" class="demo.DemoMessageListener"></bean> <!--Listener 配置-->
    6. <!-- Group ID 订阅同一个 Topic,可以创建多个 ConsumerBean-->
    7. <bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
    8. <property name="properties" > <!--消费者配置信息-->
    9. <props>
    10. <prop key="GROUP_ID">GID_DEMO</prop> <!--请替换 XXX-->
    11. <prop key="AccessKey">AKDEMO</prop>
    12. <prop key="SecretKey">SKDEMO</prop>
    13. <!-- ons-client 版本在 1.8.3.Final(版本说明请参见本页顶部 Java SDK 版本说明),需要配置(从实例详情页面复制 TCP 协议接入点)
    14. <prop key="NAMESRV_ADDR">XXX</prop>
    15. -->
    16. <!--将消费者线程数固定为 50 个
    17. <prop key="ConsumeThreadNums">50</prop>
    18. -->
    19. </props>
    20. </property>
    21. <property name="subscriptionTable">
    22. <map>
    23. <entry value-ref="msgListener">
    24. <key>
    25. <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
    26. <property name="topic" value="TopicTestMQ"/>
    27. <property name="expression" value="*"/><!--expression 即 Tag,可以设置成具体的 Tag,如 taga||tagb||tagc,也可设置成 *。 * 仅代表订阅所有 Tag,不支持通配-->
    28. </bean>
    29. </key>
    30. </entry>
    31. <!--更多的订阅添加 entry 节点即可,如下所示-->
    32. <entry value-ref="msgListener">
    33. <key>
    34. <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
    35. <property name="topic" value="TopicTestMQ-Other"/> <!--订阅另外一个 Topic -->
    36. <property name="expression" value="taga||tagb"/> <!-- 订阅多个 Tag -->
    37. </bean>
    38. </key>
    39. </entry>
    40. </map>
    41. </property>
    42. </bean>
    43. </beans>
  3. 运行已经与 Spring 集成好的消费者,如下所示。

    1. package demo;
    2. import org.springframework.context.ApplicationContext;
    3. import org.springframework.context.support.ClassPathXmlApplicationContext;
    4. public class ConsumeWithSpring {
    5. public static void main(String[] args) {
    6. /**
    7. * 消费者 Bean 配置在 consumer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中
    8. */
    9. ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
    10. System.out.println("Consumer Started");
    11. }
    12. }