Kafka增量数据同步至MaxCompute

本文以将Kafka增量数据同步至MaxCompute的一个实践为例,为您介绍Kafka的分钟、小时、天增量数据定时调度写入MaxCompute小时、天分区表的配置详情。

注意事项

  • Kafka的版本需要大于等于0.10.2小于等于2.2.x,且Kafka启用了记录时间戳,并且记录带有正确的业务时间戳。

  • 增量数据开始同步后,如果仍有时间戳小于等于起始时间的记录写入Kafka Topic的话,这些数据可能被漏读,所以当Kafka Topic中数据写入出现延迟或者时间戳乱序时,要注意对离线同步任务造成的数据漏读风险。

  • Kafka侧参数同步结束策略原则上只有满足以下条件可以选择1分钟读取不到新数据,否则存在数据漏读风险。

    • Kafka Topic中部分或全部分区存在长时间(10分钟以上)无数据写入情况。

    • 每个周期实例启动后,不会有时间戳小于结束时间参数的记录写入Kafka Topic。

创建数据源

  • 准备用于运行数据同步任务的工作空间与数据集成资源组,操作详情请参见新增和使用独享数据集成资源组。本实践下文以一个标准模式的工作空间使用独享数据集成资源组为例,为您示例操作详情。

  • 新建Kafka数据源,并完成Kafka数据源与数据集成资源组之间的网络连通检测,操作详情请参见配置Kafka数据源

    重要

    使用标准模式的工作空间时,您需要确认在Kafka数据源的开发和生产环境对应的Kafka集群中有同名Topic,用于开发和生成环境进行数据同步。

  • 准备MaxCompute数据源,操作详情请参见创建MaxCompute数据源

新建离线同步任务

在数据开发(DataStudio)页面的某个业务流程下,新建一个离线同步节点,根据界面提示配置节点的路径、名称等信息,操作详情请参见通过向导模式配置离线同步任务

配置对应数据源与资源组

在新建离线同步节点后,需根据数据来源我的资源组以及数据去向,依次进行配置并测试连通性。

image

配置数据来源:Kafka侧参数

配置离线同步节点的数据来源相关参数。本实践将Kafka数据增量同步至MaxCompute,数据来源为Kafka数据,配置要点如下所示。数据来源kafka

说明

通用的Kafka数据来源的配置项介绍可查看Kafka Reader文档,以下为本次实践的配置参考。

配置项

配置要点

数据源主题

选择待同步的Kafka Topic。如果您使用的是标准模式的DataWorks,需要在对应开发和生产环境的Kafka集群中有同名的Topic,此处的主题即选择此同名的Topic即可。

说明

如果:

  • 开发环境的Topic不存在:则此处配置离线同步节点的主题时,下拉框中无法搜索到待同步的Topic。

  • 生产环境的Topic不存在:则离线同步任务配置完,提交发布后,在生产环境周期调度运行时会因为没法找到待同步的表而导致任务失败。

消费群组ID

根据业务需要填写,确保在Kafka集群侧唯一,便于在Kafka集群侧统计和监控消费情况。

Kafka版本

根据待同步数据的Kafka集群实际情况选择。

说明

Kafka的版本需要大于等于0.10.2、小于等于2.2.x。

读取起始位点起始时间读取结束位点结束时间

读取起始点位读取结束位点选择指定时间起始时间结束时间分别设置为调度参数${startTime}${endTime}

这几个参数明确了后续同步数据时从哪个数据开始同步,同步到哪个数据同步任务结束,本实践的配置表明${startTime}时间的数据开始同步,一直到${endTime}时间的数据结束。${startTime}${endTime}在同步任务实际运行时会根据调度配置做参数替换。

时区

可置空或选择默认使用DataWorks所在地域的服务器时区。

说明

如果您此前有联系阿里云技术支持修改过调度时区,这里可选择您修改后的时区。

键类型值类型编码

根据Kafka Topic记录实际情况选择。

同步结束策略

