文档

Spring 集成

更新时间:

本文介绍如何在 SpringBoot 框架下用 SOFAStack 消息队列收发消息。

背景信息

主要包括以下三部分内容:

  • 普通消息生产者和 Spring 集成

  • 事务消息生产者和 Spring 集成

  • 消息消费者和 Spring 集成

请确保同一个 Group ID 下所有 Consumer 实例的订阅关系保持一致。详情请参见 订阅关系一致

SpringBoot 框架下支持的配置参数和 TCP Java 一致。详情请参见 Java SDK 接口和参数说明

生产者与 Spring 集成

  1. 声明生产者。

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import com.alipay.sofa.sofamq.example.springboot.config.MqConfig;
    import io.openmessaging.api.OMS;
    import io.openmessaging.api.Producer;
    
    @Configuration
    public class ProducerClient{
        @Autowired
        private MqConfig mqConfig;
    
        @Bean(initMethod ="start", destroyMethod ="shutdown")
        public Producer buildProducer(){
            Producer producer = OMS.builder().driver("sofamq").build(mqConfig.getMqProperties())
                    .createProducer(mqConfig.getMqProperties());
            return producer;
        }
    }
    
  2. 通过已经与 Spring 集成好的生产者生产消息。

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import com.alipay.sofa.sofamq.example.springboot.config.MqConfig;
    
    import io.openmessaging.api.Message;
    import io.openmessaging.api.Producer;
    import io.openmessaging.api.SendResult;
    import io.openmessaging.api.exception.OMSRuntimeException;
    
    @Component
    public class SyncProducerTest{
        //普通消息的 Producer 已经注册到了 spring 容器中,后面需要使用时可以直接注入到其它类中
        @Autowired
        private Producer producer;
        @Autowired
        private MqConfig mqConfig;
        @Test
        public void testSend(){
                //循环发送消息
                for(int i =0; i <100; i++){
                    Message msg =new Message(
                    // Message 所属的 Topic
                    mqConfig.getTopic(),
                    // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 MQ 服务器过滤
                    mqConfig.getTag(),
                    // Message Body 可以是任何二进制形式的数据, MQ 不做任何干预
                    // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
                    "Hello MQ".getBytes());
                    // 设置代表消息的业务关键属性,请尽可能全局唯一
                    // 以方便您在无法正常收到消息情况下,可通过 MQ 控制台查询消息并补发
                    // 注意:不设置也不会影响消息正常收发
                    msg.setKey("ORDERID_100");
                    // 发送消息,只要不抛异常就是成功
                    try{
                        SendResult sendResult = producer.send(msg);
                        assert sendResult !=null;
                        System.out.println(sendResult);
                    }catch(OMSRuntimeException e){
                        System.out.println("发送失败");
                        //出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。
                    }
                }
        }
    }

事务消息生产者与 Spring 集成

事务消息的概念详情请参见收发事务消息。

  1. 首先需要实现一个 LocalTransactionChecker,如下所示。 一个消息生产者只能有一个 LocalTransactionChecker

    import org.springframework.stereotype.Component;
    import io.openmessaging.api.Message;
    import io.openmessaging.api.transaction.LocalTransactionChecker;
    import io.openmessaging.api.transaction.TransactionStatus;
    
    @Component
    public class DemoLocalTransactionChecker implements LocalTransactionChecker{
        @Override
        public TransactionStatus check(Message msg){
            System.out.println("开始回查本地事务状态");
            return TransactionStatus.CommitTransaction;//根据本地事务状态检查结果返回不同的 TransactionStatus
        }
    }
  2. 声明生产者。

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import com.alipay.sofa.sofamq.example.springboot.config.MqConfig;
    import io.openmessaging.api.OMS;
    import io.openmessaging.api.transaction.TransactionProducer;
    
    @Configuration
    public class TransactionProducerClient{
        @Autowired
        private MqConfig  mqConfig;
        @Autowired
        private DemoLocalTransactionChecker localTransactionChecker;
        @Bean(initMethod ="start", destroyMethod ="shutdown")
        public TransactionProducer buildTransactionProducer(){
            TransactionProducer producer = OMS.builder().driver("sofamq").build(mqConfig.getMqProperties())
            .createTransactionProducer(mqConfig.getMqProperties(), localTransactionChecker);
            return producer;
        }
    }
  3. 通过已经与 Spring 集成好的生产者生产事务消息。

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.alipay.sofa.sofamq.example.springboot.config.MqConfig;

import io.openmessaging.api.Message;
import io.openmessaging.api.SendResult;
import io.openmessaging.api.transaction.LocalTransactionExecuter;
import io.openmessaging.api.transaction.TransactionProducer;
import io.openmessaging.api.transaction.TransactionStatus;

@RunWith(SpringRunner.class)
@SpringBootTest
public class TransactionProducerTest{

    //事务消息的 Producer 已经注册到了 spring 容器中,后面需要使用时可以直接注入到其它类中
    @Autowired
    private TransactionProducer transactionProducer;
    
    @Autowired
    private MqConfig mqConfig;

    @Test
    public void testSend(){
        Message msg =newMessage(mqConfig.getTopic(),"TagA","Hello MQ".getBytes());
        SendResult sendResult = transactionProducer.send(msg,newLocalTransactionExecuter(){
            @Override
            public TransactionStatus execute(Message msg,Object arg){
                System.out.println("执行本地事务");
                return TransactionStatus.CommitTransaction;//根据本地事务执行结果来返回不同的 TransactionStatus
            }
        },null);
        System.out.println(sendResult);
    }
}

消费者与 SpringBoot 集成

  1. 创建 MessageListener,如下所示。

    import org.springframework.stereotype.Component;
    import io.openmessaging.api.Action;
    import io.openmessaging.api.ConsumeContext;
    import io.openmessaging.api.Message;
    import io.openmessaging.api.MessageListener;
    
    @Component
    public class DemoMessageListener implements MessageListener{
        @Override
        public Action consume(Message message,ConsumeContext context){
            System.out.println("Receive: "+ message);
            try{
                //do something..
                return Action.CommitMessage;
            }catch(Exception e){
                //消费失败
                return Action.ReconsumeLater;
            }
        }
    }
  2. 声明消费者。

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import com.alipay.sofa.sofamq.example.springboot.config.MqConfig;
    import io.openmessaging.api.Consumer;
    import io.openmessaging.api.OMS;
    import org.springframework.context.annotation.Configuration;
    @Configuration
    public class ConsumerClient{
        @Autowired
        private MqConfig mqConfig;
        @Autowired
        private DemoMessageListener messageListener;
        @Bean(initMethod ="start", destroyMethod ="shutdown")
        public Consumer buildConsumer(){
            Consumer consumer = OMS.builder().driver("sofamq").build(mqConfig.getMqProperties())
            .createConsumer(mqConfig.getMqProperties());
            consumer.subscribe(mqConfig.getTopic(), mqConfig.getTag(), messageListener);
            return consumer;
        }
    }