本文通过一个模拟场景介绍RocketMQ连接器的简单使用。场景为消费者下发订餐信息(消息队列RocketMQ的Topic发送消息),订餐信息持久化存储到指定数据库,然后通知商家(消息队列RocketMQ的Topic)接单。

前提条件

您需要提前完成以下准备工作:

背景信息

本文介绍的是实现订阅订餐信息,订餐信息持久化处理,然后发布订单信息到商家以提醒商家接单的配置流程。

RocketMQ的主题TopicA发送信息模拟消费者下发订餐信息,然后订餐信息存储到数据库的orders表,最后再将信息发送到商家(RocketMQ的主题TopicB)。

订餐信息内容假设为消费者ID(customer_id)、商家ID(restaurant_id)和食品(food)的JSON语句,示例如下:
{
  "customer_id": 200,
  "restaurant_id": 800,
  "food": "apple"
}

orders表中也设置了customer_id、restaurant_id和food字段,以便和订餐信息一一对应。

创建的集成主要完成以下动作:
  1. 订阅订餐信息(即TopicA发送的消息)。
  2. 订餐信息持久化存储(即保存消息到数据库表)。
  3. 发送订餐信息到商家(即发送到TopicB),提醒商家接单。

视频教程

创建连接

本示例中会用到RocketMQ和Database连接,所以需要借助连接器创建对应的连接。

创建RocketMQ和Database连接。
创建连接的具体操作,请参见创建连接

创建空白集成

  1. 登录应用集成控制台
  2. 在顶部菜单栏选择地域
  3. 在左侧导航栏选择集成 > 集成列表,然后在当前工作空间下拉列表中选择目标工作空间。
  4. 集成列表页面单击新建集成
  5. 新建集成面板中创建方式选择为空白流,选择目标环境,输入名称,然后单击创建
  6. 集成创建后进入集成设计页面,在页面右上角单击保存