同步结束策略如果满足以下条件可以选择1分钟读取不到新数据,否则选择到达指定结束位点

  • Kafka Topic中部分或全部分区存在长时间(10分钟以上)无数据写入情况。

  • 每个周期实例启动后,不会有时间戳小于结束时间参数的记录写入Kafka Topic。

高级配置

保持默认即可。

配置数据去向:MaxCompute侧参数

配置离线同步节点的数据去向相关参数。本实践将Kafka数据增量同步至MaxCompute,数据去向为MaxCompute表,配置要点如下所示。去向MaxCompute

配置项

配置要点

数据源

选择上述新建的MaxCompute数据源。

选择数据写入的MaxCompute表。如果您使用的是标准类型的DataWorks工作空间,请确保在MaxCompute的开发环境和生产环境中存在同名且表结构一致的MaxCompute表。

说明

如果:

  • 开发环境不存在待同步的MaxCompute表,则选择此处配置离线同步节点的去向表的下拉框中无法搜到待同步表。

  • 生产环境不存在待同步的MaxCompute表,同步任务提交发布后,数据同步任务调度运行时将会由于无法找到待同步表而导致同步任务运行失败。

  • 开发环境和生产环境的表结构不一致,同步任务提交发布后,同步任务实际调度运行时的列对应关系,可能与此处离线同步节点配置的列对应关系不一致,最终导致数据写入不正确。

如果待写入的MaxCompute表未生成,您可以使用一键生成目标表结构功能快速建表,一键生成目标表结构会自动生成建表SQL,解读详情请参见下文的附录:一键生成目标表结构

分区信息

本实践示例的写入MaxCompute表有一个分区列ds,在ds = 后的输入框中配置调度参数${partition},表示每次进行数据同步时,向${partition}这个分区中写入数据,${partition}会根据调度配置做参数替换。

说明

分区信息会根据MaxCompute表的实际结构定义确定是否有该配置项以及配置项的表单数量,如果选择写入的是非分区表,则不会出现该配置项;如果选择写入的是分区表,则会根据分区表实际分区列个数和分区列名出现对应的表单项。

其他参数保持默认即可。

配置字段映射

  1. 编辑字段映射中Kafka侧字段定义。

    • Kafka侧字段中默认的6个字段。

      字段名

      含义

      __key__

      Kafka记录的Key。

      __value__

      Kafka记录的Value。

      __partition__

      Kafka记录所在分区号,分区号为从0开始的整数。

      __headers__

      Kafka记录的dHeaders。

      __offset__

      Kafka记录在所在分区的偏移量,偏移量为从0开始的整数。

      __timestamp__

      Kafka记录的13位整数毫秒时间戳。

    • Kafka侧字段可自定义配置JSON解析,可以通过.(获取子字段)[](获取数组元素)两种语法,获取Kafka记录JSON格式的value字段内容。

      重要

      如果JSON字段名中带有"."字符,由于会引发字段定义语法歧义,无法通过字段定义获取字段值。

      Kafka某条JSON格式的记录value的数据示例如下。

      {
            "a": {
            "a1": "hello"
            },
            "b": "world",
            "c":[
                  "xxxxxxx",
                  "yyyyyyy"
                  ],
            "d":[
                  {
                        "AA":"this",
                        "BB":"is_data"
                  },
                  {
                        "AA":"that",
                        "BB":"is_also_data"
                  }
              ],
           "a.b": "unreachable"
      }
      • 如果同步a1的数据“hello”,Kafka侧字段增加a.a1

      • 如果同步b的数据“world”,Kafka侧字段增加b

      • 如果同步c的数据“yyyyyyy”,Kafka侧字段增加c[1]

      • 如果同步AA的数据“this”,Kafka侧字段增加d[0].AA

      • Kafka侧字段定义增加a.b无法同步数据"unreachable"

  2. 指定Kafka侧列定义与MaxCompute侧列定义的对应关系。字段映射

    • 允许源头表字段或目标表字段存在不参与映射的字段,源头表不参与映射的字段同步实例不会读取,目标端不参与映射的字段将写入NULL。

    • 不允许一个源头表字段映射到多个目标表字段,也不允许一个目标表字段映射到多个目标表字段。

