本文介绍如何在云消息队列 RocketMQ 版控制台创建消息流入任务,以实现将数据传输服务DTS的数据快速导入至云消息队列 RocketMQ 版。
前提条件
您已购买云消息队列 RocketMQ 版实例,且实例处于服务中状态。具体步骤,请参见创建资源。
在数据传输服务DTS控制台创建数据订阅任务且任务状态为正常。详细操作,请参见数据订阅操作指导。
在创建的数据订阅任务中新增消费组。
开服地域
在消息集成中创建消息流入任务时,支持将数据传输服务DTS作为消息流入任务源端的地域有华东1(杭州)、华东2(上海)、华北1(青岛)、华北2(北京)、华南1(深圳)、华南3(广州)、西南1(成都)和中国香港。
创建消息流入任务
登录云消息队列 RocketMQ 版控制台,在左侧导航栏选择 。
在顶部菜单栏选择地域,如华东1(杭州),然后在任务列表页面中,单击创建任务。
在创建任务面板,设置任务名称和描述,配置以下参数。
任务创建
在Source(源)配置向导,选择数据提供方为数据传输服务DTS,设置以下参数,然后单击下一步。
参数
说明
示例
地域
默认选择创建消息流入任务时所选的地域。
华东1(杭州)
数据订阅任务
选择您在数据传输服务DTS控制台上创建的数据订阅任务ID。
dts8jqe****
接入方式
默认为创建的数据订阅任务的接入方式且不可更改。
RDS
实例ID
默认为创建数据订阅任务时订阅的实例且不可更改。
rm-bp18mj3q2dzyb****
消费组
在前提条件中创建的用于消费订阅任务数据的消费组名称。
说明请确保该消费组没有在其他客户端的实例上运行,否则可能导致传入的消费位点失效。
test
账号
创建消费组时设置的账号。
test
密码
创建消费组时设置的密码。
******
消费位点
期望消费第一条数据的时间戳。消费位点必须在订阅实例的数据范围之内。
说明消费位点仅在新消费组第一次运行时生效,若后续任务重启,则会基于上次消费位点继续消费。
2022-06-21 00:00:00
批量推送条数
调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。
100
批量推送间隔(单位:秒)
调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。
3
在Filtering(过滤)配置向导,定义数据模式过滤发送的请求。更多信息,请参见事件模式。
在Transform(转换)配置向导,设置数据清洗,实现分割、映射、富化及动态路由等繁杂数据加工能力。更多信息,请参见使用函数计算实现消息数据清洗。
在Sink(目标)配置向导,选择服务类型为消息队列 RocketMQ 版,配置以下参数。
参数
说明
示例
版本
选择云消息队列 RocketMQ 版实例的版本。
RocketMQ 4.x:服务端4.x版本。
RocketMQ 5.x:服务端5.x版本。
RocketMQ 5.x
实例ID
选择云消息队列 RocketMQ 版消息的目标实例。
rmq-cn-****
Topic
选择目标实例的Topic。
topic
消息体(body)
完整数据。
数据提取。
固定值。
模板。
数据提取
$.data.body
自定义属性(Properties)
空。
数据提取。
模板。
模板
变量:
{ "userProperties":"$.data.userProperties", "msgId":"$.data.systemProperties.UNIQ_KEY" }
模板:
{ "EB_SYS_EMBED_OBJECT":"${userProperties}", "UNIQ_KEY":"${msgId}" }
消息索引(Keys)
空。
数据提取。
固定值。
模板。
数据提取
$.data.systemProperties.KEYS
消息标签(Tags)
空。
数据提取。
固定值。
模板。
数据提取
$.data.systemProperties.TAGS
任务属性
配置事件推送失败时的重试策略及错误发生时的处理方式。更多信息,请参见重试和死信。
完成上述配置后,单击保存。在任务列表页面,找到刚创建的任务,此时状态栏为启动中,当状态变为运行中时,任务创建成功。
其他操作
在任务列表页面,找到目标任务,在其右侧操作列,执行其他操作。
查看任务详情:单击详情,在任务详情页面,查看任务的基础信息、任务属性及监控指标。
编辑任务配置:单击编辑,在编辑任务面板,修改任务详情及属性。
启停任务:单击启用或者停用,然后在提示对话框,单击确认。
删除任务:单击删除,然后在提示对话框,单击确认。