本文为您介绍如何使用Hudi连接器。

背景信息

Apache Hudi是一种开源的数据湖表格式框架。Hudi基于对象存储或者HDFS组织文件布局,保证ACID,支持行级别的高效更新和删除,从而降低数据ETL开发门槛。同时该框架还支持自动管理及合并小文件,保持指定的文件大小,从而在处理数据插入和更新时,不会创建过多的小文件,引发查询端性能降低,避免手动监控和合并小文件的运维负担。详情请参见Apache Hudi

类别

详情

支持类型

源表和结果表

运行模式

流模式和批模式

数据格式

暂不支持

特有监控指标

  • 源表

    • numRecordsIn

    • numRecordsInPerSecond

  • 结果表

    • numRecordsOut

    • numRecordsOutPerSecond

    • currentSendTime

说明

指标含义详情,请参见监控指标说明

API种类

Datastream和SQL

是否支持更新或删除结果表数据

特色功能

类别

详情

Hudi的核心特性

  • 支持ACID:支持ACID语义,默认提供SNAPSHOT ISOLATION隔离级别。

  • 支持UPSERT语义:UPSERT语义是INSERT和UPDATE两种语义的合并。在UPSERT语义时,如果记录不存在则插入;如果记录存在则更新。通过INSERT INTO语法可以大幅简化开发代码的复杂度,提升效率。

  • 支持Data Version:通过时间旅行(Time Travel)特性,提供任意时间点的数据版本历史,便于数据运维,提升数据质量。

Hudi的典型场景

  • DB入湖加速

    相比昂贵且低效的传统批量加载和Merge,Hudi提供超大数据集的实时流式更新写入。通过实时的ETL,您可以直接将CDC(Change Data Capture)数据写入数据湖,供下游业务使用。典型案例为采用Flink MySQL CDC Connector将RDBMS(MySQL)的Binlog写入Hudi表。

  • 增量ETL

    通过增量拉取的方式获取Hudi中的变更数据流,相对离线ETL调度,实时性更好且更轻量。典型场景是增量拉取在线服务数据到离线存储中,通过Flink引擎写入Hudi表,借助Presto或Spark引擎实现高效的OLAP分析。

  • 消息队列

    在小体量的数据场景下,Hudi也可以作为消息队列替代Kafka,简化应用开发架构。

  • 数仓回填(backfill)

    针对历史全量数据进行部分行、列的更新场景,通过数据湖极大减少计算资源消耗,提升了端到端的性能。典型案例是Hive场景下全量和增量的打宽。

全托管Hudi优势

相比开源社区Hudi,全托管Flink平台集成Hudi具有的功能优势详情如下所示:

  • 平台侧与Flink全托管集成,免运维

    Flink全托管内置Hudi连接器,降低运维复杂度,提供SLA保障。

  • 完善的数据连通性

    对接多个阿里云大数据计算分析引擎,数据与计算引擎解耦,可以在Flink、Spark、Presto或Hive间无缝流转。

  • 深度打磨DB入湖场景

    与Flink CDC连接器联动,降低开发门槛。

  • 提供企业级特性

    包括集成DLF统一元数据视图、自动且轻量化的表结构变更。

  • 内置阿里云OSS存储,低成本存储,弹性扩展

    数据以开放的Parquet、Avro格式存储在阿里云OSS,存储计算分离,资源灵活弹性扩展。

使用限制

  • 仅Flink计算引擎vvr-4.0.11-flink-1.13及以上版本支持Hudi Connector。

  • 文件系统仅支持HDFS或阿里云OSS和OSS-HDFS服务。

  • 不支持以Session模式提交作业。

  • 不支持修改字段,如需修改,请在DLF控制台通过Spark SQL语句进行操作。

语法结构

CREATE TEMPORARY TABLE hudi_tbl (
  uuid BIGINT,
  data STRING,
  ts   TIMESTAMP(3),
  PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'oss://<yourOSSBucket>/<自定义存储位置>',
  ...
);

WITH参数

