全部产品
云市场

快速入门

更新时间:2019-09-19 13:36:44

使用消息队列产品实现应用系统间发布和订阅异步消息的步骤为:

  1. 配置 SOFABoot 工程
  2. 本地开发
    1. 消息发布者
    2. 消息消费者
  3. 控制台资源管理
    1. 确定消息类型
    2. 配置消息主题
    3. 配置消费组
    4. 添加订阅关系
  4. 发布应用

前置条件

消息队列工程的开发基于 SOFABoot 框架。您必须已经创建一个 SOFABoot 工程,并完成 SOFA 中间件 Maven 依赖的必要配置。详情参见:

本地开发

点击此链接下载示例工程。 其中,消息队列项目示例代码位于 middleware-v2/mqtutorial-all-in-one 文件夹下。

无论您将应用作为消息发布者还是消费者,都必须在工程中引入消息队列 Maven 依赖。在异步消息模块的 pom.xml 文件中添加如下依赖:

  1. <dependency>
  2. <groupId>com.alipay.sofa</groupId>
  3. <artifactId>mq-enterprise-sofa-boot-starter</artifactId>
  4. </dependency>

示例工程代码默认使用云上部署配置。如果您需要进行本地调试,在 application.properties 文件中,您需要修改以下参数信息:

  • 修改 run.mode 参数为 DEV 模式
    1. # 文件路径: middleware-v2/mqtutorial-all-in-one/app/web/src/main/resources/config/application.properties
    2. run.mode=DEV
  • 修改中间件全局配置项 com.alipay.instanceidcom.antcloud.antvip.endpoint。其参数值均可在 脚手架 > Step 2 页面中获取,如下图所示。环境参数配置示例如下:
    1. # env
    2. com.alipay.instanceid=000000001
    3. com.antcloud.antvip.endpoint=100.100.100.10

发布者 (Publisher)

发布者配置

发送消息的应用系统需要在 Spring 应用上下文文件中配置 sofa:publisher。应用上下文文件的默认路径如下所示:

  1. src/main/resources/META-INF/xxx/xxx-xxx.xml

发布者配置示例如下:

  1. <sofa:publisher id="mqPublisher" group="P_appname_service">
  2. <sofa:channels>
  3. <sofa:channel value="TP_DEFAULT"/>
  4. </sofa:channels>
  5. <sofa:binding.msg_broker/>
  6. </sofa:publisher>
  7. <bean id="mqService" class="com.antcloud.tutorial.mq.endpoint.service.MqService">
  8. <property name="mqPublisher" ref="mqPublisher"/>
  9. </bean>
  • id 是 Spring Bean 的单例服务标识,可以被依赖注入到其它 Spring Bean 属性值中。
  • group 命名格式为 P\_应用名\_服务名,是发布者唯一标识,不允许存在两个 sofa:publisher 配置相同的 group 属性值。
  • sofa:channel 元素的 value 值是此发布者发送的消息类型 TOPIC 值,如果发送消息类型涉及多个 TOPIC,必须配置多个 sofa:channel 元素。

发布者服务实现

消息发布者服务实现包含三个步骤:

  1. 创建消息对象(UniformEvent)。
  2. 设置消息对象属性值。
  3. 发送消息。

消息对象由 UniformEventBuilder 负责创建,必须指定 TOPIC 和 EVENTCODE 两个参数。消息负载封装在业务对象中,并设置为 uniformEvent 的 payload 属性值。消息发送通过 uniformEventPublisher 对象的 publisherUniformEvent 方法完成,此方法可能抛出运行时异常,如果应用程序预期捕获异常并处理,必须设置 uniformEvent 对象的属性值 throwExceptionOnFailed 为 true,否则运行时异常不会被抛出,只会记录错误日志。

说明UniformEvent.throwExceptionOnFailed 字段的默认值是 false。