调度配置

本实践示例涉及的调度配置要点如下。通用的调度配置指导及全量调度相关参数的介绍请参见调度配置

  • 配置调度参数。

    在上述配置数据来源与数据去向时,使用了三个调度参数:${startTime}${endTime}${partition},在此处调度配置中需根据实际同步需求指定这三个调度参数的替换策略,以下为几个典型场景的配置示例。

    典型场景

    推荐配置

    场景示例说明

    同步任务每5分钟调度一次

    调度参数1

    • startTime=$[yyyymmddhh24mi-8/24/60]00

    • endTime=$[yyyymmddhh24mi-3/24/60]00

    • partition=$[yyyymmddhh24mi-8/24/60]

    如果同步任务在2022-11-22 10:00被调度启动,则:

    • 会同步Kafka Topic中时间戳范围在2022-11-22 09:52(含)到2022-11-22 09:57(不含)的记录。

    • 同步的Kafka数据写入MaxCompute的202211220952分区中。

    • endTime设置比实例调度时间($[yyyymmddhh24mi])早三分钟是为了确保同步任务实例启动后,对应时间区间内的数据已全部写入Kafka Topic中,避免漏读。

    同步任务每小时调度一次

    调度参数22

    • startTime=$[yyyymmddhh24-1/24]0000

    • endTime=$[yyyymmddhh24]0000

    • partition=$[yyyymmddhh24]

    说明
    • 同步任务每2小时调度一次时,startTime=$[yyyymmddhh24-2/24]0000,另外调度参数保持不变。

    • 同步任务每3小时调度一次时,startTime=$[yyyymmddhh24-3/24]0000,另外调度参数保持不变。

    • 以此类推其他以小时为调度周期的场景下,调度参数的配置结果。

    如果同步任务在2022-11-22 10:05被调度启动,则:

    • 会同步Kafka Topic中时间戳范围在2022-11-22 9:00(含)到2022-11-22 10:00(不含)的记录。

    • 同步的Kafka数据写入MaxCompute的2022112210分区中。

    同步任务每天调度一次

    调度参数3

    • startTime=$[yyyymmdd-1]000000

    • endTime=$[yyyymmdd]000000

    • partition=$[yyyymmdd-1]

    如果同步任务在2022-11-22 00:05被调度启动,则:

    • 会同步Kafka Topic中时间戳范围在2022-11-21 00:00(含)到2022-11-22 00:00(不含)的记录。

    • 同步的Kafka数据写入MaxCompute的20221121分区中。

    同步任务每周调度一次

    调度参数4

    • startTime=$[yyyymmdd-7]000000

    • endTime=$[yyyymmdd]000000

    • partition=$[yyyymmdd-1]

    如果同步任务在2022-11-22 00:05被调度启动,则:

    • 会同步Kafka Topic中时间戳范围在2022-11-15 00:00(含)到2022-11-22 00:00(不含)的记录。

    • 同步的Kafka数据写入MaxCompute的20221121分区中。

    同步任务每月调度一次

    调度参数4

    • startTime=$[add_months(yyyymmdd,-1)]000000

    • endTime=$[yyyymmdd]000000

    • partition=$[yyyymmdd-1]

    如果同步任务在2022-11-22 00:05被调度启动,则:

    • 会同步Kafka Topic中时间戳范围在2022-10-22 00:00(含)到2022-11-22 00:00(不含)的记录。

    • 同步的Kafka数据写入MaxCompute的20221121分区中。

  • 配置调度周期。

    • 根据希望的调度间隔,设置调度周期。

      典型场景

      推荐配置

      场景示例说明

      同步任务每5分钟调度一次

      • 调度周期:分钟

      • 开始时间:00:00

      • 时间间隔:05分钟

      • 结束时间:23:59

      同步任务每小时调度一次

      • 调度周期:小时

      • 开始时间:00:15

      • 时间间隔:1小时

      • 结束时间:23:59

      开始时间设置一个比00:00稍晚一点的时间,例如00:15,确保同步任务实例启动后,对应时间区间内的数据已全部写入Kafka Topic中。

      同步任务每天调度一次

      • 调度周期:天

      • 定时调度时间:00:15

      定时调度时间设置一个比00:00稍晚一点的时间,例如00:15,确保同步任务实例启动后,对应时间区间内的数据已全部写入Kafka Topic中。

      同步任务每周调度一次

      • 调度周期:周

      • 指定时间:星期一

      • 定时调度时间:00:15

      定时调度时间设置一个比00:00稍晚一点的时间,例如00:15,确保同步任务实例启动后,对应时间区间内的数据已全部写入Kafka Topic中。

      同步任务每月调度一次

      • 调度周期:月

      • 指定时间:每月1号

      • 定时调度时间:00:15

      定时调度时间设置一个比00:00稍晚一点的时间,例如00:15,确保同步任务实例启动后,对应时间区间内的数据已全部写入Kafka Topic中。

      重要

      如果出现实例启动后,仍有时间戳小于等于起始时间的记录写入Kafka Topic,则这些数据可能被漏读,所以当Kafka Topic中数据写入出现延迟或者时间戳乱序时,要注意对离线同步任务造成的数据漏读风险。

    • 设置重跑属性。推荐勾选出错后自动重跑,重跑次数设置3,重跑间隔设置2,让同步实例出现异常的话能够自动重跑实现自愈。

  • 配置调度资源组。

    按照需要选择调度资源组,推荐使用独享调度资源组。

  • 配置调度依赖。

    本实践同步任务无需在特定其他任务运行成功后才开始运行,选择使用工作空间根节点即可。

