文档

Kafka实时ETL同步至StarRocks

更新时间:

实时ETL同步方案根据来源Kafka指定Topic的内容结构对目的StarRocks表结构做初始化,然后将Kafka指定Topic的存量数据同步至StarRocks,同时也持续将增量数据实时同步至StarRocks。本文为您介绍如何创建Kafka实时ETL同步至StarRocks任务。

使用限制

添加数据源

新建Kafka数据源

您可以手动添加Kafka数据源至DataWorks,详情请参见:Kafka数据源

新建StarRocks数据源

  • 获取StarRocks数据源信息

    进入StarRocks产品控制台。找到您要进行数据同步的StarRocks集群,在实例详情界面获取到StarRocks的集群ID、连接地址、QueryPort、HttpPort信息。

  • 手动添加StarRocks数据源

    详情请参见StarRocks数据源

准备通用型资源组并与数据源网络连通

在进行数据同步前,需要完成您的通用型资源组和数据源的网络连通,详情请参见配置网络连通

说明

Kafka与StarRocks支持的网络类型如下:

  • Kafka: VPC网络、公网。

  • StarRocks: VPC网络。

创建同步任务

  1. 进入数据集成页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据集成,在下拉框中选择对应工作空间后单击进入数据集成

  2. 在数据集成同步任务页面的创建同步任务区域,选择来源类型Kafka、去向类型StarRocks,然后点击开始创建

  3. 配置同步任务基本信息。

    • 新任务名称:配置同步任务名称。

    • 同步类型:选择单表实时

    • 网络与资源配置:在下拉框中分别选择已创建的Kafka数据源、StarRocks数据源、通用型资源组,单击测试所有连通性,保障资源组与数据源之间的网络连通性。

  4. 单击下一步

  5. 配置Kafka来源信息。

    单击页面上方的Kafka来源,编辑Kafka来源信息。

    image

    1. 配置Kafka基本信息。

      • 选择Kafka集群中需要同步的Topic。

      • 其他配置可使用任务创建时生成的默认值,也可根据需要进行修改。

    2. 单击右上角的数据采样

      在弹出对话框中指定好开始时间采样条数后,单击开始采集按钮,可以对指定的Kafka Topic进行数据采样,同时您可以预览Topic中的数据,为后续数据处理节点的数据预览和可视化配置提供输入。

  6. 编辑数据处理节点。

    1. 单击image.png图标可以增加数据处理方式。

      目前提供5种数据处理方式,您可根据需要做顺序编排,在任务运行时会按照编排的数据处理先后顺序执行数据处理,5种数据处理方式包括:数据脱敏字符串替换数据过滤JSON解析字段编辑与赋值

      image

    2. 每完成一个数据处理节点配置,可以单击右上角的数据输出预览按钮,在弹出对话框中,单击重新获取上游输出,模拟得到Kafka Topic采样数据经过当前数据处理节点处理后的结果。image

    3. 数据输出预览窗口,您可以根据需要修改输入数据,或者单击手工构造数据按钮自定义输入数据,然后单击预览按钮,查看当前数据处理节点对数据的处理结果,当数据处理节点处理异常,或者产生脏数据时,也会实时反馈异常信息,能够帮助您快速评估数据处理节点配置的正确性,以及是否能得到预期结果。

      说明

      数据输出预览强依赖Kafka来源的数据采样,在执行数据输出预览前需要先在Kafka来源表单中完成数据采样。

  7. 配置StarRocks去向信息。

    单击页面上方的StarRocks,编辑StarRocks去向源信息。

    image

    1. 配置基本信息。

      • 选择要写入的StarRocks表是自动建表还是使用已有表

      • 填写或者选择要写入的StarRocks表名。

    2. 编辑建表结构。

      当选择自动建表时,您需要单击编辑表结构按钮,在弹框中编辑建表结构。同时,支持您单击根据上游节点输出列重新生成表结构按钮,自动根据上游节点输出列,生成表结构。您可以在自动生成的表结构中选择一列配置为主键。

      说明
      • 目前自动建表可视化模式要求StarRocks表必须为有主键表,且设置为分桶列,否则无法保存配置

      • 如您想同步到的StarRocks表无主键,可使用SQL模式建表或已有表。

      image

    3. 配置字段映射。

      保存建表结构或者选择使用已有表时,系统会自动按照同名映射原则生成上游列与StarRocks表列之间的映射,您可根据需要进行调整,支持一个上游列映射到多个StarRocks表列,不允许多个上游列映射到一个StarRocks表列,当上游列未配置到StarRocks表列的映射时,对应列不会写入StarRocks表。

      image

    4. 配置上游流入动态字段处理策略。

      上游流入动态字段处理策略用于控制上游数据处理节点(目前可以生成动态列的数据处理节点只有JSON解析)生成动态列的处理方式。如果在JSON解析节点配置了动态输出字段,则在StarRocks节点中会出现上游流入动态字段处理策略表单。

      动态列指在任务配置中未明确定义列名,而是根据源端输入数据内容的不同,能够解析出不同列名和列值,并输出到StarRocks节点的列。对上游流入动态字段处理策略如下表所示:

      参数

      描述

      忽略

      如果在StarRocks表中无与动态列同名的列,则忽略该动态列,将其他配置了映射关系的列写入StarRocks表。

      报错

      如果在StarRocks表中无与动态列同名的列,则同步任务报错停止。

      image

  8. 高级参数配置。

    单击页面右上角的高级参数配置,对同步任务运行时的并行度和资源进行配置。

    参数

    说明

    自动设置运行时配置

    根据任务配置自动为所有运行时配置项赋值,默认为true

    Worker数

    任务启动的Worker总数,默认值:1。

    • 最小值:1

    • 最大值:100

    单Worker并发数

    每个Worker启动的线程总数,默认值:1。

    • 最小值:1

    • 最大值:100

    数据格式

    导入中使用的数据格式,支持jsoncsv,默认为json

  9. 报警配置。

    为了能够及时感知到同步任务的异常并做出响应和处理,您可以对同步任务设置不同的报警策略。

    1. 单击右上角的报警配置,进入实时子任务报警设置页面。

    2. 单击新增报警,配置报警规则。报警规则设置可以参考实时同步任务告警设置最佳实践

    3. 管理报警规则。对于已创建的报警规则,您可以通过报警开关控制报警规则是否开启,同时,您可以根据报警级别将报警发送给不同的接收人。

  10. 资源组配置。

    您可以在右上角的资源组配置处修改任务运行使用的通用型资源组。

  11. 模拟运行。

    完成上述所有任务配置后,您可以通过模拟运行功能,模拟整个任务针对少量采样数据的处理,查看数据写入StarRocks表后的结果。当任务配置错误、模拟运行过程中异常或者产生脏数据时,会实时反馈出异常信息,能够帮助您快速评估任务配置的正确性,以及是否能得到预期结果。

    image

    1. 单击页面右上角的模拟运行,在弹出的对话框中设置针对Kafka Topic的采样参数(开始时间采样条数)。

    2. 单击开始采集得到采样数据。

    3. 单击预览按钮,模拟整个任务针对少量采样数据的处理。