基础参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    表类型。

    String

    固定值为hudi。

    path

    表存储路径。

    String

    支持阿里云OSS、HDFS和OSS-HDFS和三种路径。

    • OSS:路径格式为oss://<bucket>/<user-defined-dir>

    • HDFS:路径格式为hdfs://<user-defined-dir>

    • OSS-HDFS:路径格式为oss://<bucket>.<oss-hdfs-endpoint>/<user-defined-dir>

      说明

      仅Flink计算引擎VVR 8.0.3及以上版本支持该参数配置为OSS-HDFS路径。

    其中:

    • bucket:表示您创建的OSS Bucket名称。

    • user-defined-dir:表示数据存放路径。

    • oss-hdfs-endpoint:表示OSS-HDFS服务Endpoint。

      您可以在OSS实例概览页面的访问端口中查看HDFSEndpoint信息。

    hoodie.datasource.write.recordkey.field

    主键字段。

    String

    uuid

    • 支持通过PRIMARY KEY语法设置主键字段。

    • 支持使用英文逗号(,)分隔多个字段。

    precombine.field

    版本字段。

    String

    ts

    基于此字段的大小来判断消息是否进行更新。

    如果您没有设置该参数,则系统默认会按照消息在引擎内部处理的先后顺序进行更新。

    oss.endpoint

    阿里云对象存储服务OSS或者OSS-HDFS的Endpoint。

    String

    如果使用OSS或者OSS-HDFS作为存储,则必需填写。

    • 使用OSS时,参数取值详情请参见访问域名和数据中心

    • 使用OSS-HDFS时,您可以在OSS实例概览页面的访问端口中查看HDFS服务Endpoint信息。

    accessKeyId

    阿里云账号的AccessKey ID。

    String

    如果使用OSS或者OSS-HDFS作为存储,则必需填写。

    详情请参见如何查看AccessKey ID和AccessKey Secret信息?

    重要

    为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见变量和密钥管理

    accessKeySecret

    阿里云账号的AccessKey Secret。

    String

    如果使用OSS或者OSS-HDFS作为存储,则必需填写。

    详情请参见如何查看AccessKey ID和AccessKey Secret信息?

    重要

    为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见变量和密钥管理

  • 源表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    read.streaming.enabled

    是否开启流读。

    boolean

    false

    参数取值如下:

    • true:开启流读。

    • false:不开启流读。

    read.start-commit

    读取起始位点。

    string

    不填

    参数取值如下:

    • yyyyMMddHHmmss:从指定时间点开始消费。

    • earliest:从最早位点开始消费。

    • 不填:从最新时间开始消费。

  • 结果表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    write.operation

    写入操作模式。

    String

    UPSERT

    参数取值如下:

    • insert模式:数据追加写。

    • upsert模式:数据更新。

    • bulk_insert模式:数据批量追加写。

    hive_sync.enable

    是否开启同步元数据到Hive功能。

    boolean

    false

    参数取值如下:

    • true:开启同步元数据到Hive功能。

    • false:关闭同步元数据到Hive功能。

    hive_sync.mode

    Hive数据同步模式。

    String

    hms

    参数取值如下:

    • hms:元数据同步到Hive Metastore或者DLF时,需要设置为hms。

    • jdbc:元数据同步到jdbc时,需要设置为jdbc。

    hive_sync.db

    同步到Hive的数据库名称。

    String

    default

    无。

    hive_sync.table

    同步到Hive的表名称。

    String

    当前table名

    hudi同步到Hive的表名不能使用中划线( -)。

    dlf.catalog.region

    DLF服务的地域名。

    String

    详情请参见已开通的地域和访问域名

    说明
    • 仅当hive_sync.mode设置为hms时,dlf.catalog.region参数设置才生效。

    • 请和dlf.catalog.endpoint选择的地域保持一致。

    dlf.catalog.endpoint

    DLF服务的Endpoint。

    String

    详情请参见已开通的地域和访问域名

    说明
    • 仅当hive_sync.mode设置为hms时,dlf.catalog.endpoint参数设置才生效。

    • 推荐您为dlf.catalog.endpoint参数配置DLF的VPC Endpoint。例如,如果您选择的地域为cn-hangzhou地域,则dlf.catalog.endpoint参数需要配置为dlf-vpc.cn-hangzhou.aliyuncs.com

    • 如果您需要跨VPC访问DLF,则请参见如何访问跨VPC的其他服务?

高阶参数

Hudi支持丰富的写入和读取场景,不同场景的参数如下表所示。

并发参数

名称

说明

默认值

备注

write.tasks

writer的并发,每个writer顺序写1~N个buckets。

4

增加写任务的并发对小文件个数没影响

write.bucket_assign.tasks

