日志实时入仓快速入门

更新时间:2025-03-31 06:03:58

本文为您介绍如何通过实时计算控制台快速构建从KafkaHologres的数据同步作业,实现日志数据的实时入仓部署。

前提条件

步骤一:配置IP白名单

为了让Flink能访问KafkaHologres实例,您需要将Flink工作空间的网段添加到KafkaHologres的白名单中。

  1. 获取Flink工作空间的VPC网段。

    1. 登录实时计算控制台

    2. 在目标工作空间右侧操作列,选择更多 > 工作空间详情

    3. 工作空间详情对话框,查看虚拟交换机的网段信息。

      网段信息

  2. 在消息队列KafkaIP白名单中,添加Flink工作空间的网段信息。

    您需要为网络类型为VPC的接入点配置白名单,操作步骤请参见配置白名单Kafka白名单

  3. HologresIP白名单中,添加Flink工作空间的网段信息。

    登录Hologres实例后配置IP白名单,操作步骤请参见IP白名单Holo白名单

步骤二:准备Kafka测试数据

使用实时计算Flink版的模拟数据生成源表作为数据生成器,将数据写入到Kafka中。请按以下步骤使用实时计算开发控制台将数据写入至消息队列Kafka。

  1. Kafka控制台创建一个名称为usersTopic。

    操作详情请参见步骤一:创建Topic

  2. 创建将数据写入到Kafka的作业。

    1. 登录实时计算管理控制台

    2. 单击目标工作空间操作列下的控制台

    3. 在左侧导航栏,单击数据开发 > ETL,单击新建

    4. 新建作业草稿对话框,选择目标模板(例如:选择空白的流作业草稿),完成后单击下一步,填写作业配置信息。

      作业参数

      示例

      说明

      作业参数

      示例

      说明

      文件名称

      kafka-data-input

      作业的名称。

      说明

      作业名称在当前项目中必须保持唯一。

      存储位置

      作业草稿

      指定该作业的代码文件所属的文件夹。默认存放在作业草稿目录。

      您还可以在现有文件夹右侧,单击新建文件夹图标,新建子文件夹。

      引擎版本

      vvr-8.0.11-flink-1.17

      在引擎版本下拉列表中选择目标引擎版本。

    5. 单击创建

    6. 编写SQL作业。

      将以下作业代码拷贝到作业文本编辑区,然后根据实际配置,修改参数配置信息。

      CREATE TEMPORARY TABLE source (
        id INT,
        first_name STRING,
        last_name STRING,
        `address` ROW<`country` STRING, `state` STRING, `city` STRING>,
        event_time TIMESTAMP
      ) WITH (
        'connector' = 'faker',
        'number-of-rows' = '100',
        'rows-per-second' = '10',
        'fields.id.expression' = '#{number.numberBetween ''0'',''1000''}',
        'fields.first_name.expression' = '#{name.firstName}',
        'fields.last_name.expression' = '#{name.lastName}',
        'fields.address.country.expression' = '#{Address.country}',
        'fields.address.state.expression' = '#{Address.state}',
        'fields.address.city.expression' = '#{Address.city}',
        'fields.event_time.expression' = '#{date.past ''15'',''SECONDS''}'
      );
      
      CREATE TEMPORARY TABLE sink (
        id INT,
        first_name STRING,
        last_name STRING,
        `address` ROW<`country` STRING, `state` STRING, `city` STRING>,
        `timestamp` TIMESTAMP METADATA
      ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092',
        'topic' = 'users',
        'format' = 'json'
      );
      
      INSERT INTO sink SELECT * FROM source;

      需要修改的参数配置信息如下:

      参数

      示例值

      说明

      参数

      示例值

      说明

      properties.bootstrap.servers

      alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092

      Kafka Broker地址。

      格式为host:port,host:port,host:port,以英文逗号(,)分割。您可以在实例详情页面的接入点信息区域获取网络类型为VPC域名接入点作为该参数的值。

      topic

      users

      Kafka Topic名称。

  3. 启动作业。

    1. 数据开发 > ETL页面,单击部署

    2. 部署新版本对话框中,单击确定

    3. 配置作业资源,资源设置填写详情请参见配置作业资源

    4. 运维中心 > 作业运维页面,单击目标作业名称操作列中的启动。关于作业启动的配置说明,请参见作业启动

    5. 您可以在作业运维页面观察作业的运行信息和状态。image

      由于faker数据源是一个有限流,因此在作业处于运行状态后,大约1分钟左右后,作业就会处于完成状态。当作业结束运行代表作业已经将相关的数据写入到Kafkausers中。其中,写入到消息队列KafkaJSON数据格式大致如下。

      {
        "id": 765,
        "first_name": "Barry",
        "last_name": "Pollich",
        "address": {
          "country": "United Arab Emirates",
          "state": "Nevada",
          "city": "Powlowskifurt"
        }
      }

步骤三:创建Hologres Catalog

