实时ETL同步方案根据来源Kafka指定Topic的内容结构对目的StarRocks表结构做初始化,然后将Kafka指定Topic的存量数据同步至StarRocks,同时也持续将增量数据实时同步至StarRocks。本文为您介绍如何创建Kafka实时ETL同步至StarRocks任务。
使用限制
Kafka的版本需要大于等于0.10.2小于等于2.2.0。
本实践仅支持使用通用型资源组(新版资源组)。
添加数据源
新建Kafka数据源
您可以手动添加Kafka数据源至DataWorks,详情请参见:Kafka数据源。
新建StarRocks数据源
获取StarRocks数据源信息
进入StarRocks产品控制台。找到您要进行数据同步的StarRocks集群,在实例详情界面获取到StarRocks的集群ID、连接地址、QueryPort、HttpPort信息。
手动添加StarRocks数据源
详情请参见StarRocks数据源。
准备通用型资源组并与数据源网络连通
在进行数据同步前,需要完成您的通用型资源组和数据源的网络连通,详情请参见配置网络连通。
Kafka与StarRocks支持的网络类型如下:
Kafka: VPC网络、公网。
StarRocks: VPC网络。
创建同步任务
进入数据集成页面。
登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据集成,在下拉框中选择对应工作空间后单击进入数据集成。
在数据集成同步任务页面的创建同步任务区域,选择来源类型Kafka、去向类型StarRocks,然后点击开始创建。
配置同步任务基本信息。
新任务名称:配置同步任务名称。
同步类型:选择单表实时。
网络与资源配置:在下拉框中分别选择已创建的Kafka数据源、StarRocks数据源、通用型资源组,单击测试所有连通性,保障资源组与数据源之间的网络连通性。
单击下一步。
配置Kafka来源信息。
单击页面上方的Kafka来源,编辑Kafka来源信息。
配置Kafka基本信息。
选择Kafka集群中需要同步的Topic。
其他配置可使用任务创建时生成的默认值,也可根据需要进行修改。
单击右上角的数据采样。
在弹出对话框中指定好开始时间和采样条数后,单击开始采集按钮,可以对指定的Kafka Topic进行数据采样,同时您可以预览Topic中的数据,为后续数据处理节点的数据预览和可视化配置提供输入。
编辑数据处理节点。
单击图标可以增加数据处理方式。
目前提供5种数据处理方式,您可根据需要做顺序编排,在任务运行时会按照编排的数据处理先后顺序执行数据处理,5种数据处理方式包括:数据脱敏、字符串替换、数据过滤、JSON解析和字段编辑与赋值。
每完成一个数据处理节点配置,可以单击右上角的数据输出预览按钮,在弹出对话框中,单击重新获取上游输出,模拟得到Kafka Topic采样数据经过当前数据处理节点处理后的结果。
在数据输出预览窗口,您可以根据需要修改输入数据,或者单击手工构造数据按钮自定义输入数据,然后单击预览按钮,查看当前数据处理节点对数据的处理结果,当数据处理节点处理异常,或者产生脏数据时,也会实时反馈异常信息,能够帮助您快速评估数据处理节点配置的正确性,以及是否能得到预期结果。
说明数据输出预览强依赖Kafka来源的数据采样,在执行数据输出预览前需要先在Kafka来源表单中完成数据采样。
配置StarRocks去向信息。
单击页面上方的StarRocks,编辑StarRocks去向源信息。
配置基本信息。
选择要写入的StarRocks表是自动建表还是使用已有表。
填写或者选择要写入的StarRocks表名。
编辑建表结构。
当选择自动建表时,您需要单击编辑表结构按钮,在弹框中编辑建表结构。同时,支持您单击根据上游节点输出列重新生成表结构按钮,自动根据上游节点输出列,生成表结构。您可以在自动生成的表结构中选择一列配置为主键。
说明目前自动建表可视化模式要求StarRocks表必须为有主键表,且设置为分桶列,否则无法保存配置
如您想同步到的StarRocks表无主键,可使用SQL模式建表或已有表。
配置字段映射。
保存建表结构或者选择使用已有表时,系统会自动按照同名映射原则生成上游列与StarRocks表列之间的映射,您可根据需要进行调整,支持一个上游列映射到多个StarRocks表列,不允许多个上游列映射到一个StarRocks表列,当上游列未配置到StarRocks表列的映射时,对应列不会写入StarRocks表。
配置上游流入动态字段处理策略。
上游流入动态字段处理策略用于控制上游数据处理节点(目前可以生成动态列的数据处理节点只有JSON解析)生成动态列的处理方式。如果在JSON解析节点配置了动态输出字段,则在StarRocks节点中会出现上游流入动态字段处理策略表单。
动态列指在任务配置中未明确定义列名,而是根据源端输入数据内容的不同,能够解析出不同列名和列值,并输出到StarRocks节点的列。对上游流入动态字段处理策略如下表所示:
参数
描述
忽略
如果在StarRocks表中无与动态列同名的列,则忽略该动态列,将其他配置了映射关系的列写入StarRocks表。
报错
如果在StarRocks表中无与动态列同名的列,则同步任务报错停止。
高级参数配置。
单击页面右上角的高级参数配置,对同步任务运行时的并行度和资源进行配置。
参数
说明
自动设置运行时配置
根据任务配置自动为所有运行时配置项赋值,默认为
true
。Worker数
任务启动的Worker总数,默认值:1。
最小值:1
最大值:100
单Worker并发数
每个Worker启动的线程总数,默认值:1。
最小值:1
最大值:100
数据格式
导入中使用的数据格式,支持
json
和csv
,默认为json
。报警配置。
为了能够及时感知到同步任务的异常并做出响应和处理,您可以对同步任务设置不同的报警策略。
单击右上角的报警配置,进入实时子任务报警设置页面。
单击新增报警,配置报警规则。报警规则设置可以参考实时同步任务告警设置最佳实践。
管理报警规则。对于已创建的报警规则,您可以通过报警开关控制报警规则是否开启,同时,您可以根据报警级别将报警发送给不同的接收人。
资源组配置。
您可以在右上角的资源组配置处修改任务运行使用的通用型资源组。
模拟运行。
完成上述所有任务配置后,您可以通过模拟运行功能,模拟整个任务针对少量采样数据的处理,查看数据写入StarRocks表后的结果。当任务配置错误、模拟运行过程中异常或者产生脏数据时,会实时反馈出异常信息,能够帮助您快速评估任务配置的正确性,以及是否能得到预期结果。
单击页面右上角的模拟运行,在弹出的对话框中设置针对Kafka Topic的采样参数(开始时间和采样条数)。
单击开始采集得到采样数据。
单击预览按钮,模拟整个任务针对少量采样数据的处理。
完成上述所有任务配置后,单击完成配置,完成同步任务的配置。
任务运维
启动同步任务
完成配置之后,界面会自动跳转到任务列表页面,您可以单击对应任务的操作列的启动按钮,启动同步任务。
查看任务运行状态
创建完成同步任务后,您可以在同步任务页面找到已创建的同步任务,单击任务名称或执行概况空白处,查看任务的运行详情。任务详情分为三个部分:
基本信息:您可以查看同步任务的数据源信息、绑定的资源组等信息。
执行状态:Kafka到StarRocks的同步任务分为结构迁移和实时数据同步两个步骤,您可以查看任务执行状态。
详细信息:您可以查看结构迁移以及实时同步的执行详情。
结构迁移中包含目标表的创建方式(已有表或自动建表),如果是自动建表,将会为您展示建表的DDL。
实时同步中包含实时同步的统计信息,包含实时的读写流量、脏数据、Failover和运行日志。
任务重跑
直接重跑
不修改任务配置,直接单击同步任务操作列的
操作,重跑一次性任务。修改后重跑
编辑任务,进行修改操作后,单击完成。此时任务的操作会变成应用更新,单击应用更新会直接触发修改后的任务重跑。实时同步任务会按照新的配置运行。