本文以Kafka实时入湖写入至OSS场景为例,为您介绍如何通过数据集成实时入湖。
使用限制
Kafka的版本需要大于等于0.10.2小于等于2.2.0。
本实践仅支持使用独享数据集成资源组。
准备独享数据集成资源组并与数据源网络连通
在进行数据同步前,需要完成您的独享数据集成资源组和数据源的网络连通,详情请参见配置资源组与网络连通。
OSS数据源在下一步新增OSS数据源时,指定Endpoint地址,即可实现OSS数据源与独享数据集成资源组的网络连通,无需特殊配置。
添加数据源
新建Kafka数据源
新建OSS数据源
在DataWorks数据源管理页面,单击新建数据源,根据界面提示新建OSS数据源。OSS数据源支持RAM角色授权模式与Access Key认证模式两种方式:
Access key模式
Endpoint:您在OSS产品控制台OSS Bucket信息页面概览处获取到的访问端口,请参见获取OSS Bucket信息。
AccessKey ID:访问密钥中的AccessKey ID,您可以进入用户信息管理页面获取。
AceessKey Secret:访问密钥中的AccessKey Secret,相当于登录密码。
RAM角色认证模式
RAM角色认证模式,可以参考文档通过RAM角色授权模式配置数据源进行配置。
配置完成后,单击测试连通性,选择您已经完成与OSS网络联通的数据集成独享资源组,确保连通状态为可连通,即可实现OSS与对应数据集成独享资源组的网络联通。
创建同步任务
进入数据集成主站,单击同步任务进入同步任务页面,在页面中单击新增任务,开始配置同步任务。
配置同步任务基本信息。
任务名称:自定义。
同步类型:来源数据源选择Kafka,去向数据源选择OSS,并选择单表实时同步方案。
网络与资源配置:在下拉框中分别选择已创建的Kafka数据源、OSS数据源、独享数据集成资源组,单击测试所有连通性,保障资源组与数据源之间的网络连通性。
配置Kafka来源信息。
单击页面上方的Kafka来源,编辑Kafka来源信息。
配置Kafka基本信息。
选择Kafka集群中需要同步的Topic。
其他配置可使用任务创建时生成的默认值,也可根据需要进行修改。
单击右上角的数据采样。
在弹出对话框中指定好开始时间和采样条数后,单击开始采集按钮,可以对指定的Kafka Topic进行数据采样,同时您可以预览Topic中的数据,为后续数据处理节点的数据预览和可视化配置提供输入。
编辑数据处理节点。
单击图标可以增加数据处理方式。目前提供5种数据处理方式,您可根据需要做顺序编排,在任务运行时会按照编排的数据处理先后顺序执行数据处理,5种数据处理方式包括:数据脱敏、字符串替换、数据过滤、JSON解析和字段编辑与赋值。每完成一个数据处理节点配置,可以单击右上角的数据输出预览按钮,在弹出对话框中,单击重新获取上游输出,模拟得到Kafka Topic采样数据经过当前数据处理节点处理后的结果。
在数据输出预览窗口,您可以根据需要修改输入数据,或者单击手工构造数据按钮自定义输入数据,然后单击预览按钮,查看当前数据处理节点对数据的处理结果,当数据处理节点处理异常,或者产生脏数据时,也会实时反馈异常信息,能够帮助您快速评估数据处理节点配置的正确性,以及是否能得到预期结果。
说明数据输出预览强依赖Kafka来源的数据采样,在执行数据输出预览前需要先在Kafka来源表单中完成数据采样。
配置OSS去向信息。
单击页面上方的OSS,编辑OSS去向源信息。
配置基本信息。
目标元数据库类型:如果您当前账号下开通了DLF产品,支持同步数据入湖时自动在DLF构建对应的元数据库和元数据表信息。
说明不支持跨地域构建元数据。
目标库:选择数据写入的目标数据库,支持您单击右侧的新建库创建DLF元数据库。
目标表:选择要写入的OSS表是自动建表还是使用已有表。
表名:填写或者选择要写入的OSS表名。
OSS存储路径选择:选择入湖后数据存储在OSS的哪个路径下。
编辑建表结构。
当选择自动建表时,您需要单击编辑表结构按钮,在弹框中编辑建表结构。同时,支持您单击根据上游节点输出列重新生成表结构按钮,自动根据上游节点输出列,生成表结构。您可以在自动生成的表结构中选择一列配置为主键。
配置字段映射。
保存建表结构或者选择使用已有表时,系统会自动按照同名映射原则生成上游列与OSS表列之间的映射,您可根据需要进行调整,支持一个上游列映射到多个OSS表列,不允许多个上游列映射到一个OSS表列,当上游列未配置到OSS表列的映射时,对应列不会写入OSS表。
配置上游流入动态字段处理策略。
上游流入动态字段处理策略用于控制上游数据处理节点(目前可以生成动态列的数据处理节点只有JSON解析)生成动态列的处理方式。如果在JSON解析节点配置了动态输出字段,则在OSS节点中会出现上游流入动态字段处理策略表单。
动态列指在任务配置中未明确定义列名,而是根据源端输入数据内容的不同,能够解析出不同列名和列值,并输出到OSS节点的列。对上游流入动态字段处理策略如下表所示:
参数
描述
加列
如果在OSS表中无与动态列同名的列,则触发OSS表加列后将动态列写入。
忽略
如果在OSS表中无与动态列同名的列,则忽略该动态列,将其他配置了映射关系的列写入OSS表。
报错
如果在OSS表中无与动态列同名的列,则同步任务报错停止。
高级参数配置。
单击页面右上角的高级参数配置,对同步任务运行时的并行度和资源进行配置,您可以根据Kafka Topic数据流量和分区数量确定对应配置项的取值,建议按照如下规则进行配置:
读端并发数=Kafka Topic分区数
写端并发数=Kafka Topic分区数
内存=1.5G+(256MB*Kafka Topic分区数)
Checkpoint时间间隔,单位毫秒,建议配置分钟级间隔。
由于同步任务的性能表现和资源占用受到源端和目标端系统数据流量、网络环境和系统负载等因素影响,基于上述简单规则,您可以根据实际情况做调整和修改。
报警配置。
为能够及时感知到同步任务的异常并做出响应和处理,您可以对同步任务设置不同的报警策略。
单击右上角的报警配置,进入实时同步子任务报警设置页面。
单击新增报警,配置报警规则。
报警规则设置可以参考实时同步任务告警设置最佳实践。
说明报警原因为DLL通知时,适用的DDL只允许选择新增列,在同步任务解析出新的动态列时将触发报警(触发条件不是在OSS表加列)。
管理报警规则。
对于已创建的报警规则,您可以通过报警开关控制报警规则是否开启,同时,您可以根据报警级别报警给不同的接收人。
资源组配置。
您可以在右上角的资源组配置处修改任务运行使用的独享数据集成资源组。
模拟运行。
完成上述所有任务配置后,您可以通过模拟运行功能,模拟整个任务针对少量采样数据的处理,查看数据写入OSS表后的结果。当任务配置错误、模拟运行过程中异常或者产生脏数据时,会实时反馈出异常信息,能够帮助您快速评估任务配置的正确性,以及是否能得到预期结果。
单击页面右上角的模拟运行,在弹出的对话框中设置针对Kafka Topic的采样参数(开始时间和采样条数)。
单击开始采集得到采样数据。
单击预览按钮,模拟整个任务针对少量采样数据的处理。
完成上述所有任务配置后,单击完成配置,完成同步任务的配置。
任务运维
启动同步任务
完成配置之后,界面会自动跳转到任务列表页面,您可以单击对应任务的操作列的启动按钮,启动同步任务。
查看任务运行状态
创建完成同步任务后,您可以在同步任务页面,找到已创建的同步任务,单击任务名称或执行概况空白处,查看任务的运行详情。任务详情分为三个部分:
基本信息:您可以查看同步任务的数据源信息、绑定的资源组等信息。
执行状态:Kafka到OSS的同步任务分为结构迁移和实时数据同步两个步骤,您可以查看任务执行状态。
详细信息:您可以查看结构迁移以及实时同步的执行详情。
结构迁移中包含目标表的创建方式(已有表或自动建表),如果是自动建表,将会为您展示建表的DDL。
实时同步中包含实时同步的统计信息,包含实时的运行信息、DDL记录、报警信息等。
任务重跑
直接重跑
不修改任务配置,直接单击同步任务操作列的
操作,重跑一次性任务。修改后重跑
编辑任务,进行修改操作后,单击完成。此时任务的操作会变成应用更新,单击应用更新会直接触发修改后的任务重跑。实时同步任务会按照新的配置运行。