消息发布者实现代码示例如下所示:

  1. public class MqService {
  2. private static final Logger logger = LoggerFactory.getLogger(MqService.class);
  3. private final static String TOPIC = "TP_DEFAULT";
  4. private final static String EVENTCODE = "EC_DEFAULT";
  5. private UniformEventPublisher mqPublisher;
  6. private UniformEventBuilder uniformEventBuilder = new DefaultUniformEventBuilder();
  7. public boolean publish() {
  8. if (logger.isInfoEnabled()) {
  9. logger.info("Publish a message.");
  10. }
  11. /* Create a message instance. */
  12. final UniformEvent message = uniformEventBuilder.buildUniformEvent(TOPIC, EVENTCODE);
  13. /* Set the business object as an event payload. */
  14. message.setEventPayload(buildDefaultAccount());
  15. /* Mark that a runtime exception must be thrown when publishing failure. */
  16. message.setThrowExceptionOnFailed(true);
  17. boolean publishSuccess = false;
  18. try {
  19. /* Do publish action. */
  20. mqPublisher.publishUniformEvent(message);
  21. publishSuccess = true;
  22. logger.info("Public a message, success. TOPIC [{}] EVENTCODE [{}] id [{}] payload [{}]", new Object[] {
  23. message.getTopic(), message.getEventCode(), message.getId(), message.getEventPayload() });
  24. } catch (Exception e) {
  25. logger.error("Public a message, failure. TOPIC [{}] EVENTCODE [{}] error [{}]",
  26. new Object[] { message.getTopic(), message.getEventCode(), e.getMessage() }, e);
  27. }
  28. return publishSuccess;
  29. }
  30. private Account buildDefaultAccount() {
  31. Account account = new Account();
  32. account.setId(UUID.randomUUID().toString());
  33. account.setAmount(new Random().nextDouble());
  34. account.setGmtCreate(new Date());
  35. return account;
  36. }
  37. public void setMqPublisher(UniformEventPublisher mqPublisher) {
  38. this.mqPublisher = mqPublisher;
  39. }
  40. }

消费者 (Consumer)

消费者配置

说明消息消费者 也可叫做 消息订阅者(Subscriber)

订阅消息的应用系统需要在 Spring 应用上下文文件中配置 sofa:consumer。 应用上下文文件的默认路径如下所示:

  1. src/main/resources/META-INF/xxx/xxx-xxx.xml

消息消费者配置示例:

  1. <!-- Declare a consumer bean with id "mqConsumer" -->
  2. <sofa:consumer id="mqConsumer" group="S_appname_service">
  3. <sofa:listener ref="mqMessageListener"/>
  4. <sofa:channels>
  5. <sofa:channel value="TP_DEFAULT">
  6. <sofa:event eventType="direct" eventCode="EC_DEFAULT" persistence="true"/>
  7. </sofa:channel>
  8. </sofa:channels>
  9. <sofa:binding.msg_broker/>
  10. </sofa:consumer>
  11. <!-- mq message listener -->
  12. <bean id="mqMessageListener" class="com.antcloud.tutorial.mq.endpoint.service.MqMessageListener"/>
  • id 是 Spring Bean 的单例服务标识,
  • group 命名格式为 S\_应用名\_服务名,是消费者唯一标识,不允许存在两个 sofa:consumer 配置相同的 group 属性值。
  • sofa:listener 元素的 ref 属性值设置为消息接监听器单例服务标识。
  • sofa:channel 元素的 value 值是此消费者订阅的消息类型 TOPIC 值,如果订阅的消息类型涉及多个 TOPIC,必须配置多个 sofa:channel 元素。
  • sofa:event 元素配置具体的消息订阅信息,eventType 属性值默认设置为 direct,eventCode 属性值设置为消息类型 eventcode 值,persistence 属性值设置为持久订阅(true)或者非持久订阅(false)。

消息接收监听器实现

订阅消息的应用系统必须实现 com.alipay.common.event.UniformEventMessageListener 接口并配置在 sofa:listener 元素的 ref 属性值中。当消息被消息消费者接收到时,com.alipay.common.event.UniformEventMessageListene.onUniformEvent 会被调用,应用系统通过实现此方法执行消息消费逻辑。消息接收监听器实现如下所示:

  1. public class MqMessageListener implements UniformEventMessageListener {
  2. private static final Logger logger = LoggerFactory.getLogger(MqMessageListener.class);
  3. @Override
  4. public void onUniformEvent(UniformEvent message, UniformEventContext context) throws Exception {
  5. /* get TOPIC, EVENTCODE and payload from the message instance */
  6. final String topic = message.getTopic();
  7. final String eventcode = message.getEventCode();
  8. final String id = message.getId();
  9. final Object businessObject = message.getEventPayload();
  10. logger.info("Receive a message, TOPIC [{}] EVENTCODE [{}] id [{}] payload [{}]", new Object[] { topic,
  11. eventcode, id, businessObject });
  12. try {
  13. boolean processSuccess = processMessage(businessObject);
  14. if (!processSuccess) {
  15. /* Process the message failure, set cause and rollback, the message is re-delivered later. */
  16. context.setContextDesc("process error cause");
  17. context.setRollbackOnly();
  18. }
  19. } catch (Exception e) {
  20. logger.error("Process a message, failure. TOPIC [{}] EVENTCODE [{}] id [{}] error {}", new Object[] {
  21. topic, eventcode, id, e.getMessage() }, e);
  22. /* When any exception is thrown, the message is re-delivered later. */
  23. throw e;
  24. }
  25. }
  26. /* Process the business logic */
  27. private boolean processMessage(Object businessObject) {
  28. return true;
  29. }
  30. }