bucket assigner的并发。

Flink并发度

增加写任务的并发同时增加了写任务的bucket数,也就是增加了小文件(小bucket)数。

write.index_bootstrap.tasks

Index bootstrap算子的并发。

Flink并发度

  • 只在index.bootstrap.enabled为true时生效。

  • 增加并发可以加快bootstrap阶段的效率,bootstrap阶段会阻塞checkpoint,因此需要设置多一些的checkpoint失败容忍次数。

read.tasks

流和批读算子的并发。

4

无。

compaction.tasks

online compaction算子的并发。

4

online compaction比较耗费资源,建议走offline compaction。

在线压缩参数

名称

说明

默认值

备注

compaction.schedule.enabled

是否阶段性生成压缩plan。

true

参数取值如下:

  • true:阶段性生成压缩plan。

  • false:不阶段性生成压缩plan。

说明

建议阶段性生成压缩plan,即使compaction.async.enabled关闭的情况下。

compaction.async.enabled

是否开启异步压缩。

true

参数取值如下:

  • true:开启

  • false:关闭

说明

通过关闭compaction.async.enabled参数可关闭在线压缩执行,但是调度compaction.schedule.enabled仍然建议开启,之后可通过离线异步压缩,执行阶段性生成的压缩plan。

compaction.tasks

压缩任务的并发数。

4

无。

compaction.trigger.strategy

压缩策略。

num_commits

支持以下压缩策略:

  • num_commits:根据commit个数周期性触发。

  • time_elapsed:根据时间间隔周期性触发。

  • num_and_time:同时满足commit个数和时间间隔。

  • num_or_time:满足commit个数或者时间间隔。

compaction.delta_commits

经过多少个commit触发压缩。

5

无。

compaction.delta_seconds

经过多少秒后触发压缩。

3600

单位为秒。

compaction.max_memory

用于压缩去重的hashmap的可用内存大小。

100 MB

资源够用时,建议调整到1 GB。

compaction.target_io

每个压缩plan的IO上限。

500 GB

无。

文件大小

文件参数控制了文件的大小,目前支持的参数详情如下表所示。

名称

说明

默认值

备注

hoodie.parquet.max.file.size

最大可写入的parquet文件大小。

超过可写入的parquet文件大小时,将写入到新的文件组。

120 * 1024 * 1024 byte

(120 MB)

单位是byte。

hoodie.parquet.small.file.limit

小文件的大小阈值,小于该参数的文件被认为是小文件。

104857600 byte(100 MB)

  • 单位是byte。

  • 在写入时,hudi会尝试先追加写已存小文件。

hoodie.copyonwrite.record.size.estimate

预估的record大小。

1024 byte(1 KB)

  • 单位为byte。

  • 如果没有显示指定,hudi会根据提交元数据动态估计record大小.

Hadoop参数

名称

说明

默认值

备注

hadoop.${you option key}

通过hadoop.前缀指定hadoop配置项。

支持同时指定多个hadoop配置项。

说明

从Hudi 0.12.0开始支持,针对跨集群提交执行的需求,可以通过DDL指定per-job级别的hadoop配置。

数据写入

