更新时间:2020-01-13 13:55
本文介绍如何在 SpringBoot 框架下用 SOFAStack 消息队列收发消息。
主要包括以下三部分内容:
请确保同一个 Group ID 下所有 Consumer 实例的订阅关系保持一致。详情请参见订阅关系一致。
SpringBoot 框架下支持的配置参数和 TCP Java 一致。详情请参见 Java SDK 接口和参数说明。
申明生产者。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.alipay.sofa.sofamq.example.springboot.config.MqConfig;
import io.openmessaging.api.OMS;
import io.openmessaging.api.Producer;
@Configuration
public class ProducerClient {
@Autowired
private MqConfig mqConfig;
@Bean(initMethod = "start", destroyMethod = "shutdown")
public Producer buildProducer() {
Producer producer = OMS.builder().driver("sofamq").build(mqConfig.getMqProperties())
.createProducer(mqConfig.getMqProperties());
return producer;
}
}
通过已经与 Spring 集成好的生产者生产消息。
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.alipay.sofa.sofamq.example.springboot.config.MqConfig;
import io.openmessaging.api.Message;
import io.openmessaging.api.Producer;
import io.openmessaging.api.SendResult;
import io.openmessaging.api.exception.OMSRuntimeException;
@Component
public class SyncProducerTest {
//普通消息的Producer 已经注册到了spring容器中,后面需要使用时可以直接注入到其它类中
@Autowired
private Producer producer;
@Autowired
private MqConfig mqConfig;
@Test
public void testSend() {
//循环发送消息
for (int i = 0; i < 100; i++) {
Message msg = new Message( //
// Message所属的Topic
mqConfig.getTopic(),
// Message Tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤
mqConfig.getTag(),
// Message Body 可以是任何二进制形式的数据, MQ不做任何干预
// 需要Producer与Consumer协商好一致的序列化和反序列化方式
"Hello MQ".getBytes());
// 设置代表消息的业务关键属性,请尽可能全局唯一
// 以方便您在无法正常收到消息情况下,可通过MQ 控制台查询消息并补发
// 注意:不设置也不会影响消息正常收发
msg.setKey("ORDERID_100");
// 发送消息,只要不抛异常就是成功
try {
SendResult sendResult = producer.send(msg);
assert sendResult != null;
System.out.println(sendResult);
} catch (OMSRuntimeException e) {
System.out.println("发送失败");
//出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。
}
}
}
}
事务消息的概念详情请参见收发事务消息。
首先需要实现一个 LocalTransactionChecker,如下所示。 一个消息生产者只能有一个 LocalTransactionChecker。
import org.springframework.stereotype.Component;
import io.openmessaging.api.Message;
import io.openmessaging.api.transaction.LocalTransactionChecker;
import io.openmessaging.api.transaction.TransactionStatus;
@Component
public class DemoLocalTransactionChecker implements LocalTransactionChecker {
@Override
public TransactionStatus check(Message msg) {
System.out.println("开始回查本地事务状态");
return TransactionStatus.CommitTransaction; //根据本地事务状态检查结果返回不同的TransactionStatus
}
}
申明事务生产者。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.alipay.sofa.sofamq.example.springboot.config.MqConfig;
import io.openmessaging.api.OMS;
import io.openmessaging.api.transaction.TransactionProducer;
@Configuration
public class TransactionProducerClient {
@Autowired
private MqConfig mqConfig;
@Autowired
private DemoLocalTransactionChecker localTransactionChecker;
@Bean(initMethod = "start", destroyMethod = "shutdown")
public TransactionProducer buildTransactionProducer() {
TransactionProducer producer = OMS.builder().driver("sofamq").build(mqConfig.getMqProperties())
.createTransactionProducer(mqConfig.getMqProperties(), localTransactionChecker);
return producer;
}
}
通过已经与 Spring 集成好的生产者生产事务消息。
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.alipay.sofa.sofamq.example.springboot.config.MqConfig;
import io.openmessaging.api.Message;
import io.openmessaging.api.SendResult;
import io.openmessaging.api.transaction.LocalTransactionExecuter;
import io.openmessaging.api.transaction.TransactionProducer;
import io.openmessaging.api.transaction.TransactionStatus;
@RunWith(SpringRunner.class)
@SpringBootTest
public class TransactionProducerTest {
//事务消息的Producer 已经注册到了spring容器中,后面需要使用时可以直接注入到其它类中
@Autowired
private TransactionProducer transactionProducer;
@Autowired
private MqConfig mqConfig;
@Test
public void testSend() {
Message msg = new Message(mqConfig.getTopic(), "TagA", "Hello MQ".getBytes());
SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
@Override
public TransactionStatus execute(Message msg, Object arg) {
System.out.println("执行本地事务");
return TransactionStatus.CommitTransaction; //根据本地事务执行结果来返回不同的TransactionStatus
}
}, null);
System.out.println(sendResult);
}
}
创建 MessageListener,如下所示。
import org.springframework.stereotype.Component;
import io.openmessaging.api.Action;
import io.openmessaging.api.ConsumeContext;
import io.openmessaging.api.Message;
import io.openmessaging.api.MessageListener;
@Component
public class DemoMessageListener implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
try {
//do something..
return Action.CommitMessage;
} catch (Exception e) {
//消费失败
return Action.ReconsumeLater;
}
}
}
申明消费者。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import com.alipay.sofa.sofamq.example.springboot.config.MqConfig;
import io.openmessaging.api.Consumer;
import io.openmessaging.api.OMS;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConsumerClient {
@Autowired
private MqConfig mqConfig;
@Autowired
private DemoMessageListener messageListener;
@Bean(initMethod = "start", destroyMethod = "shutdown")
public Consumer buildConsumer() {
Consumer consumer = OMS.builder().driver("sofamq").build(mqConfig.getMqProperties())
.createConsumer(mqConfig.getMqProperties());
consumer.subscribe(mqConfig.getTopic(), mqConfig.getTag(), messageListener);
return consumer;
}
}
在文档使用中是否遇到以下问题
更多建议
匿名提交