单表实时同步任务配置

更新时间:
复制为 MD 格式

DataWorks数据集成提供了单表实时同步任务,旨在实现不同数据源之间低延迟、高吞吐量的数据复制与流转。该功能基于先进的实时计算引擎,能够捕获源端数据的实时变更(增、删、改),并将其快速应用到目标端。本文以Kafka单表实时同步至MaxCompute为例,讲述单表实时任务的配置方式。

准备工作

步骤一:新建同步任务

  1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据集成 > 数据集成,在下拉框中选择对应工作空间后单击进入数据集成

  2. 在左侧导航栏单击同步任务,然后在页面顶部单击新建同步任务,并配置任务信息,此处以Kafka实时写入MaxCompute为例:

    • 数据来源类型Kafka

    • 数据去向类型MaxCompute

    • 具体类型单表实时

    • 同步步骤

      • 结构迁移:自动在目标端创建与源端匹配的数据库对象(如表、字段、数据类型等),但不包含数据。

      • 增量同步(可选):在全量同步完成后,持续地捕获源端发生的变更数据(新增、修改、删除),并将其同步至目标端。

        若源端为Hologres还支持全量同步,即先将已有数据全量同步至目标表后,自动进行数据的增量同步。
说明

支持的数据源及同步方案请参见:支持的数据源及同步方案

步骤二:配置数据源与运行资源

  1. 来源数据源选择已添加的Kafka数据源,去向数据源选择已添加的MaxCompute数据源。

  2. 运行资源区域,选择同步任务所使用的资源组,并为该任务分配资源组CU。支持为全量同步和增量同步分别设置CU,以精确控制资源,避免浪费。如果您的同步任务因资源不足出现OOM现象,请适当调整资源组的CU占用取值。

  3. 并确保来源数据源与去向数据源均通过连通性检查

步骤三:配置同步方案

1. 配置数据来源

  1. 配置页签,选择Kafka数据源中需要同步的Topic。

    其他配置可使用任务创建时生成的默认值,也可根据需要进行修改,参数详见:Kafka官方文档 

  2. 单击右上角的数据采样

    在弹出对话框中指定开始时间采样条数后,单击开始采集按钮,可以对指定的Kafka Topic进行数据采样,同时您可以预览Topic中的数据,为后续数据处理节点的数据预览和可视化配置提供输入。

  3. 输出字段配置页签,按需勾选同步任务需同步的字段。

    Kafka侧字段中默认提供6个字段。

    字段名

    含义

    __key__

    Kafka记录的Key。

    __value__

    Kafka记录的Value。

    __partition__

    Kafka记录所在分区号,分区号为从0开始的整数。

    __headers__

    Kafka记录的Headers。

    __offset__

    Kafka记录在所在分区的偏移量,偏移量为从0开始的整数。

    __timestamp__

    Kafka记录的13位整数毫秒时间戳。

    您也可以在后续的数据处理节点对字段进行更多转换处理。

2. 数据处理

开启数据处理按钮,目前提供5种数据处理方式(数据脱敏字符串替换数据过滤JSON解析字段编辑与赋值),您可根据需要做顺序编排,在任务运行时会按照编排的数据处理先后顺序执行数据处理。

image

每完成一个数据处理节点配置,可以单击右上角的数据输出预览按钮:

  1. 在输入数据下方的表格中,可以看到上个环节数据采样的结果。您可以单击重新获取上游输出,来刷新结果。

  2. 如果上游没有输出结果,也可以通过手工构造数据来模拟前置输出。

  3. 单击预览,可以查看上游环节输出的数据,经过数据处理组件处理后输出的结果。

image

说明

数据输出预览和数据处理强依赖Kafka来源的数据采样,在执行数据处理前需要先在Kafka来源表单中完成数据采样。

3. 配置数据去向

  1. 数据去向区域,选择Tunnel资源组,默认选择“公共传输资源”,即MC的免费quota。

  2. 选择要写入目标表是新建表还是使用已有表

    1. 如果是新建表,可在下拉列表中选择新建,默认会创建与数据来源端结构相同的表,您可以手动修改目标端表名和表结构。

    2. 如果是使用已有表,请下拉选择需要同步的目标表。

  3. (可选)编辑表结构。

    单击表名后的编辑按钮,可编辑该表的表结构,支持单击根据上游节点输出列重新生成表结构按钮,自动根据上游节点输出列,生成表结构。您可以在自动生成的表结构中选择一列配置为主键。

4. 配置字段映射

