通过数据同步功能APS同步Kafka数据

更新时间:

云原生数据仓库 AnalyticDB MySQL 版支持数据同步APS(AnalyticDB Pipeline Service)功能,您可以新建Kafka同步链路,通过同步链路从指定时间位点,实时同步Kafka中的数据入仓。本文主要介绍添加Kafka数据源、新建Kafka同步链路并启动任务的操作步骤。

前提条件

注意事项

  • 仅支持同步JSON格式的Kafka数据。

  • Kafka中创建的Topic数据超过一定的时间会被自动清理,如果Topic数据过期,同时数据同步任务失败,重新启动同步任务时读取不到被清理掉的数据,会有丢失数据的风险。因此请适当调大Topic数据的生命周期,并在数据同步任务失败时及时联系技术支持。

  • 获取Kafka样例数据在大于8 KB的情况下,Kafka API会将数据进行截断,导致解析样例数据为JSON格式时失败,从而无法自动生成字段映射信息。

  • kafka源端表结构发生变化时,不会触发DDL自动变更,即变更不会同步至AnalyticDB for MySQL

费用说明

AnalyticDB for MySQL集群的ACU弹性资源费用,请参见湖仓版计费项企业版和基础版计费项

操作步骤

步骤一:新建数据源

说明

如果您已添加Kafka数据源,可跳过该步骤,直接新建同步链路

  1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,然后单击目标集群ID。

  2. 在左侧导航栏,单击数据接入>数据源管理

  3. 单击左上角新建数据源

  4. 新建数据源页面进行参数配置。参数说明如下表所示:

    参数名称

    参数说明

    数据源类型

    选择数据源类型Kafka

    数据源名称

    系统默认按数据源类型与当前时间生成名称,可按需修改。

    数据源描述

    数据源备注描述,例如应用场景、应用业务限制等。

    部署模式

    目前仅支持阿里云实例。

    Kafka实例

    Kafka实例ID。

    登录云消息队列 Kafka 版控制台,在集群列表页面查看实例ID。

    Kafka Topic

    Kafka中创建的Topic名称。

    登录云消息队列 Kafka 版控制台,在目标实例的Topic 管理页面查看Topic名称。

    消息数据格式

    Kafka消息数据格式,目前仅支持JSON。

  5. 参数配置完成后,单击创建

步骤二:新建同步链路

  1. 在左侧导航栏,单击SLS/Kafka数据同步

  2. 在左上角,单击新建同步链路,然后单击Kafka数据源页签

  3. 新建同步链路页面,进行数据源的数据源及目标端配置目标库表配置同步配置

    • 数据源及目标端配置的参数说明如下:

      参数名称

      参数说明

      数据链路名称

      数据链路名称。系统默认按数据源类型与当前时间生成名称,可按需修改。

      数据源

      选择已有的Kafka数据源,也可新建数据源。

      数据源格式

      目前仅支持JSON。

      目标端类型

      选择:数仓-ADB存储

      ADB账号

      AnalyticDB for MySQL集群的数据库账号。

      ADB密码

      AnalyticDB for MySQL集群数据库账号的密码。

    • 目标库表配置参数说明如下:

      参数名称

      参数说明

      库名

      AnalyticDB for MySQL集群的数据库名称。

      表名

      AnalyticDB for MySQL集群的表名称。

      样例数据

      自动从Kafka Topic中获取的最新数据作为样例数据。

      说明

      Kafka Topic中的数据需为JSON格式,若存在其他格式的数据,数据同步时会报错。

      JSON解析层级

      设置JSON的嵌套解析层数,取值说明:

      • 0:不做解析。

      • 1(默认值):解析一层。

      • 2:解析两层。

      • 3:解析三层。

      • 4:解析四层。

      JSON的嵌套解析策略,请参见通过数据同步功能APS同步Kafka数据(推荐)

      Schema字段映射

      展示样例数据经过JSON解析后的Schema信息。可在此调整目标字段名、类型或按需增删字段等。

    • 同步配置的参数说明如下:

      参数名称

      参数说明

      投递起始点位

      同步任务启动时会从选择的时间点开始消费Kafka数据。您可以选择任意一个时间点,系统则会从Kafka中第一条大于等于该时间点的数据开始消费。

      脏数据处理模式

      同步数据时,若目标表中的字段类型与源端实际同步的Kafka数据类型不匹配,则会导致同步失败。例如源端的数据是abc,而目标表中的字段类型是int,此时会因为无法转换而导致同步异常。

      脏数据处理模式取值如下:

      • 中断同步(默认值):数据同步终止,您需修改目标表的字段类型或修改为其他脏数据处理模式,再重启同步任务。

      • NULL处理:脏数据字段按NULL值写入目标表。

      例如:Kafka一行数据有3个字段(col1、col2、col3),其中col2字段为脏数据,则col2字段数据转为NULL值写入,col1、col3字段数据正常写入。

      Job型资源组

      指定任务运行的Job型资源组。

      增量同步所需ACU

      指定任务运行的Job型资源组ACU数。最小ACU数为2,最大ACU数为Job型资源组可用计算最大资源数。建议适当调大所需ACU数,可以提升入仓性能及任务稳定性。

      说明

      创建数据同步任务时,使用Job型资源组中的弹性资源。数据同步任务会长期占用资源,因此系统会从资源组中扣除该任务占用的资源。例如,Job型资源组的计算最大资源为48 ACU,已创建了一个8 ACU的同步任务,在该资源组中创建另一个同步任务时,可选的最大ACU数为40。

      加入白名单

      因数据同步网络打通需要,允许将kafka交换机网段添加到AnalyticDB for MySQL集群的白名单中。

  4. 上述参数配置完成后,单击提交

步骤三:启动数据同步任务

  1. SLS/Kafka数据同步页面,选择创建成功的数据同步任务,在操作列单击启动

  2. 单击左上方查询,状态变为正在运行即数据同步任务启动成功。