全部产品
云市场

订阅一条普通消息

更新时间:2019-09-18 12:45:48

本教程的学习目标如下:

  • 定义订阅者唯一标识 group
  • 掌握注册一个有效的消息订阅的方式。
  • 掌握配置消息订阅者的的方式。
  • 掌握查询消息接收日志的方式。

命名订阅者唯一标识

订阅者(也叫消费者)即订阅消息的应用系统,是消息中间件的一个关键角色。

与发布者相对应,订阅者也需要命名唯一标识称为“订阅者 group”,命名约定格式是 S\_appname\_service, 其中 “S_” 前缀代表 Subscriber。在本教程中,我们将其命名为 S_appname_service

申请订阅关系

消息订阅是消息队列产品的一个关键概念,包括订阅者唯一标识、消息类型、订阅关系类型和持久化类型四个元素。通常描述如下:

订阅者 C 订阅了消息类型 TOPIC_DEFAULTEVENTCODE_DEFAULT;订阅类型是 DIRECT;持久化类型为非持久型。

要订阅到预期的消息类型,订阅者必须申请有效的订阅关系,申请之前需要明确订阅关系的四个必要元素:

  1. 订阅者唯一标识 groupS_appname_service
  2. 需要订阅的目标消息类型 (TOPIC+EVENTCODE):TP_DEFAULT,EC_DEFAULT
  3. 消息订阅类型,默认为 DIRECT
  4. 持久化类型,本教程使用“非持久型”

查询已存在的消息订阅和添加新的消息订阅,参见 添加消费组订阅关系

配置消息订阅者

目录切换进入 mqtutorial-subscribe-message 目录,打开配置文件 mqtutorial-subscribe-message-endpoint.xml 查看订阅者配置,具体路径如下:

  1. mqtutorial-subscribe-message/app/src/main/resources/META-INF/mqtutorial-subscribe-message/mqtutorial-subscribe-message-endpoint.xml

订阅者配置的核心元素是 sofa:consumer,具体内容如下:

  1. <!-- Declare a consumer bean with id "mqConsumer" -->
  2. <sofa:consumer id="mqConsumer" group="S_appname_service">
  3. <sofa:listener ref="mqMessageListener"/>
  4. <sofa:channels>
  5. <sofa:channel value="TP_DEFAULT">
  6. <sofa:event eventType="direct" eventCode="EC_DEFAULT" persistence="true"/>
  7. </sofa:channel>
  8. </sofa:channels>
  9. <sofa:binding.msg_broker/>
  10. </sofa:consumer>
  11. <bean id="uniformEventMessageListener" class="com.alipay.cloud.UniformEventMessageListenerImpl"/>

以上配置中有四个关键元素,具体说明如下:

  • sofa:consumer 元素 group 属性值是订阅者 group,请修改为自定义的 group 值,即 S_appname_service
  • sofa:listener 元素配置了消息接收 Handler。
  • sofa:channel 元素的 value 属性值是需要订阅的消息类型 TOPIC 值。
  • sofa:event 元素具体声明了订阅关系的其它要素。

首先,打开源代码文件 com.antcloud.tutorial.mq.endpoint.service.MqMessageListener 查看具体的消息处理逻辑。示例代码中的消息处理逻辑很简单,只是在接收到消息后输出消息内容日志。需要注意的是,消息处理器必须实现 com.alipay.common.event.UniformEventMessageListener 接口。

然后,将 mqtutorial-subscribe-message 项目发布部署到 SOFAStack 平台上,具体发布部署方式,参见 发布应用