Hudi支持丰富的写入方式,包括离线批量写入、流式写入等场景。支持丰富的数据类型,包括changelog以及log数据。同时支持不同的索引方案。

  • 离线批量写入

    针对存量数据导入Hudi的需求,如果存量数据来源于其他数据源,可以使用批量导入功能,快速将存量数据导成Hoodie表格式。

    名称

    说明

    默认值

    备注

    write.operation

    写操作类型。

    upsert

    参数取值如下:

    • upsert:插入更新

    • insert:插入

    • bulk_insert:批量写入

      说明
      • bulk_insert导入省去了avro的序列化以及数据的merge过程,没有去重操作,数据的唯一性需要自己来保证。

      • bulk_insert需要在Batch Execution Mode下执行,Batch模式默认会按照分区名称排序输入消息再写入Hoodie,避免file handle频繁切换导致性能下降。

    write.tasks

    bulk_insert写任务的并发。

    Flink的并发度

    bulk_insert写任务的并发通过参数write.tasks指定,并发的数量会影响到小文件的数量。

    理论上,bulk_insert写任务的并发数就是划分的bucket数,当每个bucket在写到文件大小上限(parquet 120 MB)时,会滚动到新句柄,所以最终的写文件数量大于等于bulk_insert写任务的并发。

    write.bulk_insert.shuffle_input

    是否将数据按照partition字段shuffle再通过write task写入。

    true

    从Hudi 0.11.0版本开始,开启该参数将减少小文件的数量,但是可能有数据倾斜风险。

    write.bulk_insert.sort_input

    是否将数据先按照partition字段排序再写入。

    true

    从Hudi 0.11.0版本开始支持,当一个write task写多个partition,开启可以减少小文件数量

    write.sort.memory

    sort算子的可用managed memory。

    128

    单位是MB。

  • Changelog模式

    该模式只有MOR表支持,在该模式下Hoodie会保留消息的所有变更(I/-U/U/D),之后再配合Flink引擎的有状态计算实现全链路近实时数仓生产增量计算。Hoodie的MOR表通过行存原生支持保留消息的所有变更(format层面的集成),通过Flink全托管流读单个MOR表可以消费到所有的变更记录。

    说明

    非changelog模式,流读单次的batch数据集会merge中间变更;批读(快照读)会合并所有的中间结果,不管中间状态是否已被写入,都将被忽略。

    名称

    说明

    默认值

    备注

    changelog.enabled

    是否消费所有变更。

    false

    参数取值如下:

    • true:支持消费所有变更。

    • false:不消费所有变更,即UPSERT语义,所有的消息仅保证最后一条合并消息,中间的变更可能会被merge掉。

    说明

    开启changelog.enabled参数后,异步的压缩任务仍然会将中间变更合并成1条数据,所以如果流读消费不够及时,被压缩后只能读到最后一条记录。但是,可以通过调整压缩的频率,预留一定的时间buffer给 reader,比如调整compaction.delta_commits:5和compaction.delta_seconds: 3600压缩参数。

  • Append模式(从Hudi 0.10.0版本开始支持)

    在该模式下:

    • MOR表会应用小文件策略:会追加写avro log文件。

    • COW表没有小文件策略:每次写入COW表直接写新的parquet文件。

Clustering策略

Hudi支持丰富的Clustering策略,从而优化INSERT模式下的小文件问题。

  • Inline Clustering(只有Copy On Write表支持该模式)

    名称

    说明

    默认值

    备注

    write.insert.cluster

    是否在写入时合并小文件。

    false

    参数取值如下:

    • true:在写入时,合并小文件。

    • false:在写入时,不合并小文件。

    说明

    COW表默认insert写不合并小文件,开启该参数后,每次写入会优先合并之前的小文件,但不会去重,吞吐会受影响。

  • Async Clustering(从Huid 0.12.0版本开始支持)

    名称

    说明

    默认值

    备注

    clustering.schedule.enabled

    是否在写入时定时调度Clustering plan。

    false

    开启后周期性调度clustering plan。

    clustering.delta_commits

    经过多少个commits生成Clustering plan。

    4

    clustering.schedule.enabled为true时,生效。

    clustering.async.enabled

    是否异步执行Clustering plan。

    false

    开启后周期性异步执行,合并小文件。

    clustering.tasks

    Clustering task执行并发。

    4

    无。

    clustering.plan.strategy.target.file.max.bytes

    Clustering单文件目标大小。

    1024 * 1024 * 1024

    单位是byte。

    clustering.plan.strategy.small.file.limit

    Clustering小文件阈值。

    600

    小于该大小的文件才会参与clustering。

    clustering.plan.strategy.sort.columns

    Clustering排序字段。

    支持指定特殊的排序字段。

  • Clustering Plan Strategy

    名称

    说明

    默认值

    备注

    clustering.plan.partition.filter.mode

    Clustering分区过滤模式。

    NONE

    支持的模式如下:

    • NONE:不过滤分区,所有分区都用于聚合,即不做限制。

    • RECENT_DAYS:数据按分区时,合并最近N天的数据。

    • SELECTED_PARTITIONS:指定固定的分区。

    clustering.plan.strategy.daybased.lookback.partitions

    采用RECENT_DAYS模式下的目标分区天数。

    2

    仅当clustering.plan.partition.filter.mode取值为RECENT_DAYS时生效。

    clustering.plan.strategy.cluster.begin.partition

    指定开始分区,用于过滤分区。

    仅当clustering.plan.partition.filter.mode取值为SELECTED_PARTITIONS时有效。

    clustering.plan.strategy.cluster.end.partition

    指定结束分区,用于过滤分区。

    仅当clustering.plan.partition.filter.mode取值为SELECTED_PARTITIONS时有效。

    clustering.plan.strategy.partition.regex.pattern

    通过正则表达式指定目标分区。

    无。

    clustering.plan.strategy.partition.selected

    指定目标partitions。

    支持通过英文逗号(,)分割多个partition。

  • Bucket索引

    说明

    从Hudi 0.11.0版本开始支持以下表格中的参数。

    名称

    说明

    默认值

    备注

    index.type

    索引类型。

    FLINK_STATE

    参数取值如下:

    • FLINK_STATE:使用flink state索引。

    • BUCKET:使用bucket索引。

    当数据量比较大时(表的数据条目超过5 亿),flink state的存储开销可能成为瓶颈。bucket索引通过固定的hash策略,将相同key的数据分配到同一个fileGroup中,可以避免索引的存储和查询开销。bucket index和flink state索引对比有以下区别:

    • bucket index没有flink state的存储计算开销,性能较好。

    • bucket index无法扩buckets,state index则可以依据文件的大小动态增加文件个数。

    • bucket index不支持跨partition的变更(如果输入是cdc流则没有这个限制),state index没有限制。

    hoodie.bucket.index.hash.field

    bucket索引hash key字段。

    主键

    可以设置成主键的子集。

    hoodie.bucket.index.num.buckets

    bucket索引的bucket个数。

    4

    默认每个partition的bucket数,当前设置后则不可再变更。

