全部产品
云市场

Spring 集成

更新时间:2020-01-13 13:55:14

本文介绍如何在 SpringBoot 框架下用 SOFAStack 消息队列收发消息。

背景信息

主要包括以下三部分内容:

  • 普通消息生产者和 Spring 集成
  • 事务消息生产者和 Spring 集成
  • 消息消费者和 Spring 集成

请确保同一个 Group ID 下所有 Consumer 实例的订阅关系保持一致。详情请参见订阅关系一致。

SpringBoot 框架下支持的配置参数和 TCP Java 一致。详情请参见 Java SDK 接口和参数说明

生产者与 Spring 集成

  1. 申明生产者。

    1. import org.springframework.beans.factory.annotation.Autowired;
    2. import org.springframework.context.annotation.Bean;
    3. import org.springframework.context.annotation.Configuration;
    4. import com.alipay.sofa.sofamq.example.springboot.config.MqConfig;
    5. import io.openmessaging.api.OMS;
    6. import io.openmessaging.api.Producer;
    7. @Configuration
    8. public class ProducerClient {
    9. @Autowired
    10. private MqConfig mqConfig;
    11. @Bean(initMethod = "start", destroyMethod = "shutdown")
    12. public Producer buildProducer() {
    13. Producer producer = OMS.builder().driver("sofamq").build(mqConfig.getMqProperties())
    14. .createProducer(mqConfig.getMqProperties());
    15. return producer;
    16. }
    17. }
  2. 通过已经与 Spring 集成好的生产者生产消息。

    1. import org.junit.Test;
    2. import org.junit.runner.RunWith;
    3. import org.springframework.beans.factory.annotation.Autowired;
    4. import org.springframework.boot.test.context.SpringBootTest;
    5. import org.springframework.test.context.junit4.SpringRunner;
    6. import com.alipay.sofa.sofamq.example.springboot.config.MqConfig;
    7. import io.openmessaging.api.Message;
    8. import io.openmessaging.api.Producer;
    9. import io.openmessaging.api.SendResult;
    10. import io.openmessaging.api.exception.OMSRuntimeException;
    11. @Component
    12. public class SyncProducerTest {
    13. //普通消息的Producer 已经注册到了spring容器中,后面需要使用时可以直接注入到其它类中
    14. @Autowired
    15. private Producer producer;
    16. @Autowired
    17. private MqConfig mqConfig;
    18. @Test
    19. public void testSend() {
    20. //循环发送消息
    21. for (int i = 0; i < 100; i++) {
    22. Message msg = new Message( //
    23. // Message所属的Topic
    24. mqConfig.getTopic(),
    25. // Message Tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤
    26. mqConfig.getTag(),
    27. // Message Body 可以是任何二进制形式的数据, MQ不做任何干预
    28. // 需要Producer与Consumer协商好一致的序列化和反序列化方式
    29. "Hello MQ".getBytes());
    30. // 设置代表消息的业务关键属性,请尽可能全局唯一
    31. // 以方便您在无法正常收到消息情况下,可通过MQ 控制台查询消息并补发
    32. // 注意:不设置也不会影响消息正常收发
    33. msg.setKey("ORDERID_100");
    34. // 发送消息,只要不抛异常就是成功
    35. try {
    36. SendResult sendResult = producer.send(msg);
    37. assert sendResult != null;
    38. System.out.println(sendResult);
    39. } catch (OMSRuntimeException e) {
    40. System.out.println("发送失败");
    41. //出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。
    42. }
    43. }
    44. }
    45. }

事务消息生产者与 Spring 集成

