文档

通过数据同步功能将Kafka数据同步至湖仓版

更新时间:

AnalyticDB for MySQL湖仓版(3.0)支持新建Kafka同步链路,通过同步链路从指定时间位点,实时同步Kafka中的数据入湖,以满足近实时产出、全量历史归档、弹性分析等需求。本文主要介绍如何添加Kafka数据源,新建Kafka同步链路并启动任务,以及数据同步后如何进行数据分析和数据源管理。

前提条件

注意事项

Kafka中创建的Topic数据超过一定的时间会被自动清理,如果Topic数据过期,同时数据同步任务失败,重新启动同步任务时读取不到被清理掉的数据,会有丢失数据的风险。因此请适当调大Topic数据的生命周期并在数据同步任务失败时及时联系技术支持。

使用流程

新建数据源

说明

如果您已添加Kafka数据源,可跳过该步骤,直接新建同步链路,详情请参见新建同步链路

  1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在湖仓版(3.0)页签,单击目标集群ID。

  2. 在左侧导航栏,单击数据接入>数据源管理

  3. 单击右上角新建数据源

  4. 新建数据源页面进行参数配置。参数说明如下表所示:

    参数名称

    参数说明

    数据源类型

    选择数据源类型Kafka

    数据源名称

    系统默认按数据源类型与当前时间生成名称,可按需修改。

    数据源描述

    数据源备注描述,例如湖仓应用场景、应用业务限制等。

    部署模式

    目前仅支持阿里云实例。

    Kafka实例

    Kafka实例ID。

    登录云消息队列 Kafka 版控制台,在实例列表页面查看实例ID。

    Kafka Topic

    在Kafka中创建的Topic名称。

    登录云消息队列 Kafka 版控制台,在目标实例的Topic 管理页面查看Topic名称。

    消息数据格式

    Kafka消息数据格式,目前仅支持JSON。

  5. 参数配置完成后,单击创建

新建同步链路

  1. 在左侧导航栏,单击数据同步

  2. 在右上角,单击新建同步链路

  3. 新建同步链路页面,进行数据源的数据源及目标端配置目标库表配置同步配置

    • 数据源及目标端配置的参数说明如下:

      参数名称

      参数说明

      数据链路名称

      数据链路名称。系统默认按数据源类型与当前时间生成名称,可按需修改。

      数据源

      选择已有的Kafka数据源,也可新建数据源。

      目标端类型

      目前仅支持数据湖-OSS存储

      OSS路径

      AnalyticDB MySQL湖仓数据在OSS中的存储路径。

      重要
      • 展示的Bucket是与AnalyticDB MySQL集群同地域的所有Bucket,您可以任意选择其中一个。请谨慎规划存储路径,创建后不允许修改。

      • 建议选择一个空目录,且不能与其他任务的OSS路径有相互前缀关系,防止数据覆盖。例如,两个数据同步任务的OSS路径分别为oss://adb_demo/test/sls1/和oss://adb_demo/test/,OSS路径有相互前缀关系,数据同步过程中会有数据覆盖。

    • 目标库表配置参数说明如下:

      参数名称

      参数说明

      库名

      同步到AnalyticDB MySQL的数据库名称。如果不存在同名数据库,将新建库;如果已存在同名数据库,数据会同步到已存在的数据库中。库名命名规则,详见使用限制

      表名

      同步到AnalyticDB MySQL的表名称。如果库中不存在同名表,将新建表;如果库中已存在同名表,数据同步会失败。表名命名规则,详见使用限制

      样例数据

      自动从Kafka Topic中获取的最新数据作为样例数据。

      说明

      Kafka Topic中的数据需为JSON格式,若存在其他格式的数据,数据同步时会报错。

      JSON解析层级

      设置JSON的嵌套解析层数,取值说明:

      • 0:不做解析。

      • 1(默认值):解析一层。

      • 2:解析两层。

      • 3:解析三层。

      • 4:解析四层。

      JSON的嵌套解析策略,请参见JSON解析层级和Schema字段推断示例

      Schema字段映射

      展示样例数据经过JSON解析后的Schema信息。可在此调整目标字段名,类型或按需增删字段等。

      分区键设置

      为目标表设置分区键。建议按日志时间或者业务逻辑配置分区,以保证入湖与查询性能。如不设置,则目标表默认没有分区。

      目标端分区键的格式处理方法分为:时间格式化和指定分区字段。

      • 选择时间格式化处理方法时,需要选择一个时间字段(如时间戳)。按指定的时间字段和格式进行分区。例如,源字段为gmt_created,源字段格式为yyyy-MM-dd,目标字段名为year,目标字段分区配置为yyyy,表示按年进行分区。

      • 指定分区字段方式是直接按列分区。

    • 同步配置的参数说明如下:

      参数名称

      参数说明

      增量同步起始消费位点

      同步任务启动时会从选择的时间点开始消费Kafka数据。取值说明:

      • 最早位点(begin_cursor):自动从Kafka数据中最开始的时间点消费数据。

      • 最近位点(end_cursor):自动从Kafka数据中最近的时间点消费数据。

      • 自定义点位:您可以选择任意一个时间点,系统则会从Kafka中第一条大于等于该时间点的数据开始消费。

      Job型资源组

      指定任务运行的Job型资源组。

      增量同步所需ACU数

      指定任务运行的Job型资源组ACU数。最小ACU数为2,最大ACU数为Job型资源组可用计算最大资源数。建议多指定一些ACU数,可以提升入湖性能及任务稳定性。

      说明

      创建数据同步任务时,使用Job型资源组中的弹性资源。数据同步任务会长期占用资源,因此系统会从资源组中扣除该任务占用的资源。例如,Job型资源组的计算最大资源为48 ACU,已创建了一个8 ACU的同步任务,在该资源组中创建另一个同步任务时,可选的最大ACU数为40。

      高级配置

      高级配置可以让您对同步任务进行个性化的配置。如需进行个性化配置,请联系技术支持。

  4. 上述参数配置完成后,单击提交