数据读取

  • Hudi支持丰富的读取方案,包括批读、流读、增量拉取,同时支持消费、传播changelog,实现端到端增量ETL。

    • 流读

      当前表默认是快照读取,即读取最新的全量快照数据并一次性返回。通过read.streaming.enabled参数开启流读模式,通过read.start-commit参数指定起始消费位置,支持指定earliest从最早消费。

      名称

      说明

      默认值

      备注

      read.streaming.enabled

      是否开启流读模式。

      false

      参数取值如下:

      • true:开启流读模式。

      • false:关闭流读模式。

      read.start-commit

      流读起始位点

      不填

      参数取值如下:

      • yyyyMMddHHmmss:从指定时间点开始消费。

      • earliest:从最早位点开始消费。

      • 不填:从最新时间开始消费。

      clean.retain_commits

      cleaner最多保留的历史commits数。

      30

      大于此数量的历史commits会被清理掉,changelog模式下,该参数可以控制changelog的保留时间,例如checkpoint周期为5分钟一次,默认最少保留150分钟的时间。

      重要
      • 仅从0.10.0开始支持流读changelog。开启changelog模式后,hudi会保留一段时间的changelog供下游consumer消费。

      • changelog有可能会被compaction合并掉,中间记录会消除,可能会影响计算结果。

    • 增量读取(从Hudi 0.10.0版本开始支持)

      支持通过Flink全托管DataStream方式增量消费、Batch增量消费和TimeTravel(Batch消费某个时间点的数据)。

      名称

      说明

      默认值

      备注

      read.start-commit

      指定起始消费位点。

      从最新位置commit

      请按yyyyMMddHHmmss格式指定流读的起始位点。

      区间为闭区间,即包含起始和结束。

      read.end-commit

      指定结束消费位点。

      从最新位置commit

代码示例

  • 源表

CREATE TEMPORARY TABLE blackhole (
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'blackhole'      
);

CREATE TEMPORARY TABLE hudi_tbl (
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi', 
  'oss.endpoint' = '<yourOSSEndpoint>', 
  'accessKeyId' = '${secret_values.ak_id}', 
  'accessKeySecret' = '${secret_values.ak_secret}', 
  'path' = 'oss://<yourOSSBucket>/<自定义存储位置>',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true'
);

-- 从最新的commit流读写入blackhole。
INSERT INTO blackhole SELECT * from hudi_tbl;
  • 结果表

CREATE TEMPORARY TABLE datagen(
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data  STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'datagen' ,
  'rows-per-second'='100' 
);

CREATE TEMPORARY TABLE hudi_tbl (
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi', 
  'oss.endpoint' = '<yourOSSEndpoint>', 
  'accessKeyId' = '${secret_values.ak_id}', 
  'accessKeySecret' = '${secret_values.ak_secret}', 
  'path' = 'oss://<yourOSSBucket>/<自定义存储位置>',
  'table.type' = 'MERGE_ON_READ'
);

