日志实时入湖快速入门

更新时间:
复制为 MD 格式

本文为您介绍如何在实时计算控制台将存储在Kafkajson格式的埋点数据通过Flink CDC数据摄入功能高效入湖,完成表结构推导并自动创建目标表,在数据同步的过程中分析出表结构变更并自动应用到下游,减少您在日志实时入湖场景的运维负担。

前提条件

步骤一:配置IP白名单

为了让Flink能访问Kafka实例,您需要将Flink工作空间的网段添加到Kafka的白名单中。

  1. 获取Flink工作空间的VPC网段。

    1. 登录实时计算控制台

    2. 在目标工作空间右侧操作列,选择更多 > 工作空间详情

    3. 工作空间详情对话框,查看虚拟交换机的网段信息。

      网段信息

  2. 在消息队列KafkaIP白名单中,添加Flink工作空间的网段信息。

    您需要为网络类型为VPC的接入点配置白名单,操作步骤请参见配置白名单Kafka白名单

步骤二:准备Kafka测试数据

使用实时计算Flink版的模拟数据生成Faker作为数据生成器,将数据写入到Kafka中。请按以下步骤使用实时计算开发控制台将数据写入至消息队列Kafka。

  1. Kafka控制台创建一个名称为usersTopic。

    操作详情请参见步骤一:创建Topic

  2. 创建将数据写入到Kafka的作业。

    1. 登录实时计算管理控制台

    2. 单击目标工作空间操作列下的控制台

    3. 在左侧导航栏,单击数据开发 > ETL

    4. 单击image后,单击新建流作业,填写文件名称并选择引擎版本

      Flink也为您提供了丰富的代码模板和数据同步,每种代码模板都为您提供了具体的使用场景、代码示例和使用指导。您可以直接单击对应的模板快速地了解Flink产品功能和相关语法,实现您的业务逻辑,详情请参见代码模板数据同步模板

      作业参数

      说明

      示例

      文件名称

      作业的名称。

      说明

      作业名称在当前项目中必须保持唯一。

      flink-test

      引擎版本

      当前作业使用的Flink引擎版本。

      建议使用带有推荐稳定标签的版本,这些版本具有更高的可靠性和性能表现,引擎版本详情请参见功能发布记录引擎版本介绍

      vvr-11.5-jdk11-flink-1.20

    5. 单击创建

    6. 编写SQL作业。

      将以下作业代码拷贝到作业文本编辑区,然后根据实际配置,修改参数配置信息。

      CREATE TEMPORARY TABLE source (
        id INT,
        first_name STRING,
        last_name STRING,
        `address` ROW<`country` STRING, `state` STRING, `city` STRING>,
        event_time TIMESTAMP
      ) WITH (
        'connector' = 'faker',
        'number-of-rows' = '100',
        'rows-per-second' = '10',
        'fields.id.expression' = '#{number.numberBetween ''0'',''1000''}',
        'fields.first_name.expression' = '#{name.firstName}',
        'fields.last_name.expression' = '#{name.lastName}',
        'fields.address.country.expression' = '#{Address.country}',
        'fields.address.state.expression' = '#{Address.state}',
        'fields.address.city.expression' = '#{Address.city}',
        'fields.event_time.expression' = '#{date.past ''15'',''SECONDS''}'
      );
      
      CREATE TEMPORARY TABLE sink (
        id INT,
        first_name STRING,
        last_name STRING,
        `address` ROW<`country` STRING, `state` STRING, `city` STRING>,
        `timestamp` TIMESTAMP METADATA
      ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092',
        'topic' = 'users',
        'format' = 'json',
        'properties.enable.idempotence'='false'
      );
      
      INSERT INTO sink SELECT * FROM source;

      需要修改的参数配置信息如下:

      参数

      示例值

      说明

      properties.bootstrap.servers

      alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092

      Kafka Broker地址。

      格式为host:port,host:port,host:port,以英文逗号(,)分割。您可以在实例详情页面的接入点信息区域获取网络类型为VPC域名接入点作为该参数的值。

      topic

      users

      Kafka Topic名称。

  3. 启动作业。

    1. 数据开发 > ETL页面,单击部署

    2. 部署新版本对话框中,单击确定

    3. 配置作业资源,资源设置填写详情请参见配置作业资源

    4. 运维中心 > 作业运维页面,单击目标作业名称操作列中的启动。关于作业启动的配置说明,请参见作业启动

    5. 您可以在作业运维页面观察作业的运行信息和状态。image

      由于faker数据源是一个有限流,因此在作业处于运行状态后,大约1分钟左右后,作业就会处于完成状态。当作业结束运行代表作业已经将相关的数据写入到Kafkausers中。其中,写入到消息队列KafkaJSON数据格式大致如下。

      {
        "id": 765,
        "first_name": "Barry",
        "last_name": "Pollich",
        "address": {
          "country": "United Arab Emirates",
          "state": "Nevada",
          "city": "Powlowskifurt"
        }
      }

