通过数据同步功能APS同步Kafka数据
云原生数据仓库 AnalyticDB MySQL 版支持数据同步APS(AnalyticDB Pipeline Service)功能,您可以新建Kafka同步链路,通过同步链路从指定时间位点,实时同步Kafka中的数据入仓。本文主要介绍添加Kafka数据源、新建Kafka同步链路并启动任务的操作步骤。
前提条件
AnalyticDB for MySQL集群的产品系列为企业版、基础版或湖仓版。
已创建数据库账号。
如果是通过阿里云账号访问,只需创建高权限账号。
如果是通过RAM用户访问,需要创建高权限账号和普通账号并且将RAM用户绑定到普通账号上。
已创建云消息队列 Kafka 版(简称Kafka)实例,且与AnalyticDB for MySQL集群部署在同一VPC。
已创建Kafka Topic,并发送消息。详情请参见消息队列Kafka版快速入门操作流程。
注意事项
仅支持同步JSON格式的Kafka数据。
Kafka中创建的Topic数据超过一定的时间会被自动清理,如果Topic数据过期,同时数据同步任务失败,重新启动同步任务时读取不到被清理掉的数据,会有丢失数据的风险。因此请适当调大Topic数据的生命周期,并在数据同步任务失败时及时联系技术支持。
获取Kafka样例数据在大于8 KB的情况下,Kafka API会将数据进行截断,导致解析样例数据为JSON格式时失败,从而无法自动生成字段映射信息。
kafka源端表结构发生变化时,不会触发DDL自动变更,即变更不会同步至AnalyticDB for MySQL。
费用说明
AnalyticDB for MySQL集群的ACU弹性资源费用,请参见湖仓版计费项和企业版和基础版计费项。
操作步骤
步骤一:新建数据源
如果您已添加Kafka数据源,可跳过该步骤,直接新建同步链路。
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,然后单击目标集群ID。
在左侧导航栏,单击数据接入>数据源管理。
单击左上角新建数据源。
在新建数据源页面进行参数配置。参数说明如下表所示:
参数名称
参数说明
数据源类型
选择数据源类型Kafka。
数据源名称
系统默认按数据源类型与当前时间生成名称,可按需修改。
数据源描述
数据源备注描述,例如应用场景、应用业务限制等。
部署模式
目前仅支持阿里云实例。
Kafka实例
Kafka实例ID。
登录云消息队列 Kafka 版控制台,在集群列表页面查看实例ID。
Kafka Topic
在Kafka中创建的Topic名称。
登录云消息队列 Kafka 版控制台,在目标实例的Topic 管理页面查看Topic名称。
消息数据格式
Kafka消息数据格式,目前仅支持JSON。
参数配置完成后,单击创建。
步骤二:新建同步链路
在左侧导航栏,单击SLS/Kafka数据同步。
在左上角,单击新建同步链路,然后单击Kafka数据源页签。
在新建同步链路页面,进行数据源的数据源及目标端配置、目标库表配置及同步配置。
数据源及目标端配置的参数说明如下:
参数名称
参数说明
数据链路名称
数据链路名称。系统默认按数据源类型与当前时间生成名称,可按需修改。
数据源
选择已有的Kafka数据源,也可新建数据源。
数据源格式
目前仅支持JSON。
目标端类型
选择:数仓-ADB存储。
ADB账号
AnalyticDB for MySQL集群的数据库账号。
ADB密码
AnalyticDB for MySQL集群数据库账号的密码。
目标库表配置参数说明如下:
参数名称
参数说明
库名
AnalyticDB for MySQL集群的数据库名称。
表名
AnalyticDB for MySQL集群的表名称。
样例数据
自动从Kafka Topic中获取的最新数据作为样例数据。
说明Kafka Topic中的数据需为JSON格式,若存在其他格式的数据,数据同步时会报错。
JSON解析层级
设置JSON的嵌套解析层数,取值说明:
0:不做解析。
1(默认值):解析一层。
2:解析两层。
3:解析三层。
4:解析四层。
JSON的嵌套解析策略,请参见通过数据同步功能APS同步Kafka数据(推荐)。
Schema字段映射
展示样例数据经过JSON解析后的Schema信息。可在此调整目标字段名、类型或按需增删字段等。
同步配置的参数说明如下:
参数名称
参数说明
投递起始点位
同步任务启动时会从选择的时间点开始消费Kafka数据。您可以选择任意一个时间点,系统则会从Kafka中第一条大于等于该时间点的数据开始消费。
脏数据处理模式
同步数据时,若目标表中的字段类型与源端实际同步的Kafka数据类型不匹配,则会导致同步失败。例如源端的数据是
abc,而目标表中的字段类型是int,此时会因为无法转换而导致同步异常。脏数据处理模式取值如下:
中断同步(默认值):数据同步终止,您需修改目标表的字段类型或修改为其他脏数据处理模式,再重启同步任务。
按NULL处理:脏数据字段按NULL值写入目标表。
例如:Kafka一行数据有3个字段(col1、col2、col3),其中col2字段为脏数据,则col2字段数据转为NULL值写入,col1、col3字段数据正常写入。
Job型资源组
指定任务运行的Job型资源组。
增量同步所需ACU数
指定任务运行的Job型资源组ACU数。最小ACU数为2,最大ACU数为Job型资源组可用计算最大资源数。建议适当调大所需ACU数,可以提升入仓性能及任务稳定性。
说明创建数据同步任务时,使用Job型资源组中的弹性资源。数据同步任务会长期占用资源,因此系统会从资源组中扣除该任务占用的资源。例如,Job型资源组的计算最大资源为48 ACU,已创建了一个8 ACU的同步任务,在该资源组中创建另一个同步任务时,可选的最大ACU数为40。
加入白名单
因数据同步网络打通需要,允许将kafka交换机网段添加到AnalyticDB for MySQL集群的白名单中。
上述参数配置完成后,单击提交。
步骤三:启动数据同步任务
在SLS/Kafka数据同步页面,选择创建成功的数据同步任务,在操作列单击启动。
单击左上方查询,状态变为正在运行即数据同步任务启动成功。