单表同步都需要依赖目标Catalog来创建目标表。因此,您需要通过控制台创建目标Catalog。本文将以目标CatalogHologres Catalog为例,为您进行介绍。关键配置信息如下,具体操作步骤请参见创建Hologres Catalog

配置项

说明

配置项

说明

catalog name

填写holo

endpoint

Hologres实例详情页面获取网络类型为指定VPC的域名信息。

username

阿里云账号的AccessKey。

password

阿里云账号的AccessSecret。

dbname

填写flink_test_db

重要

您需要在您的目标Hologres实例中已创建flink_test_db数据库,否则创建Catalog会报错。具体操作请参见创建数据库

步骤四:创建并启动数据同步作业

  1. 登录实时计算开发控制台,创建数据同步作业。

    1. 登录实时计算管理控制台

    2. 单击目标工作空间操作列下的控制台

    3. 在左侧导航栏,单击数据开发 > ETL,单击新建

    4. 新建作业草稿对话框,选择目标模板(例如:选择空白的流作业草稿),完成后单击下一步,填写作业配置信息。

      作业参数

      示例

      说明

      作业参数

      示例

      说明

      文件名称

      flink-quickstart-test

      作业的名称。

      说明

      作业名称在当前项目中必须保持唯一。

      存储位置

      作业草稿

      指定该作业的代码文件所属的文件夹。默认存放在作业草稿目录。

      您还可以在现有文件夹右侧,单击新建文件夹图标,新建子文件夹。

      引擎版本

      vvr-8.0.11-flink-1.17

      在引擎版本下拉列表中选择目标引擎版本。

    5. 单击创建

  2. 编写SQL作业。将以下作业代码拷贝到作业文本编辑区,然后根据实际配置,修改参数配置信息。

    将消息队列Kafka中名称为usersTopic数据同步至Hologresflink_test_db数据库的sync_kafka_users表中。您可以通过以下任意一种方式完成数据同步。

    通过CATS语句同步
    通过INSERT INTO语句同步

    该方式无需您手动在Hologres中创建sync_kafka_users表,也无需指明对应的列类型为JSONJSONB。

    CREATE TEMPORARY TABLE kafka_users (
      `id` INT NOT NULL,
      `address` STRING,
      `offset` BIGINT NOT NULL METADATA,
      `partition` BIGINT NOT NULL METADATA,
      `timestamp` TIMESTAMP METADATA,
      `date` AS CAST(`timestamp` AS DATE),
      `country` AS JSON_VALUE(`address`, '$.country'),
      PRIMARY KEY (`partition`, `offset`) NOT ENFORCED
    ) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092',
      'topic' = 'users',
      'format' = 'json',
      'json.infer-schema.flatten-nested-columns.enable' = 'true', -- 自动展开嵌套列。
      'scan.startup.mode' = 'earliest-offset'
    );
    
    CREATE TABLE IF NOT EXISTS holo.flink_test_db.sync_kafka_users
    WITH (
      'connector' = 'hologres'
    ) AS TABLE kafka_users;
    说明

    为了避免作业Failover后,作业重启将重复数据写入到Hologres中,您可以添加相关主键从而唯一地标识数据。当数据重发时,Hologres将会保证相同partitionoffset的数据只会保留一份。

    需要修改的参数配置信息如下:

    参数

    示例值

    说明

    properties.bootstrap.servers

    alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092

    Kafka Broker地址。

    格式为host:port,host:port,host:port,以英文逗号(,)分割。您可以在实例详情页面的接入点信息区域获取网络类型为VPC域名接入点作为该参数的值。

    topic

    users

    Kafka Topic名称。

    考虑到Hologres中对于JSONJSONB类型的数据会进行特殊的优化,您也可以通过INSERT INTO语句将嵌套JSON写入到Hologres中。

    该方式需要您手动在Hologres中创建sync_kafka_users表,然后通过下文的SQL将数据写入到Hologres的表中。

    CREATE TEMPORARY TABLE kafka_users (
      `id` INT NOT NULL,
      `address` STRING, -- 该列对应的数据为嵌套JSON。
      `offset` BIGINT NOT NULL METADATA,
      `partition` BIGINT NOT NULL METADATA,
      `timestamp` TIMESTAMP METADATA,
      `date` AS CAST(`timestamp` AS DATE),
      `country` AS JSON_VALUE(`address`, '$.country')
    ) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092',
      'topic' = 'users',
      'format' = 'json',
      'json.infer-schema.flatten-nested-columns.enable' = 'true', -- 自动展开嵌套列。
      'scan.startup.mode' = 'earliest-offset'
    );
    
    CREATE TEMPORARY TABLE holo (
      `id` INT NOT NULL,
      `address` STRING,
      `offset` BIGINT,
      `partition` BIGINT,
      `timestamp` TIMESTAMP,
      `date` DATE,
      `country` STRING
    ) WITH (
      'connector' = 'hologres',
      'endpoint' = 'hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80',
      'username' = '************************',
      'password' = '******************************',
      'dbname' = 'flink_test_db',
      'tablename' = 'sync_kafka_users'
    );
    
    INSERT INTO holo
    SELECT * FROM kafka_users;

    需要修改的参数配置信息如下:

    参数

    示例值

    说明

    properties.bootstrap.servers

    alikafka-host1.aliyuncs.com:9092,alikafka-host2.aliyuncs.com:9092,alikafka-host3.aliyuncs.com:9092

    Kafka Broker地址。

    格式为host:port,host:port,host:port,以英文逗号(,)分割。您可以在实例详情页面的接入点信息区域获取网络类型为VPC域名接入点作为该参数的值。

    topic

    users

    Kafka Topic名称。

    endpoint

    hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80

    Hologres端点。

    格式为<ip>:<port>。您可以在Hologres实例详情页面获取网络类型为指定VPC域名信息作为该参数的值。

    username

    ************************

    Hologres用户名和密码,请填写阿里云账号的AccessKey IDAccessKey Secret。

    重要

    为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见变量管理

    password

    ******************************

    dbname

    flink_test_db

    Hologres数据库名称。

    tablename

    sync_kafka_users

    Hologres表名称。

    说明
    • 如果您通过INSERT INTO方式同步数据,则需要提前在目标实例的数据库中创建sync_kafka_users表和字段。

    • 如果Schema不为Public时,则tablename需要填写为schema.tableName。

  3. 单击保存

  4. 数据开发 > ETL页面,单击部署

  5. 运维中心 > 作业运维页面,单击目标作业名称操作列中的启动关于作业启动的配置说明,请参见作业启动

    作业启动后,您可以在作业运维界面观察作业的运行信息和状态。image