启动数据同步任务

  1. 数据同步页面,选择创建成功的数据同步任务,在操作列单击启动

  2. 单击右上角查询,状态变为正在启动即数据同步任务启动成功。

数据分析

同步任务成功后,您可以通过Spark Jar开发对同步到AnalyticDB MySQL的数据进行分析。Spark开发的相关操作,请参见Spark开发编辑器Spark离线应用开发

  1. 在左侧导航栏,单击作业开发 > Spark Jar 开发

  2. 在默认模板中输入示例语句,并单击立即执行

    -- Here is just an example of SparkSQL. Modify the content and run your spark program.
    
    conf spark.driver.resourceSpec=medium;
    conf spark.executor.instances=2;
    conf spark.executor.resourceSpec=medium;
    conf spark.app.name=Spark SQL Test;
    conf spark.adb.connectors=oss;
    
    -- Here are your sql statements
    show tables from lakehouse20220413156_adbTest;
  3. 可选:应用列表页签中,单击操作列的日志,查看Spark SQL运行的日志。

管理数据源

数据源管理页面,您可以在操作列执行以下操作。

操作按钮

说明

新建链路

快捷跳转到创建此数据源下的数据同步或数据迁移任务。

查看

查看数据源的详细配置。

编辑

编辑数据源属性,如更新数据源名称、描述等。

删除

删除当前数据源。

说明

当数据源下存在数据同步或数据迁移任务时,此数据源无法直接删除,需先在数据同步页面,单击目标同步任务操作列的删除,删除数据同步或数据迁移任务。

JSON解析层级和Schema字段推断示例

解析层级指按相应层数解析出JSON中的字段。如果用户向Kafka发送的JSON数据如下。

{
  "name" : "zhangle",
  "age" : 18,
  "device" : {
    "os" : {
        "test":lag,
        "member":{
             "fa":zhangsan,
             "mo":limei
       }
     },
    "brand" : "none",
    "version" : "11.4.2"
  }
}

JSON数据解析后,对应0~4层的效果如下。

0层解析

不做任何解析,直接输出原始JSON数据。

JSON字段

目标字段名

__value__

{ "name" : "zhangle","age" : 18, "device" : { "os" : { "test":lag,"member":{ "fa":zhangsan,"mo":limei }},"brand": "none","version" : "11.4.2" }}

__value__

1层解析

解析JSON的第一层字段。

JSON字段

目标字段名

name

zhangle

name

age

18

age

device

{ "os" : { "test":lag,"member":{ "fa":zhangsan,"mo":limei }},"brand": "none","version" : "11.4.2" }

device

2层解析

解析JSON的第二层字段。如果字段没有嵌套则直接输出,例如name和age字段直接输出。如果字段中有嵌套,则输出其子层级字段,例如device字段有嵌套,因此输出其子层级device.osdevice.branddevice.version

重要

由于目标字段名不支持“.”,因此会自动替换为“_”。

JSON字段

目标字段名

name

zhangle

name

age

18

age

device.os

{ "test":lag,"member":{ "fa":zhangsan,"mo":limei }}

device_os

device.brand

none

device_brand

device.version

11.4.2

device_version

3层解析

JSON字段

目标字段名

name

zhangle

name

age

18

age

device.os.test

lag

device_os_test

device.os.member

{ "fa":zhangsan,"mo":limei }

device_os_member

device.brand

none

device_brand

device.version

11.4.2

device_version

4层解析

JSON字段

目标字段名

name

zhangle

name

age

18

age

device.os.test

lag

device_os_test

device.os.member.fa

zhangsan

device_os_member_fa

device.os.member.mo

lime

device_os_member_mo

device.brand

none

device_brand

device.version

11.4.2

device_version

  • 本页导读 (1)
文档反馈