INSERT INTO hudi_tbl SELECT * from datagen;

Datastream API

重要

通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink全托管,DataStream连接器设置方法请参见DataStream连接器设置方法

  • maven pom

    根据使用的VVR版本,指定Flink和Hudi版本。

    <properties>
      <maven.compiler.source>8</maven.compiler.source>
      <maven.compiler.target>8</maven.compiler.target>
      <flink.version>1.15.4</flink.version>
      <hudi.version>0.13.1</hudi.version>
    </properties>
    
    <dependencies>
      <!-- flink -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
    
      <!-- hudi -->
      <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-flink1.15-bundle</artifactId>
        <version>${hudi.version}</version>
        <scope>provided</scope>
      </dependency>
    
      <!-- oss -->
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.2</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aliyun</artifactId>
        <version>3.3.2</version>
        <scope>provided</scope>
      </dependency>
    
      <!-- dlf -->
      <dependency>
        <groupId>com.aliyun.datalake</groupId>
        <artifactId>metastore-client-hive2</artifactId>
        <version>0.2.14</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.5.1</version>
        <scope>provided</scope>
      </dependency>
    </dependencies>
    重要

    DLF使用的部分依赖与社区版本存在冲突,例如hive-commonhive-exec。如果您有本地测试DLF的需求,可以下载hive-commonhive-execJAR包,然后在IDEA手动导入。

  • 写入到Hudi

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.util.HoodiePipeline;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class FlinkHudiQuickStart {
    
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        String dbName = "test_db";
        String tableName = "test_tbl";
        String basePath = "oss://xxx";
    
        Map<String, String> options = new HashMap<>();
        // hudi conf
        options.put(FlinkOptions.PATH.key(), basePath);
        options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
        options.put(FlinkOptions.DATABASE_NAME.key(), dbName);
        options.put(FlinkOptions.TABLE_NAME.key(), tableName);
        // oss conf
        options.put("hadoop.fs.oss.accessKeyId", "xxx");
        options.put("hadoop.fs.oss.accessKeySecret", "xxx");
        // 本地调试使用公网网端,例如oss-cn-hangzhou.aliyuncs.com;提交集群使用内网网端,例如oss-cn-hangzhou-internal.aliyuncs.com
        options.put("hadoop.fs.oss.endpoint", "xxx");
        options.put("hadoop.fs.AbstractFileSystem.oss.impl", "org.apache.hadoop.fs.aliyun.oss.OSS");
        options.put("hadoop.fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
        // dlf conf
        options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true"); // 可选择是否同步DLF
        options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
        options.put(FlinkOptions.HIVE_SYNC_DB.key(), dbName);
        options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), tableName);
        options.put("hadoop.dlf.catalog.id", "xxx");
        options.put("hadoop.dlf.catalog.accessKeyId", "xxx");
        options.put("hadoop.dlf.catalog.accessKeySecret", "xxx");
        options.put("hadoop.dlf.catalog.region", "xxx");
        //  本地调试使用公网网端,例如dlf.cn-hangzhou.aliyuncs.com,提交集群使用内网网端,例如dlf-vpc.cn-hangzhou.aliyuncs.com
        options.put("hadoop.dlf.catalog.endpoint", "xxx");
        options.put("hadoop.hive.imetastoreclient.factory.class", "com.aliyun.datalake.metastore.hive2.DlfMetaStoreClientFactory");
    
        DataStream<RowData> dataStream = env.fromElements(
            GenericRowData.of(StringData.fromString("id1"), StringData.fromString("name1"), 22,
                StringData.fromString("1001"), StringData.fromString("p1")),
            GenericRowData.of(StringData.fromString("id2"), StringData.fromString("name2"), 32,
                StringData.fromString("1002"), StringData.fromString("p2"))
        );
    
        HoodiePipeline.Builder builder = HoodiePipeline.builder(tableName)
            .column("uuid string")
            .column("name string")
            .column("age int")
            .column("ts string")
            .column("`partition` string")
            .pk("uuid")
            .partition("partition")
            .options(options);
    
        builder.sink(dataStream, false); // The second parameter indicating whether the input data stream is bounded
        env.execute("Flink_Hudi_Quick_Start");
      }
    }

常见问题