本文介绍如何在云消息队列 Kafka 版控制台创建流入任务,将指定云消息队列 Kafka 版实例中的数据快速导入至另外的云消息队列 Kafka 版实例中。
前提条件
您已购买云消息队列 Kafka 版实例,且实例处于服务中状态。具体步骤,请参见步骤二:购买和部署实例。
创建消息流入任务
登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
在左侧导航栏,选择 。
在任务列表页面,单击创建任务。
在创建任务面板,设置任务名称和描述,配置以下参数,单击保存。
任务创建
在Source(源)配置向导,选择数据提供方为消息队列 Kafka 版,设置以下参数,然后单击下一步。
参数
说明
示例
参数
说明
示例
地域
选择云消息队列 Kafka 版源实例所在的地域。
华北2(北京)
kafka 实例
选择生产云消息队列 Kafka 版消息的源实例。
MQ_INST_115964845466****_ByBeUp3p
Topic
选择生产云消息队列 Kafka 版消息的Topic。
topic
Group ID
选择源实例的消费组名称。请使用独立的消费组来创建事件源,不要和已有的业务混用消费组,以免影响已有的消息收发。
GID_http_1
消费位点
选择开始消费消息的位点。
最新位点
网络配置
选择路由消息的网络类型。
默认网络
专有网络VPC
选择VPC ID。当网络配置设置为自建公网时需要设置此参数。
vpc-bp17fapfdj0dwzjkd****
交换机
选择vSwitch ID。当网络配置设置为自建公网时需要设置此参数。
vsw-bp1gbjhj53hdjdkg****
安全组
选择安全组。当网络配置设置为自建公网时需要设置此参数。
alikafka_pre-cn-7mz2****
数据格式
数据格式是针对支持二进制传递的数据源端推出的指定内容格式的编码能力。支持多种数据格式编码,如无特殊编码诉求可将格式设置为Json。
Json(Json格式编码,二进制数据按照utf-8 编码为Json格式放入Payload。)
Text(默认文本格式编码,二进制数据按照utf-8编码为字符串放入Payload。)
Binary(二进制格式编码,二进制数据按照Base64编码为字符串放入Payload。)
Json
批量推送条数
调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。
100
批量推送间隔(单位:秒)
调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。
3
在Filtering(过滤)配置向导,设置数据模式内容过滤发送的请求。更多信息,请参见消息过滤。
在Transform(转换)配置向导,设置数据清洗,实现分割、映射、富化及动态路由等繁杂数据加工能力。更多信息,请参见使用函数计算实现消息数据清洗。
在Sink(目标)配置向导,选择服务类型为消息队列 Kafka 版,配置以下参数。
参数
说明
示例
参数
说明
示例
实例ID
选择已创建的云消息队列 Kafka 版实例。
test
Topic
选择已创建实例中的Topic。
test
确认模式(ack)
选择云消息队列 Kafka 版接收到数据后给客户端发出的确认信号。
None
LeaderOnly
All
None
消息体(Value)
事件总线EventBridge通过JSONPath提取消息中的数据,将指定的消息内容路由到目标。
完整数据
数据提取
固定值
模板
数据提取
$.data.value
消息键值(Key)
事件总线EventBridge通过JSONPath提取消息中的数据,将指定的消息内容路由到目标。
空
数据提取
固定值
模板
数据提取
$.data.key
任务属性
配置事件推送失败时的重试策略及错误发生时的处理方式。更多信息,请参见重试和死信。
返回任务列表页面,找到创建好的任务,在其右侧操作列,单击启用。
在提示对话框,阅读提示信息,然后单击确认。
启用任务后,会有30秒~60秒的延迟时间,您可以在任务列表页面的状态栏查看启动进度。
其他操作
在任务列表页面,找到目标任务,在其右侧操作列,执行其他操作。
查看任务详情:单击详情,在任务页面,查看任务的基础信息、任务属性及监控指标。
编辑任务配置:单击编辑,在编辑任务面板,修改任务详情及属性。
启停任务:单击启用或者停用,然后在提示对话框,单击确认。
删除任务:单击删除,然后在提示对话框,单击确认。
- 本页导读 (1)
- 前提条件
- 创建消息流入任务
- 其他操作