本文介绍如何在云消息队列 RocketMQ 版控制台创建消息流出任务,以实现将云消息队列 RocketMQ 版数据快速导入至云消息队列 Kafka 版。
前提条件
创建消息流出任务
登录云消息队列 RocketMQ 版控制台,在左侧导航栏选择 。
在顶部菜单栏选择地域,如华东1(杭州),然后在任务列表页面中,单击创建任务列表。
在创建任务面板,设置任务名称和描述,配置以下参数。
任务创建
在Source(源)配置向导,选择数据提供方为消息队列 RocketMQ 版,设置以下参数,然后单击下一步。
参数
说明
示例
地域
默认选择创建消息流出任务时选取的地域。
华东1(杭州)
版本
选择云消息队列 RocketMQ 版实例的版本。
RocketMQ 4.x:服务端4.x版本。
RocketMQ 5.x:服务端5.x版本。
RocketMQ 5.x
RocketMQ 实例
选择生产消息的源云消息队列 RocketMQ 版实例。
rmq-cn-****
Topic
选择生产消息的源Topic。
topic
Tag
云消息队列 RocketMQ 版中用于过滤消息的标签。
test_tag
Group ID
云消息队列 RocketMQ 版 中的消费组名称。
快速创建:推荐方案,自动创建以GID_EVENTBRIDGE_xxx 命名的 Group ID。
使用已有:请选择独立的Group ID,不要和已有的业务混用,以免影响已有的消息收发。
快速创建
消费点位
最新位点:从最新位点开始消费。
最早位点:从最初位点开始消费。
指定时间戳:从指定时间开始消费。
最新位点
消费时间点
选择消费时间点。仅当消费点位配置为指定时间戳时需配置此参数。
2024-06-18 15:28:29
数据格式(Body)
数据格式是针对支持二进制传递的数据源端推出的指定内容格式的编码能力。支持多种数据格式编码,如无特殊编码诉求可将格式设置为Json。
Json(默认Json格式编码,二进制数据按照utf-8 编码为Json格式放入Payload。)
Text(文本格式编码,二进制数据按照utf-8编码为字符串放入Payload。)
Binary(二进制格式编码,二进制数据按照Base64编码为字符串放入Payload。)
Json
批量推送条数
调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。
100
批量推送间隔(单位:秒)
调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发送给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。
3
在Filtering(过滤)配置向导,定义数据模式过滤发送的请求。更多信息,请参见消息过滤。
在Transform(转换)配置向导,设置数据清洗,实现分割、映射、富化及动态路由等繁杂数据加工能力。更多信息,请参见使用函数计算实现消息数据清洗。
在Sink(目标)配置向导,选择服务类型为消息队列 Kafka 版,配置以下参数。
参数
说明
示例
实例ID
选择已创建的云消息队列 Kafka 版实例。
test
Topic
选择已创建实例中的Topic。
test
确认模式(ack)
选择云消息队列 Kafka 版接收到数据后给客户端发出的确认信号。
None。
LeaderOnly。
All。
None
消息体(Value)
完整数据。
数据提取。
固定值。
模板。
数据提取
$.data.value
消息键值(Key)
空。
数据提取。
固定值。
模板。
数据提取
$.data.key
任务属性
配置事件推送失败时的重试策略及错误发生时的处理方式。更多信息,请参见重试和死信。
完成上述配置后,单击保存。在任务列表页面,找到刚创建的任务,此时状态栏为启动中,当状态变为运行中时,任务创建成功。
其他操作
在任务列表页面,找到目标任务,在其右侧操作列,执行其他操作。
查看任务详情:单击详情,在任务详情页面,查看任务的基础信息、任务属性及监控指标。
编辑任务配置:单击编辑,在编辑任务面板,修改任务详情及属性。
启停任务:单击启用或者停用,然后在提示对话框,单击确认。
删除任务:单击删除,然后在提示对话框,单击确认。