本文介绍如何创建使用AnalyticDB Sink Connector,您可通过AnalyticDB Sink Connector将数据从云消息队列 Kafka 版实例的数据源Topic导出至AnalyticDB的表中。
前提条件
详细步骤,请参见创建前提。
步骤一:创建目标服务资源
创建云原生数据仓库 AnalyticDB MySQL 版资源或云原生数据仓库AnalyticDB PostgreSQL版资源。
云原生数据仓库 AnalyticDB MySQL 版:在云原生数据仓库 AnalyticDB MySQL 版控制台创建集群、数据库账号,连接集群并创建数据库。更多信息,请参见创建集群、创建数据库账号、连接集群和创建数据库。
云原生数据仓库AnalyticDB PostgreSQL版:在云原生数据仓库AnalyticDB PostgreSQL版控制台创建实例、数据库账号和登录数据库。更多信息,请参见创建实例、创建和管理用户和客户端连接。
本文以AnalyticDB MySQL版为例,创建数据库名为adb_sink_database和数据表名为adb_sink_table的资源。
步骤二:创建AnalyticDB Sink Connector并启动
登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
在左侧导航栏,选择 。
在任务列表页面,单击创建任务。
任务创建
在Source(源)配置向导,选择数据提供方为消息队列 Kafka 版,设置以下参数,然后单击下一步。
参数
说明
示例
参数
说明
示例
地域
选择云消息队列 Kafka 版源实例所在的地域。
华北2(北京)
kafka 实例
选择生产云消息队列 Kafka 版消息的源实例。
alikafka_post-cn-jte3****
Topic
选择生产云消息队列 Kafka 版消息的Topic。
demo-topic
Group ID
选择源实例的消费组名称。
快速创建:推荐方案,自动创建以
GID_EVENTBRIDGE_xxx
命名的 Group ID。使用已有:请选择独立的Group ID,不要和已有的业务混用,以免影响已有的消息收发
快速创建
消费位点
选择开始消费消息的位点。
最新点位
最早点位
最新位点
网络配置
选择路由消息的网络类型。
基础网络
自建公网
基础网络
专有网络VPC
选择VPC ID。当网络配置设置为自建公网时需要设置此参数。
vpc-bp17fapfdj0dwzjkd****
交换机
选择vSwitch ID。当网络配置设置为自建公网时需要设置此参数。
vsw-bp1gbjhj53hdjdkg****
安全组
选择安全组。当网络配置设置为自建公网时需要设置此参数。
alikafka_pre-cn-7mz2****
数据格式
数据格式是针对支持二进制传递的数据源端推出的指定内容格式的编码能力。支持多种数据格式编码,如无特殊编码诉求可将格式设置为Json。
Json(Json格式编码,二进制数据按照utf-8 编码为Json格式放入Payload。)
Text(默认文本格式编码,二进制数据按照utf-8编码为字符串放入Payload。)
Binary(二进制格式编码,二进制数据按照Base64编码为字符串放入Payload。)
Json
批量推送条数
调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。
100
批量推送间隔(单位:秒)
调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。
3
在Filtering(过滤)配置向导,设置数据模式内容过滤发送的请求。更多信息,请参见事件模式。
在Transform(转换)配置向导,设置数据清洗,实现分割、映射、富化及动态路由等繁杂数据加工能力。更多信息,请参见使用函数计算实现消息数据清洗。
在Sink(目标)配置向导,选择服务类型为云原生数据仓库 AnalyticDB,配置以下参数,单击保存。
参数
说明
示例
参数
说明
示例
实例类型
选择已创建的实例的类型,本文以AnalyticDB MySQL版为例。
MySQL版
PostgreSQL版
MySQL版
AnalyticDB实例ID
选择已创建的实例ID。
gp-bp10uo5n536wd****
数据库名
选择创建的数据库。
adb_sink_database
表名
选择创建的数据表。
adb_sink_table
数据映射
数据库中表的数据提取规则可按JsonPath规则输入。当Source(源)中数据格式配置为Json时,云消息队列 Kafka 版流出的数据格式如下所示:
{ "data": { "topic": "demo-topic", "partition": 0, "offset": 2, "timestamp": 1739756629123, "headers": { "headers": [], "isReadOnly": false }, "key":"adb-sink-k1", "value": { "userid":"xiaoming", "source":"shanghai" } }, "id": "7702ca16-f944-4b08-***-***-0-2", "source": "acs:alikafka", "specversion": "1.0", "type": "alikafka:Topic:Message", "datacontenttype": "application/json; charset=utf-8", "time": "2025-02-17T01:43:49.123Z", "subject": "acs:alikafka:alikafka_serverless-cn-lf6418u6701:topic:demo-topic", "aliyunaccountid": "1******6789" }
根据表列名配置相应的JsonPath,如userid的数值提取规则配置为
$.data.value.userid
数据库用户名
输入设置的用户名。
user
数据库密码
输入设置的密码。
******
网络配置
专有网络:通过专有网络VPC将Kafka消息投递到AnalyticDB。
公网:通过公网将Kafka消息投递到AnalyticDB。
专有网络
VPC
选择VPC ID。仅当网络配置为专有网络时需配置此参数。
vpc-bp17fapfdj0dwzjkd****
交换机
选择vSwitch ID。仅当网络配置为专有网络时需配置此参数。
在选择专有网络交换机后,需将该交换机所属的IP网段配置至云原生数据仓库 AnalyticDB MySQL 版实例的白名单中,详情请参见白名单。
vsw-bp1gbjhj53hdjdkg****
安全组
选择安全组。仅当网络配置为专有网络时需配置此参数。
test_group
返回任务列表页面,找到创建好的任务,在其右侧操作列,单击启用。
在提示对话框,阅读提示信息,然后单击确认。
启用任务后,会有30秒~60秒的延迟时间,您可以在任务列表页面的状态栏查看启动进度。
步骤三:测试AnalyticDB Sink Connector
在任务列表页面,在AnalyticDB Sink Connector任务的事件源列单击源Topic。
在Topic详情页面,单击体验发送消息。
在快速体验消息收发面板,按照下图配置消息内容,然后单击确定。
此处的消息内容为JSON格式,JSON中需包含已创建的数据表的所有列,系统会将JSON中同名字段对应的值写入到数据表对应的列中。
在任务列表页面,在AnalyticDB Sink Connector任务的事件目标列单击目标实例。
在实例基本信息页面,单击右上角的登录数据库。
在DMS数据管理服务页面,执行以下语句,查询表中全量数据。
SELECT * FROM adb_sink_table;
查询结果如下所示:
- 本页导读 (1)
- 前提条件
- 步骤一:创建目标服务资源
- 步骤二:创建AnalyticDB Sink Connector并启动
- 步骤三:测试AnalyticDB Sink Connector