创建集成流

  1. 集成设计页面左上角单击 图标,在列表中单击Flow,创建集成流。
    也可以在页面中单击点击创建,在列表中单击Flow,创建集成流。
  2. 选择触发器,订阅订餐信息。
    1. 创建新集成流对话框输入集成流名称,并选择之前创建的RocketMQ连接作为触发器,然后单击创建触发器-RocketMQ
    2. 选择操作对话框中订阅消息右侧,单击选择选择operation
    3. 步骤配置对话框中设置参数,然后单击确定
      步骤配置
      参数 描述
      Topic 订阅消息的目标主题,本文以TopicA为例。
      consumerGroup 消费者的Group ID,本文以GID_GroupB为例。
      subExpression 消息过滤表达式,满足表达式的消息才会被订阅。
      说明

      仅支持或运算,如tag1 || tag2 || tag3,如果设置为空和星号(*),则表示全部订阅。

      本示例中如果TopicA发送的消息不带TagA,则不会触发集成流。

      messageModel 消息模型,保持默认值。
    4. 设置outputDataShape对话框选择schema列表中选择Json Schema,然后在设置outputDataShape对话框输入Json Schema格式的订餐信息相关参数,单击创建
       设置outputDataShape-Json Schema
      参数 描述
      选择schema 订餐信息的转换格式,本示例中以Json Schema格式为例。
      schema Json Schema格式订餐信息示例。
      名称 自定义设置名称。
      描述 订餐信息描述。
      本示例中的Json Schema格式订餐信息详情如下:
      {
        "$schema": "http://json-schema.org/draft-04/schema#",
        "type": "object",
        "properties": {
          "customer_id": {
            "type": "integer"
          },
          "restaurant_id": {
            "type": "integer"
          },
          "food": {
            "type": "string"
          }
        },
        "required": [
          "customer_id",
          "restaurant_id",
          "food"
        ]
      }
    创建完成后,集成流中即包含订阅订餐信息的触发器。RocketMQ触发器-订阅消息
  3. 在集成流中添加条件转换器,实现将订餐信息的Body通过Header透传。
    集成流每个step运行之后,订餐信息的Body都会变化,此处通过Header透传解决此问题。
    1. 在集成流中订阅信息右侧单击 图标。
    2. 选择组件类型对话框单击逻辑步骤,然后单击转换器
    3. groovy 脚本对话框输入条件转换语句,单击确定
      条件转换语句本示例中的条件转换语句为msg.setHeader("orders", msg.getBody()),即将订阅的订餐信息的Body通过Header透传。
    4. 设置对话框选择schema列表中选择任意类型,然后单击创建
    创建完成后,集成流中即包含触发器和条件转换器。触发器-Transform
  4. 在集成流中添加Database连接,实现订餐信息的持久化存储。
    1. 在集成流中转换器右侧单击 图标。
    2. 选择组件类型对话框单击连接,然后单击之前新建的Database连接。
    3. 选择操作对话框中Invoke SQL右侧,单击选择
      选择操作-Invoke SQL
    4. 步骤配置对话框设置参数,然后单击确定
      步骤配置-SQL

      本示例输入的SQL语句为insert into orders(customer_id, restaurant_id, food) values(:#customer_id, :#restaurant_id, :#food),即在数据库表orders中写入订餐信息。

    创建完成后,集成流中即包含触发器、条件转换器和Database连接。触发器-Transform-Database
  5. 在集成流中添加数据映射器,实现将订餐信息字段赋值给数据库表的对应字段。
    1. 在集成流中转换器右侧单击 图标。
    2. 选择组件类型对话框单击逻辑步骤,然后单击数据映射器
    3. Source > 1-apple-testTarget > 2-SQL Parameter之间单击相同字段进行映射,然后单击确认
      字段映射
    4. 设置对话框选择schema列表中选择任意类型,然后单击创建
    创建完成后,集成流中即包含触发器、条件转换器、数据映射器和Database连接。集成流
  6. 在集成流中添加条件转换器,实现将订餐信息的Header重新赋值给Body。
    1. 在集成流中Invoke SQL右侧单击 图标。
    2. 选择组件类型对话框单击逻辑步骤,然后单击转换器
    3. groovy 脚本对话框输入条件转换语句,单击确定
      条件转换语句本示例中的条件转换语句为payload = msg.getHeader("orders"),即将订餐信息的Header重新赋值给Body。
    4. 设置对话框选择schema列表中选择任意类型,然后单击创建
    创建完成后,集成流中即包含触发器、条件转换器、数据映射器、Database连接和条件转换器。集成流
  7. 在集成流中添加发送订餐信息到商家的步骤。
    1. 在集成流中转换器右侧单击 图标。
    2. 选择组件类型对话框单击连接,然后单击之前新建的RocketMQ连接。
    3. 选择操作对话框中发送消息右侧,单击选择
      选择操作-发送消息
    4. 步骤配置对话框设置参数,然后单击确定
      步骤配置-发送消息
      参数 描述
      Topic 发送消息到目标主题,本文以TopicB为例。
      发送组(ProducerGroup) 发送组,起标识作用,可不填。
      sendTag 发送消息的标签,用于对消息的再归类。
      sendKey 发送消息的业务标识。
      sendMsgTimeoutMillis 延时时长,单位毫秒,此处以3000为例。
    5. 设置inputDataShape对话框选择schema列表中选择任意类型,然后单击创建
  8. 集成设计页面右上角单击保存,保存集成流。
    实现订餐信息持久化存储和提醒商家接单的集成流创建完成。完整集成流
    注意 返回集成设计页面时,请及时在页面右上角单击保存,以免添加的步骤丢失。

部署集成

集成创建并保存后,需要对集成进行部署。详情请参见部署集成示例

结果验证

  1. 消费者发送信息。
    1. 登录消息队列RocketMQ版控制台
    2. 实例列表页面,找到目标实例,在其操作列,单击详情
    3. 在TopicA内发送消息。
      1. 在左侧导航栏单击Topic管理
      2. Topic管理页面选择TopicA,并单击右侧的发送消息
      3. 发送消息对话框设置消息参数,单击确定发送消息
        参数 描述
        Topic 已选定发送消息的Topic,不可编辑。
        Tag 发送消息携带的标签。
        Key 发送消息携带的业务标识。
        消息体 发送的消息主体,本示例为:
        {
          "customer_id": 200,
          "restaurant_id": 800,
          "food": "apple"
        }
  2. 查询集成执行记录。
    1. 登录应用集成控制台
    2. 在顶部菜单栏选择地域
    3. 在左侧导航栏选择集成 > 执行管理
    4. 执行管理页面通过工作空间/集成部署环境/日期运行时长筛选目标集群的执行记录。
      集成的执行记录状态为SUCCESS即代表集成正常运行。执行记录
    5. 在目标集成的执行记录右侧单击查看
      在执行日志对话框中可以查看集成流每个步骤的状态和相关输出。执行日志
  3. 在数据库表查询orders表内容是否新增一条订餐信息。
    数据库新增订餐信息
  4. 商家查询订餐消息。
    1. 登录消息队列RocketMQ版控制台
    2. 实例列表页面,找到目标实例,在其操作列,单击详情
    3. 在左侧导航栏单击消息查询
    4. 消息查询页面单击按Topic查询页签。
      如果您需要了解其他查询消息的方式,请参见消息查询
    5. 按Topic查询页签下的搜索框选择TopicB,并选择时间,然后单击搜索
      查询消息-TopicB
    6. 在查询消息结果列表单击目标记录的右上角的下载按钮,然后在本地打开下载的文件,查看详细订餐信息。
      详细订餐信息