事务消息的概念详情请参见收发事务消息。

  1. 首先需要实现一个 LocalTransactionChecker,如下所示。 一个消息生产者只能有一个 LocalTransactionChecker。

    1. import org.springframework.stereotype.Component;
    2. import io.openmessaging.api.Message;
    3. import io.openmessaging.api.transaction.LocalTransactionChecker;
    4. import io.openmessaging.api.transaction.TransactionStatus;
    5. @Component
    6. public class DemoLocalTransactionChecker implements LocalTransactionChecker {
    7. @Override
    8. public TransactionStatus check(Message msg) {
    9. System.out.println("开始回查本地事务状态");
    10. return TransactionStatus.CommitTransaction; //根据本地事务状态检查结果返回不同的TransactionStatus
    11. }
    12. }
  2. 申明事务生产者。

    1. import org.springframework.beans.factory.annotation.Autowired;
    2. import org.springframework.context.annotation.Bean;
    3. import org.springframework.context.annotation.Configuration;
    4. import com.alipay.sofa.sofamq.example.springboot.config.MqConfig;
    5. import io.openmessaging.api.OMS;
    6. import io.openmessaging.api.transaction.TransactionProducer;
    7. @Configuration
    8. public class TransactionProducerClient {
    9. @Autowired
    10. private MqConfig mqConfig;
    11. @Autowired
    12. private DemoLocalTransactionChecker localTransactionChecker;
    13. @Bean(initMethod = "start", destroyMethod = "shutdown")
    14. public TransactionProducer buildTransactionProducer() {
    15. TransactionProducer producer = OMS.builder().driver("sofamq").build(mqConfig.getMqProperties())
    16. .createTransactionProducer(mqConfig.getMqProperties(), localTransactionChecker);
    17. return producer;
    18. }
    19. }
  3. 通过已经与 Spring 集成好的生产者生产事务消息。

  1. import org.junit.Test;
  2. import org.junit.runner.RunWith;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.boot.test.context.SpringBootTest;
  5. import org.springframework.test.context.junit4.SpringRunner;
  6. import com.alipay.sofa.sofamq.example.springboot.config.MqConfig;
  7. import io.openmessaging.api.Message;
  8. import io.openmessaging.api.SendResult;
  9. import io.openmessaging.api.transaction.LocalTransactionExecuter;
  10. import io.openmessaging.api.transaction.TransactionProducer;
  11. import io.openmessaging.api.transaction.TransactionStatus;
  12. @RunWith(SpringRunner.class)
  13. @SpringBootTest
  14. public class TransactionProducerTest {
  15. //事务消息的Producer 已经注册到了spring容器中,后面需要使用时可以直接注入到其它类中
  16. @Autowired
  17. private TransactionProducer transactionProducer;
  18. @Autowired
  19. private MqConfig mqConfig;
  20. @Test
  21. public void testSend() {
  22. Message msg = new Message(mqConfig.getTopic(), "TagA", "Hello MQ".getBytes());
  23. SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
  24. @Override
  25. public TransactionStatus execute(Message msg, Object arg) {
  26. System.out.println("执行本地事务");
  27. return TransactionStatus.CommitTransaction; //根据本地事务执行结果来返回不同的TransactionStatus
  28. }
  29. }, null);
  30. System.out.println(sendResult);
  31. }
  32. }

消费者与 SpringBoot 集成

  1. 创建 MessageListener,如下所示。

    1. import org.springframework.stereotype.Component;
    2. import io.openmessaging.api.Action;
    3. import io.openmessaging.api.ConsumeContext;
    4. import io.openmessaging.api.Message;
    5. import io.openmessaging.api.MessageListener;
    6. @Component
    7. public class DemoMessageListener implements MessageListener {
    8. @Override
    9. public Action consume(Message message, ConsumeContext context) {
    10. System.out.println("Receive: " + message);
    11. try {
    12. //do something..
    13. return Action.CommitMessage;
    14. } catch (Exception e) {
    15. //消费失败
    16. return Action.ReconsumeLater;
    17. }
    18. }
    19. }
  2. 申明消费者。

    1. import org.springframework.beans.factory.annotation.Autowired;
    2. import org.springframework.context.annotation.Bean;
    3. import com.alipay.sofa.sofamq.example.springboot.config.MqConfig;
    4. import io.openmessaging.api.Consumer;
    5. import io.openmessaging.api.OMS;
    6. import org.springframework.context.annotation.Configuration;
    7. @Configuration
    8. public class ConsumerClient {
    9. @Autowired
    10. private MqConfig mqConfig;
    11. @Autowired
    12. private DemoMessageListener messageListener;
    13. @Bean(initMethod = "start", destroyMethod = "shutdown")
    14. public Consumer buildConsumer() {
    15. Consumer consumer = OMS.builder().driver("sofamq").build(mqConfig.getMqProperties())
    16. .createConsumer(mqConfig.getMqProperties());
    17. consumer.subscribe(mqConfig.getTopic(), mqConfig.getTag(), messageListener);
    18. return consumer;
    19. }
    20. }