步骤三:创建并启动数据摄入作业

  1. 登录实时计算开发控制台,创建数据摄入作业。

    1. 登录实时计算管理控制台

    2. 单击目标工作空间操作列下的控制台

    3. 在左侧导航栏,单击数据开发 > 数据摄入

    4. 单击image后,单击新建数据摄入草稿,填写文件名称并选择引擎版本

      作业参数

      说明

      示例

      文件名称

      作业的名称。

      说明

      作业名称在当前项目中必须保持唯一。

      flink-test

      引擎版本

      当前作业使用的Flink引擎版本。

      建议使用带有推荐稳定标签的版本,这些版本具有更高的可靠性和性能表现,引擎版本详情请参见功能发布记录引擎版本介绍

      vvr-11.5-jdk11-flink-1.20

    5. 单击创建

  2. 编写数据摄入作业。将以下作业代码拷贝到作业文本编辑区,然后根据实际配置,修改参数配置信息。

    假设Kafkatopic users中存有JSON格式的表数据,下面的作业可以将表的数据同步到DLFtest_database库的表users中。

    source:
      type: kafka
      name: Kafka Source
      properties.bootstrap.servers: alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092
      topic: users
      scan.startup.mode: earliest-offset
      value.format: json
      # (可选)递归式地展开JSON中的嵌套列
      json.infer-schema.flatten-nested-columns.enable: true
      # (可选)跳过前 100 次出现的解析异常;若超过 100 次则作业失败。
      ingestion.ignore-errors: true
      ingestion.error-tolerance.max-count: 100
    
    sink:
      type: paimon
      name: Paimon Sink
      catalog.properties.metastore: rest
      catalog.properties.uri: dlf_uri
      catalog.properties.warehouse: your_warehouse
      catalog.properties.token.provider: dlf
      #(可选)开启删除向量,提升读取性能
      table.properties.deletion-vectors.enabled: true
    
    transform:
      - source-table: \.*.\.*
        projection: \*
        primary-keys: id
        
    route:
      - source-table: users
        sink-table: test_database.users
    
    pipeline:
      # (可选)将会导致处理异常的脏数据记录到日志中
      dirty-data.collector:
        name: Logger Dirty Data Collector
        type: logger

    需要修改的参数配置信息如下:

    参数

    示例值

    说明

    properties.bootstrap.servers

    alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092

    Kafka Broker地址。

    格式为host:port,host:port,host:port,以英文逗号(,)分割。您可以在实例详情页面的接入点信息区域获取网络类型为VPC域名接入点作为该参数的值。

    topic

    users

    Kafka Topic名称。

    catalog.properties.uri

    http://cn-hangzhou-vpc.dlf.aliyuncs.com

    访问DLF Rest Catalog ServerURI,格式为http://[region-id]-vpc.dlf.aliyuncs.com。详见服务接入点中的Region ID。

    catalog.properties.warehouse

    dlf_test

    DLF Catalog名称。

    source-table

    users

    定义来自于上游哪个表,默认是topic名称。

    sink-table

    test_database.users

    定义写入下游哪张表,使用逗号连接Schema和表名。

  3. 单击保存

  4. 数据开发 > 数据摄入页面,单击部署

  5. 运维中心 > 作业运维页面,单击目标作业名称操作列中的启动关于作业启动的配置说明,请参见作业启动

    作业启动后,您可以在作业运维界面观察作业的运行信息和状态。image

步骤四:观察全量同步结果

  1. 登录数据湖构建控制台

  2. 选择作业目标表所在的Catalogs。

  3. 点击名称为test_database的数据库,查看users数据库中同步的users表结构和数据。

    同步后的表结构和数据如下图所示。

    • 表结构

      点击users表名称,查看表结构。

      image

    • 表数据

      • 在实时计算Flink版上添加Catalog,参见创建DLF Catalog

      • 作业运维 > Session管理页面,单击创建Session集群

      • 数据开发 > 数据查询页面,单击新建查询脚本

      在查询脚本中填写如下内容:

      SELECT * FROM `catalog_name`.test_database.users;

      点击运行,将作业指定提交到刚才创建的Session集群中:

      image

      表数据结果如下图所示。

      image

步骤五:观察自动同步表结构变更

  1. Kafka控制台手动发送一条包含新增列的消息。

    1. 登录云消息队列 Kafka 版控制台

    2. 实例列表页面,单击目标实例名称。

    3. Topic管理页面,单击目标Topic名称users。

    4. 单击体验发送消息

    5. 填写消息内容。

      消息内容

      配置项

      示例

      发送方式

      选中控制台

      消息Key

      填写为flinktest。

      消息内容

      将以下JSON内容复制粘贴到消息内容中。

      {
        "id": 100001,
        "first_name": "Dennise",
        "last_name": "Schuppe",
        "address": {
          "country": "Isle of Man",
          "state": "Montana",
          "city": "East Coleburgh"
        },
        "house-points": {
          "house": "Pukwudgie",
          "points": 76
        }
      }
      说明

      该示例中house-points是一个新增的嵌套列。

      发送到指定分区

      选中

      分区ID

      填写为0。

    6. 单击确定

  2. DLFFlink实时计算控制台,查看users表结构和数据的变化。

    1. 登录数据湖构建控制台

    2. 点击名称为test_database的数据库,查看users数据库中同步的users表结构和数据。

      image

    3. 在实时计算平台数据查询页面执行如下查询:

    4. SELECT * FROM `catalog_name`.test_database.users;

      image

    5. 查看表数据结果。

      表数据结果如下图所示。

      image

      可以观察到id100001的数据已经成功地写入到了DLF目标表中。同时,DLF中的test_database.users表多了house-points.househouse-points.points 两列。

      说明

      虽然插入到Kafka中的数据只有一个嵌套列house-points,但是由于在users表的WITH参数内声明要求json.infer-schema.flatten-nested-columns.enable,那么Flink 就会自动展平新增的嵌套列,并用访问该列的路径作为展开后的列的名字。

相关文档