数据湖格式Hudi

本文为您介绍数据湖格式Hudi源表的背景信息、使用限制、 DDL定义、WITH参数和代码示例。

说明

数据湖格式Hudi结果表文档请参见Hudi结果表

背景信息

  • 什么是Hudi

    Hudi的定义、特性及典型场景详情如下表所示。

    类别

    详情

    定义

    Apache Hudi是一种开源的数据湖表格式框架。Hudi基于对象存储或者HDFS组织文件布局,保证ACID,支持行级别的高效更新和删除,从而降低数据ETL开发门槛。同时该框架还支持自动管理及合并小文件,保持指定的文件大小,从而在处理数据插入和更新时,不会创建过多的小文件,引发查询端性能降低,避免手动监控和重写小文件的运维负担。结合Flink、Presto、Spark等计算引擎进行数据入湖和计算分析,常用来支持DB入湖加速、增量数据实时消费和数仓回填等需求。详情请参见Apache Hudi

    特性

    • 支持ACID:支持ACID语义,提供事务的线性隔离级别。

    • 支持UPSERT语义:UPSERT语义即就是INSERT和UPDATE两种语义的合并。在UPSERT语义时,如果记录不存在则插入;如果记录存在则更新。通过INSERT INTO语法可以大幅简化开发代码的复杂度,提升效率。

    • 支持Data Version:通过时间旅行(Time Travel)特性,提供任意时间点的数据版本历史,便于数据运维,提升数据质量。

    • 支持Schema Evolution:支持动态增加列,类型变更等Schema操作。

    典型场景

    • DB入湖加速

      相比昂贵且低效的传统批量加载和Merge,Hudi提供超大数据集的实时流式更新写入。通过实时的ETL,您可以直接将CDC(change data capture)数据写入数据湖,供下游业务使用。典型案例为采用Flink MySQL CDC Connector将RDBMS(MySQL)的Binlog写入Hudi表。

    • 增量ETL

      通过增量拉取的方式获取Hudi中的变更数据流,相对离线ETL调度,实时性更好且更轻量。典型场景是增量拉取在线服务数据到离线存储中,通过Flink引擎写入Hudi表,借助Presto或Spark引擎实现高效的OLAP分析。

    • 消息队列

      在小体量的数据场景下,Hudi也可以作为消息队列替代Kafka,简化应用开发架构。

    • 数仓回填(backfill)

      针对历史全量数据进行部分行、列的更新场景,通过数据湖极大减少计算资源消耗,提升了端到端的性能。典型案例是Hive场景下全量和增量的打宽。

  • 全托管Flink集成Hudi功能优势

    相比开源社区Hudi,全托管Flink平台集成Hudi具有的功能优势详情如下表所示。

    功能优势

    详情

    平台侧与Flink全托管集成,免运维

    Flink全托管内置Hudi Connector,降低运维复杂度,提供SLA保障。

    完善的数据连通性

    对接多个阿里云大数据计算分析引擎,数据与计算引擎解耦,可以在Flink、Spark、Presto或Hive间无缝流转。

    深度打磨DB入湖场景

    与Flink CDC Connector联动,降低开发门槛。

    提供企业级特性

    包括集成DLF统一元数据视图、自动且轻量化的表结构变更。

    内置阿里云OSS存储,低成本存储,弹性扩展

    数据以开放的Parquet、Avro格式存储在阿里云OSS,存储计算分离,资源灵活弹性扩展。

  • CDC数据同步导入示意图

    CDC数据保存了完整的数据库变更,您可以通过以下任意一种方式将数据导入Hudi:

    • 对接CDC格式,消费Kafka数据的同时导入Hudi。

      支持debezium-json、canal-json和maxwell-json三种格式,该方式优点是可扩展性强,缺点是需要依赖Kafka和Debezium数据同步工具。

    • 通过Flink-CDC-Connector直接对接DB的Binlog,将数据导入Hudi。

      该方式优点是使用轻量化组件,对工具依赖少。

    重要
    • 如果无法保证上游数据顺序,则需要指定write.precombine.field字段。

    • 在CDC场景下,需要开启changelog模式,即changelog.enabled设为true。

使用限制

  • 仅Flink计算引擎vvr-4.0.11-flink-1.13及以上版本支持Hudi Connector。

  • 不支持以Session模式提交作业。

  • 文件系统支持HDFS、阿里云OSS服务和OSS-HDFS服务。

DDL定义

CREATE TEMPORARY TABLE hudi_sink (
  uuid BIGINT,
  data STRING,
  ts   TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'oss.endpoint' = '<yourOSSEndpoint>',
  'path' = 'oss://<yourOSSBucket>/<自定义存储位置>',
  'accessKeyId' = '<yourAccessKeyId>',
  'accessKeySecret' = '<yourAccessKeySecret>',
  'read.streaming.enabled' = 'true'
);