选择数据来源和数据去向后,需要指定读取端和写入端列的映射关系。配置字段映射关系后,任务将根据字段映射关系,将源端字段写入目标端对应类型的字段中。

  1. 系统会自动按照同名映射原则生成上游列与目标表列之间的映射,您可根据需要进行调整,支持一个上游列映射到多个目标表列,不允许多个上游列映射到一个目标表列,当上游列未配置到目标表列的映射时,对应列不会写入目标表。

  2. Kafka侧字段可自定义配置JSON解析,可以通过数据处理组件获取value字段内容,实现更精细的字段配置。

    image

  3. 分区设置(可选)。

    1. 时间自动分区是根据业务时间(此处为_timestamp)字段进行分区的,一级分区为年,二级分区为月,以此类推。

    2. 根据字段内容动态分区通过指定源端表某字段与目标MaxCompute表分区字段对应关系,实现源端对应字段所在数据行写入到MaxCompute表对应的分区中。

步骤四:高级配置

同步任务提供高级参数可供精细化配置,系统设有默认值,多数情况下无需修改。如有修改必要,您可以:

  1. 单击界面右上方的高级配置,进入高级参数配置页面。

    说明

    数据开发的高级配置位于任务配置界面右侧页签。

  2. 支持为同步任务的读端和写端分别设置参数。修改自动设置运行时配置,设置为false,可以自定义运行时配置

  3. 根据参数提示,修改参数值,参数含义见参数名称后的解释。部分参数的配置建议可参考实时同步高级参数

重要

请在完全了解参数含义与作用后果再进行修改,以免产生不可预料的错误或者数据质量问题。

步骤五:模拟运行

完成上述所有任务配置后,您可以单击左下角的模拟运行来调试任务,模拟整个任务针对少量采样数据的处理,查看数据写入目标表后的结果。当任务配置错误、模拟运行过程中异常或者产生脏数据时,会实时反馈出异常信息,能够帮助您快速评估任务配置的正确性,以及是否能得到预期结果。

  1. 在弹出的对话框中设置采样参数(开始时间采样条数)。

  2. 单击开始采集得到采样数据。

  3. 单击预览结果按钮,模拟任务运行,并查看输出结果。

模拟运行输出的结果仅作预览,不会写入目标端数据源,对生产数据造成影响。

步骤六:发布并执行任务

  1. 完成所有配置后,单击页面底部的保存,完成任务配置。

  2. 数据集成的任务需要发布至生产环境运行,因此新建或者编辑任务均需执行发布操作后方可生效。发布时,若勾选发布后直接启动运行,则在发布时会同步启动任务。否则,发布完成后,需要进入数据集成 > 同步任务界面,在目标任务的操作列,手动启动任务。

  3. 单击任务列表中对应任务的名称/ID,查看任务的详细执行过程。

步骤七:配置报警规则

任务发布上线运行后,可以为其配置告警规则,以便在任务出现异常时第一时间获得通知,确保生产环境的稳定性和数据时效性。在数据集成的任务列表,单击目标任务操作列的更多 > 报警配置

1.新增报警

image

(1) 单击新建规则,配置报警规则。

您可以通过设置报警原因,对任务的业务延迟Failover任务状态DDL通知任务资源利用率等指标进行监控,并根据指定的阈值设置CRITICALWARNING两种不同级别的告警方式。

  • 设置报警方式后,可通过设置高级参数配置,控制报警信息发送的时间间隔,防止一次性发送信息太多,造成浪费和消息堆积。

  • 若报警原因选择业务延迟任务状态任务资源利用率,也支持开启恢复通知,方便任务恢复正常后,通知接收人。

(2) 管理报警规则。

对于已创建的报警规则,您可以通过报警开关控制报警规则是否开启,同时,您可以根据报警级别将报警发送给不同的人员。

2.查看报警

单击展开任务列表的更多 > 报警配置,进入报警事件,可以查看已经发生的告警信息。

后续步骤

任务启动后,您可以点击任务名称,查看运行详情,进行任务运维和调优

常见问题

实时同步任务常见问题请参见实时同步常见问题

更多案例

Kafka单表实时同步至ApsaraDB for OceanBase

LogHub(SLS)单表实时入湖至Data Lake Formation

Hologres单表实时同步至Doris

Hologres单表实时同步至Hologres

Kafka单表实时同步至Hologres

LogHub(SLS)单表实时同步至Hologres

Kafka单表实时同步至Hologres

Hologres单表实时同步至Kafka

LogHub(SLS)单表实时同步至MaxCompute

Kafka单表实时同步至OSS数据湖

Kafka单表实时同步至StarRocks

Oracle单表实时同步至Tablestore