步骤五:观察全量同步结果

  1. 登录Hologres管理控制台

  2. 实例列表页面,单击目标实例名称。

  3. 在页面右上角,单击登录实例

  4. 元数据管理页签,查看users数据库中同步的sync_kafka_users表结构和数据。

    sync_kafka_users表

    同步后的表结构和数据如下图所示。

    • 表结构

      双击sync_kafka_users表名称,查看表结构。

      表结构

      说明

      在同步过程中,建议声明KafkaMetadata partitionoffset作为Hologres表中的主键。这样可以避免由于作业Failover,数据重发导致下游存储多份相同数据。

    • 表数据

      sync_kafka_users表信息页面右上角,单击查询表后,输入如下命令,单击运行

      SELECT * FROM public.sync_kafka_users order by partition, "offset";

      表数据结果如下图所示。表数据

步骤六:观察自动同步表结构变更

  1. Kafka控制台手动发送一条包含新增列的消息。

    1. 登录云消息队列 Kafka 版控制台

    2. 实例列表页面,单击目标实例名称。

    3. Topic管理页面,单击目标Topic名称users。

    4. 单击体验发送消息

    5. 填写消息内容。

      消息内容

      配置项

      示例

      配置项

      示例

      发送方式

      选中控制台

      消息Key

      填写为flinktest。

      消息内容

      将以下JSON内容复制粘贴到消息内容中。

      {
        "id": 100001,
        "first_name": "Dennise",
        "last_name": "Schuppe",
        "address": {
          "country": "Isle of Man",
          "state": "Montana",
          "city": "East Coleburgh"
        },
        "house-points": {
          "house": "Pukwudgie",
          "points": 76
        }
      }
      说明

      该示例中house-points是一个新增的嵌套列。

      发送到指定分区

      选中

      分区ID

      填写为0。

    6. 单击确定

  2. Hologres控制台,查看sync_kafka_users表结构和数据的变化。

    1. 登录Hologres管理控制台

    2. 实例列表页面,单击目标实例名称。

    3. 在页面右上角,单击登录实例

    4. 元数据管理页签,双击sync_kafka_users表名称。

    5. 单击查询表后,输入如下命令,单击运行

      SELECT * FROM public.sync_kafka_users order by partition, "offset";
    6. 查看表数据结果。

      表数据结果如下图所示。Hologres表结果

      可以观察到id100001的数据已经成功地写入到了Hologres中。同时,Hologres中多了house-points.househouse-points.points 两列。

      说明

      虽然插入到Kafka中的数据只有一个嵌套列house-points,但是由于在kafka_users表的WITH参数内声明要求json.infer-schema.flatten-nested-columns.enable,那么Flink 就会自动展平新增的嵌套列,并用访问该列的路径作为展开后的列的名字。

相关文档

  • 本页导读 (1)
  • 前提条件
  • 步骤一:配置IP白名单
  • 步骤二:准备Kafka测试数据
  • 步骤三:创建Hologres Catalog
  • 步骤四:创建并启动数据同步作业
  • 步骤五:观察全量同步结果
  • 步骤六:观察自动同步表结构变更
  • 相关文档