消费者应用系统接收到消息后可能存在以下三种处理策略:

  • 消息消费正常。
  • 消息无法消费,主动回滚并设置回滚原因。消息会被重新投递。
  • 消息消费异常,抛出未捕获异常。消息会被重新投递。

控制台资源管理

消息主题、消息消费组和消息订阅关系必须在 消息队列 产品控制台中配置元数据后才能正常使用。控制台中配置的元数据信息必须和本地代码中一致。

确定消息类型

异步消息通信涉及两个核心角色:消息发布者消息消费者。同一个应用系统可能兼具消息发布者和消息消费者两个角色。发送消息的应用系统需要配置发布者,接收消息的应用系统配置消费者。两者都必须遵循事先约定好的消息类型。

一种消息类型由 TOPIC 和 EVENTCODE 唯一标识。控制台配置时,您需要确定这两个参数,保证与本地代码中的消息发布者(Publisher)与消息消费者(Consumer)配置一致。

参数 命名规则 示例
TOPIC 以 “TP_” 开头 TP_DEFAULT
EVENTCODE 以 “EC_” 开头 EC_DEFAULT

配置消息主题

  1. 在消息队列控制台页面,选择 消息主题管理 进入消息主题管理页面。
  2. 点击 创建主题信息 按钮,添加一个新的消息主题。
    • 消息主题:必填,字母、数字、下划线、中划线,以 “TP_”开头,6~32 个字符。
    • 描述:可选,根据业务需要自定义。
    • 消息分类码:代表一个消息大类(TOPIC)下的一个消息子类,可用于订阅过滤。以 “EC_”开头,包含字母、数字、下划线、中划线,6~32 个字符。
  3. 点击 提交

配置消费组

  1. 在消息队列控制台页面,选择 消费组管理 进入消息者管理页面。
  2. 点击 添加消费组 按钮,添加一个新的消费组。
    • 消费组:消费组是同一类消费者的集合名称和唯一标识,这类消费者通常消费一类消息,且消费代码保持一致。由字母、数字、下划线、中划线组成,以 “S_”开头,6~32 个字符。
    • 应用名称:应用的名称,对业务功能没有影响,仅页面标识作用。
    • 描述:根据业务需要自定义。
  3. 点击 提交

添加订阅关系

  1. 消息队列 > 消息主题管理 页面,找到刚创建的消息主题,点击对应 操作 列的 订阅关系 按钮。
  2. 进入该消息主题详情页,点击 添加订阅关系
  3. 在新窗口,添加一个消费者订阅该消息主题。
    • 消费组:选择刚刚创建的消费组。
    • 消费模式:目前仅支持 PUSH 模式。PUSH 模式是指当消息到达服务器后主动推送,投递给消费客户端。
    • 消费分类码:选择该消息主题下的一个消息分类码。
    • 持久类型:选择是否是持久型。
      • 持久型订阅:当订阅端系统未连接到消息代理组件时,消息代理组件负责保存消息,待订阅端系统连接后,完成投递。
      • 非持久型订阅:最多投递 4 次,同时当订阅端系统未连接到消息代理组件时,消息代理组件会直接丢弃需要被投递的消息。此类型订阅一般应用于时效性较强的消息。
    • 过滤表达式:过滤表达式根据表达式的计算匹配结果,在服务端根据表达式对消息进行过滤,减少无用消息的网络传输。
  4. 点击 提交,完成添加。

发布应用

SOFABoot 应用的发布指南参见文档 发布 SOFABoot 应用