数据集成资源组配置

选择在创建数据源时,与Kafka数据和MaxCompute数据源都完成连通性检查的数据集成资源组。资源组配置

试运行验证

完成上述配置后,您可以在数据开发(DataStudio)的离线节点页面进行调试运行,验证离线数据同步的结果是否符合预期。

  1. 您可单击页面顶部的带参运行带参运行按钮,给Kafka侧参数${startTime}${endTime},以及MaxCompute侧参数${partition}赋值后,选择有空闲的调度资源组执行试运行。带参运行

  2. 等待试运行完成,确认运行成功无异常。

  3. 创建临时查询节点,执行检查SQL查询语句验证数据正确写入MaxCompute表中,且数据内容正确、数据量正确。

    select * from test_project.test_table where ds=2022112200 limit 10;
    select count(*) from test_project.test_table where ds=2022112200;

提交发布

试运行没有问题后,您可以保存离线节点的配置,并提交发布至运维中心,后续离线同步任务将会周期性(分钟或小时或天)将Kafka的数据写入MaxCompute的表中。提交发布的操作请参见发布任务

附录:一键生成目标表结构

一键生成目标表结构功能会自动生成建表SQL语句,SQL语句定义了建表后的表名、字段等信息。解读方式如下。一键生成目标表

  • MaxCompute表名与数据来源配置中Kafka的主题(Topic)配置结果一致。

  • MaxCompute表字段有6个字段,与Kafka记录的对应关系如下。

    字段名

    含义

    __key__

    Kafka记录的Key。

    __value__

    Kafka记录的Value。

    __partition__

    Kafka记录所在分区号,分区号为从0开始的整数。

    __headers__

    Kafka记录的dHeaders。

    __offset__

    Kafka记录在所在分区的偏移量,偏移量为从0开始的整数。

    __timestamp__

    Kafka记录的13位整数毫秒时间戳。

  • MaxCompute表生命周期默认为100年。

您可根据实际需要修改默认建表SQL中的这些参数。此外,您也可以通过字段映射定义对Kafka记录的value做JSON解析,根据实际需要在默认建表SQL中添加对应JSON解析结果的字段列。