完成上述所有任务配置后,单击完成配置,完成同步任务的配置。

任务运维

启动同步任务

完成配置之后,界面会自动跳转到任务列表页面,您可以单击对应任务的操作列的启动按钮,启动同步任务。

image

查看任务运行状态

创建完成同步任务后,您可以在同步任务页面找到已创建的同步任务,单击任务名称执行概况空白处,查看任务的运行详情。任务详情分为三个部分:

image

  • 基本信息:您可以查看同步任务的数据源信息、绑定的资源组等信息。

  • 执行状态:Kafka到StarRocks的同步任务分为结构迁移实时数据同步两个步骤,您可以查看任务执行状态。

  • 详细信息:您可以查看结构迁移以及实时同步的执行详情。

    • 结构迁移中包含目标表的创建方式(已有表或自动建表),如果是自动建表,将会为您展示建表的DDL。

    • 实时同步中包含实时同步的统计信息,包含实时的读写流量、脏数据、Failover和运行日志。

任务重跑

  • 直接重跑

    不修改任务配置,直接单击同步任务操作列的更多 > 重跑操作,重跑一次性任务。

  • 修改后重跑

    编辑任务,进行修改操作后,单击完成。此时任务的操作会变成应用更新,单击应用更新会直接触发修改后的任务重跑。实时同步任务会按照新的配置运行。