本文介绍如何在云消息队列 Kafka 版控制台创建消息流入任务,将云消息队列 RocketMQ 版的数据快速导入至云消息队列 Kafka 版。
前提条件
您已购买Kafka实例,且实例处于服务中状态。具体步骤,请参见步骤二:购买和部署实例。
您已购买云消息队列 RocketMQ 版实例,且实例处于服务中状态。具体步骤,请参见步骤二:创建资源。
创建消息流入任务
登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
在左侧导航栏,选择 。
在任务列表页面,单击创建任务。
在创建任务面板,设置任务名称和描述,配置以下信息,然后单击保存。
任务创建
在Source(源)配置向导,选择数据提供方为消息队列 RocketMQ 版,设置以下参数,然后单击下一步。
参数
说明
示例
地域
选择消息队列RocketMQ版源实例所在的地域。
华东1(杭州)
版本
选择RocketMQ实例版本。
RocketMQ 4.x
RocketMQ 实例
选择生产消息队列RocketMQ版消息的源实例。
MQ_INST_115964845466****_ByBeUp3p
Topic
选择生产消息队列RocketMQ版消息的Topic。
topic
Tag
配置源实例中用于过滤消息的Tag。
test
Group ID
选择源实例中的消费组名称。请使用独立的消费组来创建事件源,不要和已有的业务混用消费组,以免影响已有的消息收发。
GID_http_1
消费位点
选择开始消费消息的位点。
最新位点
批量推送
批量推送可帮您批量聚合多个事件,当批量推送条数和批量推送间隔(单位:秒)两者条件达到其一时即会触发批量推送。
例如:您设置的推送条数为100 条,间隔时间为15 s,在10 s内消息条数已达到100条,那么该次推送则不会等15 s后再推送。
开启
批量推送条数
调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。
100
批量推送间隔(单位:秒)
调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。
3
在Filtering(过滤)配置向导,设置数据模式内容过滤发送的请求。更多信息,请参见消息过滤。
在Transform(转换)配置向导,设置数据清洗,实现分割、映射、富化及动态路由等繁杂数据加工能力。更多信息,请参见数据清洗。
在Sink(目标)配置向导,选择服务类型为消息队列 Kafka 版,配置以下参数。
参数
说明
示例
实例ID
选择已创建的云消息队列 Kafka 版实例。
test
Topic
选择已创建实例中的Topic。
test
确认模式(ack)
选择云消息队列 Kafka 版接收到数据后给客户端发出的确认信号。
None
消息体(body)
事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。
部分事件
$.data.value
消息键值(Key)
事件总线EventBridge通过JSONPath提取事件中的数据,将指定的事件内容路由到事件目标。
部分事件
$.data.key
任务属性
配置事件推送失败时的重试策略及错误发生时的处理方式。更多信息,请参见重试和死信。
返回任务列表页面,找到创建好的任务,在其右侧操作列,单击启用。
在提示对话框,阅读提示信息,然后单击确认。
启用任务后,会有30秒~60秒的延迟时间,您可以在任务列表页面的状态栏查看启动进度。
其他操作
在任务列表页面,找到目标任务,在其右侧操作列,执行其他操作。
查看任务详情:单击详情,在任务页面,查看任务的基础信息、任务属性及监控指标。
编辑任务配置:单击编辑,在编辑任务面板,修改任务详情及属性。
启停任务:单击启用或者停用,然后在提示对话框,单击确认。
删除任务:单击删除,然后在提示对话框,单击确认。