WITH参数

  • 基础参数

    参数

    说明

    是否必选

    备注

    connector

    源表类型。

    固定值为hudi

    table.type

    表类型。

    参数取值如下:

    • COPY_ON_WRITE:使用Parquet列式存储,每次更新数据,创建一个新的base文件。

    • MERGE_ON_READ:使用Parquet列式和Avro行式存储,更新操作将会被写入delta日志文件,异步合并delta日志文件和Parquet列式文件生成新版本文件。

    path

    表存储路径。

    支持阿里云OSS和HDFS两种路径。例如oss://<bucket name>/tablehdfs://<ip>:<port>/table

    read.streaming.enabled

    是否开启流读。

    参数取值如下:

    • true:开启。

    • false(默认值):关闭。

    read.start-commit

    流读的起始位点。

    请按yyyyMMddHHmmss格式指定流读的起始位点,包括该指定时间点。

    oss.endpoint

    阿里云对象存储服务OSS的Endpoint。

    如果使用OSS作为存储,则必需填写。参数取值请详情参见访问域名和数据中心

    accessKeyId

    阿里云账号的AccessKey ID。

    如果使用OSS作为存储,则必需填写。获取方法请参见获取AccessKey

    accessKeySecret

    阿里云账号的AccessKey Secret。

    如果使用OSS作为存储,则必需填写。获取方法请参见获取AccessKey

    hive_sync.enable

    是否开启同步元数据到Hive功能。

    参数取值如下:

    • true:开启。

    • false:不开启。

    hive_sync.mode

    Hive数据同步模式。

    参数取值如下:

    • hms(推荐值):采用Hive Metastore或者DLF Catalog时,需要设置为hms。

    • jdbc(默认值):采用jdbc Catalog时,需要设置为jdbc。

    hive_sync.db

    同步到Hive的数据库名称。

    无。

    hive_sync.table

    同步到Hive的表名称。

    无。

    dlf.catalog.region

    DLF服务的地域名。

    详情请参见已开通的地域和访问域名

    重要

    请和dlf.catalog.endpoint选择的地域保持一致。

    dlf.catalog.endpoint

    DLF服务的Endpoint。

    详情请参见已开通的地域和访问域名

    说明
    • 推荐您为dlf.catalog.endpoint参数配置DLF的VPC Endpoint。例如,如果您选择的地域为cn-hangzhou地域,则dlf.catalog.endpoint参数需要配置为dlf-vpc.cn-hangzhou.aliyuncs.com。

    • 如果您需要跨VPC访问DLF,则请参见如何访问跨VPC的其他服务?

    write.precombine.field

    版本字段,基于此字段的大小来判断消息是否进行更新。

    默认字段名称为ts,如果没有设置该参数,则系统默认会按照消息在引擎内部处理的先后顺序进行更新。

    changelog.enabled

    是否开启Changelog模式。

    参数取值如下:

    • true:开启Changelog模式。

      Hudi支持保留消息的所有变更,对接Flink引擎的后,实现全链路近实时数仓生产。Hudi的MOR表以行存格式保留消息的所有变更,通过流读MOR表可以消费到所有的变更记录。此时,您需要开启Changelog模式,changelog.enabled设置为true。

      开启changelog.enabled参数后,支持消费所有变更。异步的合并任务会将中间变更合并成1条。所以如果流读消费不够及时,被压缩后只能读到最后一条记录。当然,通过调整压缩的buffer时间可以预留一定的时间buffer给reader。例如调整合并的compaction.delta_commits和compaction.delta_seconds参数。

    • false(默认值):关闭Changelog模式。关闭Changelog模式时,即支持UPSERT语义,所有的消息仅保证最后一条合并消息,中间的变更可能会被合并。

    说明

    流读取会展示每次变更,批读只会展示合并后的变更结构。

示例

本示例为您介绍如何通过datagen connector随机生成流式数据写入Hudi表,然后从Hudi表流式读取数据写入blackhole。

  1. 创建OSS Bucket。

    详情请参见控制台创建存储空间

  2. 作业开发页面,在目标作业文本编辑区域,编写SQL设计流作业。

    CREATE TEMPORARY TABLE datagen(
      id INT NOT NULL PRIMARY KEY NOT ENFORCED,
      data  STRING,
      ts TIMESTAMP(3)
    ) WITH (
      'connector' = 'datagen' ,
      'rows-per-second'='100' 
    );
    
    CREATE TEMPORARY TABLE datasink (
      id INT NOT NULL PRIMARY KEY NOT ENFORCED,
      data STRING,
      ts TIMESTAMP(3)
    ) WITH (
      'connector' = 'blackhole'      
    );
    
    CREATE TEMPORARY TABLE hudi (
      id INT NOT NULL PRIMARY KEY NOT ENFORCED,
      data STRING,
      ts TIMESTAMP(3)
    ) WITH (
      'connector' = 'hudi',                         
      'oss.endpoint' = '<yourOSSEndpoint>',                         
      'accessKeyId' = '<yourAccessKeyId>',           
      'accessKeySecret' = '<yourAccessKeySecret>',      
      'path' = 'oss://<yourOSSBucket>/<自定义存储位置>',
      'table.type' = 'MERGE_ON_READ',     
      'read.streaming.enabled' = 'true'
    );
    
    BEGIN STATEMENT SET;
    INSERT INTO hudi SELECT * from datagen;
    INSERT INTO datasink SELECT * FROM hudi;
    END;
    说明

    您也可以通过两个作业来进行流式读写操作。本示例代码中,我们将两个Sink语句写在了一个作业里。写入多个Sink语句时,需要以BEGIN STATEMENT SET;开头,以END;结尾,详情请参见INSERT INTO语句

  3. 在作业开发页面右侧高级配置面版中,引擎版本配置为vvr-4.0.11-flink-1.13。引擎版本

  4. 单击验证

  5. 单击上线

  6. 作业运维页面,单击目标作业名称操作列的启动

  7. 在OSS控制台查看写入的测试数据。

    等第一次Checkpoint完成之后,您将能看到写入的测试数据了。

阿里云首页 实时计算 Flink版 相关技术圈