一键实时同步至Kafka

一键实时同步至Kafka方案支持全增量一体化同步,先进行全量数据迁移,然后再实时同步增量数据至目标端。本文为您介绍如何创建一键实时同步至Kafka任务。

前提条件

  1. 已完成数据源配置。您需要在数据集成同步任务配置前,配置好您需要同步的源端和目标端数据库,以便在同步任务配置过程中,可通过选择数据源名称来控制同步任务的读取和写入数据库。同步任务支持的数据源及其配置详情请参见支持的数据源及同步方案

    说明

    数据源相关能力介绍详情请参见:数据源概述

  2. 已完成数据源环境准备。您可以基于您需要进行的同步配置,在同步任务执行前,授予数据源配置的账号在数据库进行相应操作的权限。详情请参见:数据库环境准备概述

背景信息

方案属性

说明

可同步的表个数

  • 支持将源端多表数据写入至目标端多个Topic。

  • 支持通过目标Topic配置规则实现源端多表数据写入至目标端单Topic。

任务组成

当前方案将分别创建用于全量数据初始化的离线同步子任务,和用于增量数据实时同步的数据集成实时同步子任务,方案产生的离线同步子任务个数与最终读取的源端表个数有关。

数据写入

运行同步任务后,生成的离线同步任务将全量数据写入Kafka,待全量数据执行完成后,启动实时同步任务,将源端增量数据实时同步至目标端。

  • 对于源端同步表有主键的场景,同步时会使用主键值作为kafka记录的key,确保同主键的变更有序写入kafka的同一分区。

  • 对于源端同步表无主键的场景,如果选择了支持无主键表同步选项,则同步时kafka记录的key为空。如果要确保表的变更有序写入kafka,则选择写入的kafka topic必须是单分区。如果选择了自定义同步主键,则同步时使用其他非主键的一个或几个字段的联合,代替主键作为kafka记录的key。

  • 如果在kafka集群发生响应异常的情况下,仍要确保有主键表同主键的变更有序写入kafka的同一分区,则需要在配置kafka数据源时,在扩展参数表单中加入如下配置。

    {"max.in.flight.requests.per.connection":1,"buffer.memory": 100554432}

    重要

    添加配置后同步性能会大幅下降,需要在性能和严格保序可靠性之间做好权衡。

  • 实时同步写入kafka的消息总体格式、同步任务心跳消息格式及源端更改数据对应的消息格式,详情请参见:附录:消息格式

注意事项

独享集成资源组离线同步插件Datax版本必须大于等于20210726203000,实时同步插件Streamx版本必须大于等于202107121400,否则运行增量和全量同步Kafka数据时可能失败或者存在数据格式错误。

离线同步插件版本:在运维中心找到离线同步任务任务,单击查看运行日志在日志中搜索Detail log url跳转到离线同步详情日志页,然后搜索DataX( ..... ),From Alibaba!格式文本,例如,DataX (20210709_keyindex-20210709144909), From Alibaba !”,截图中框起来的内容即离线同步插件Datax的版本。离线插件版本查看方法

实时同步插件版本:在运维中心找到实时同步任务任务,在日志中搜索StreamX( ..... ),From Alibaba!格式文本,例如,StreamX (202107290000_20210729121213), From Alibaba !,截图中内容即实时同步插件StreamX的版本。实时同步插件版本查看方法

操作流程

  1. 步骤一:选择同步方案

  2. 步骤二:配置网络连通

  3. 步骤三:设置同步来源与规则

  4. 步骤四:设置目标Topic

  5. 步骤五:运行资源设置

  6. 步骤六:执行同步任务

步骤一:选择同步方案

创建同步解决方案任务,根据需求选择需要同步的源端数据源和目标端数据源,并选择一键实时同步至Kafka方案。

步骤二:配置网络连通

源端选择已创建的源端数据源与目标端数据源,以及用于执行同步任务的资源组。并测试资源组与数据源的网络连通。详情请参见:配置同步网络链接

步骤三:设置同步来源与规则

  1. 基本配置区域,配置同步解决方案的名称、任务存放位置等信息。

  2. 数据来源区域,确认需要同步的源端数据源相关信息。

  3. 选择同步的源表区域,选中需要同步的源表,单击图标图标,将其移动至已选源表

    该区域会为您展示所选数据源下所有的表,您可以选择整库全表或部分表进行同步。

  4. 设置表名到Topic的映射规则区域,单击添加规则,选择相应的规则进行添加。

    同步时默认将源端数据表写入Kafka同名Topic中,同时,您可以通过添加映射规则定义最终写入目的端的Topic名称。实现将多张表数据写入到同一个Topic中,或统一将源端某固定前缀的表名在写入目标Topic时更新为其他前缀。支持通过正则表达式转换写入的Topic名,还支持使用内置变量拼接目标Topic名。配置逻辑请参见:设置同步来源与规则

步骤四:设置目标Topic

  1. 设置基本信息。

    功能

    描述

    支持源表无主键同步

    定义当源端为无主键表时,是否允许同步至Kafka。当勾选支持源表无主键同步后,源表没有主键,也可以向下游同步,但是同步数据时kafka记录的key将使用空值,只有当写入的kafka topic是单分区,才能确保变更有序写入。

    发送心跳记录

    定义是否发送报警信息至Kafka。勾选发送心跳记录后,实时同步任务将每隔5秒往Kafka中写入一条带有当前时间信息的记录。这样即使源端没有读取到新数据,Kafka中最新数据的时间信息也会持续更新,您可以根据Kafka中读取到的最新数据的时间判断实时同步的进度。

    源端update变更对应一条Kafka记录

    定义当源端进行update操作时,是否同步update源表数据。

    • 勾选后,源端关系型数据库一条记录的一次update变更,变更前和变更后的数据将保存在一条Kafka记录中。

    • 未勾选,源端关系型数据库一条记录的一次update变更,将保存在两条Kafka记录中,分别保存变更前和变更后的数据。

  2. 刷新源表与目标表映射。

    单击刷新源表和Kafka Topic映射将根据您在步骤三配置的设置表名到Topic的映射规则来生成目标Topic,若步骤三未配置映射规则,将默认写入与源表同名的目标Topic,若目标端不存在该同名Topic,将默认新建。同时,您可以修改Topic建立方式、为目标Topic在源有表字段基础上增加附加字段。

    说明

    目标表名将跟据您在设置表名到Topic的映射规则阶段配置的表名转换规则自动转换。

    功能

    描述

    为非主键表选择主键

    • 如果来源库有主键,则同步数据时会使用该主键值作为kafka记录的key,确保同主键的变更有序写入kafka的同一分区。

    • 如果来源库没有主键:

      • 当勾选了支持源表无主键同步,则无主键的表可以正常同步。此时写入kafka记录的key将使用空值,只有当写入的kafka topic是单分区,才能确保变更有序写入,此外,您还可以选择单击编辑图标自定义主键,即使用其他非主键的一个或几个字段的联合,代替主键作为kafka记录的key。

      • 当在设置目标Topic页面未勾选支持源表无主键同步,则无主键的表同步时会出现异常,您需要在同步任务中删除无主键的表或者选择单击编辑图标自定义主键才能继续执行同步任务。

    选择Topic建立方式

    支持自动建Topic使用已有Topic

    • Topic建立方式选择使用已有Topic时,您可以在Kafka Topic列的下拉列表中选择需要使用的Topic名称。

    • Topic建立方式选择自动建Topic时,显示自动创建的Kafka Topic名称。您可以单击Topic名称,查看和修改建Topic名称和注释。

    编辑附加字段

    单击操作列的编辑附加字段,可以为目标表在源端字段的基础上增加字段并为字段赋值。支持手动赋值常量与变量。

    说明

    仅在Topic建立方式自动建Topic时,可以使用此功能。

    编辑目标Topic

    同步解决方案默认根据源端生成目标Topic结构,可能存在字段类型转换,即若目标端数据库中没有与源端一致的数据类型时,同步任务在自动创建目标Topic时,将自动为源端字段匹配目标端可写入的字段类型。支持您单击Kafka Topic列的目标Topic名根据需求修改查看和修改目标字段类型映射。

    说明

    仅在Topic建立方式自动建Topic时,支持编辑目标Topic。

步骤五:运行资源设置

当前方案创建后将分别生成全量数据离线同步子任务和增量数据实时同步子任务。您需要在运行资源设置界面配置离线同步任务和实时同步任务的相关属性。

包括实时增量同步及离线全量同步使用的独享数据集成资源组、离线全量同步使用的调度资源组,同时,单击高级配置可配置是否容忍脏数据、任务最大并发数、源库允许支持的最大连接数等参数。

说明
  • DataWorks的离线同步任务通过调度资源组将其下发到数据集成任务执行资源组上执行,所以离线同步任务除了涉及数据集成任务执行资源组外,还会占用调度资源组资源。如果使用了独享调度资源组,将会产生调度实例费用。您可通过任务下发机制对该机制进行了解。

  • 离线和实时同步任务推荐使用不同的资源组,以便任务分开执行。如果选择同一个资源组,任务混跑会带来资源抢占、运行态互相影响等问题。例如,CPU、内存、网络等互相影响,可能会导致离线任务变慢或实时任务延迟等问题,甚至在资源不足的极端情况下,可能会出现任务被OOM KILLER杀掉等问题。

步骤六:执行同步任务

  1. 进入数据集成 > 同步任务界面,找到已创建的同步方案。

  2. 单击操作列的启动/提交执行按钮,启动同步的运行。

  3. 单击操作列的执行详情,查看任务的详细执行过程。

附录:写入Kafka消息格式定义

完成配置实时同步任务的操作后,执行同步任务会将源端数据库读取的数据,以JSON格式写入到Kafka topic中。除了会将设置的源端表中已有数据全部写入Kafka对应Topic中,还会启动实时同步将增量数据持续写入Kafka对应Topic中,同时源端表增量DDL变更信息也会以JSON格式写入Kafka对应Topic中。您可以通过附录:消息格式获取写入Kafka的消息的状态及变更等信息。

说明

通过离线同步任务写入Kafka的数据JSON结构中的payload.sequenceId、payload.timestamp.eventTimepayload.timestamp.checkpointTime字段均设置为-1

后续步骤

完成任务配置后,您可以对已创建的任务进行管理、执行加减表操作,或对任务配置监控报警,并查看任务运行的关键指标等。详情请参见:全增量同步任务运维