DataFlow集群通过DLF读写Hudi表

更新时间:2025-03-04 02:10:18

DataFlow集群可以通过数据湖构建(DLF)的统一元数据服务,访问DataLake集群或自定义集群中的Hudi表数据。本文为您介绍DataFlow集群如何连接DLF并读取Hudi全量数据。

前提条件

  • 已在E-MapReduce控制台上创建DataFlow集群和DataLake集群,且在同一VPC下,详情请参见创建集群

    重要

    创建DataLake集群时,元数据需为DLF统一元数据

  • 已开通数据湖构建DLF,详情请参见快速入门

使用限制

DataFlow集群版本需为EMR-3.38.3及以上,且不超过EMR-3.50.xEMR-5.16.x。

操作流程

  1. 步骤一:环境准备

  2. 步骤二:启动Flink SQL

  3. 步骤三:创建Catalog

  4. 步骤四:Flink SQL写入Hudi

  5. 步骤五:DataLake集群查询Hudi

步骤一:环境准备

拷贝DataLake集群中${HIVE_CONF_DIR}下的hive-site.xmlDataFlow集群。

例如,${HIVE_CONF_DIR}/etc/taihao-apps/hive-conf/

mkdir /etc/taihao-apps/hive-conf
scp root@<master-1-1节点内网的IP地址>:/etc/taihao-apps/hive-conf/hive-site.xml /etc/taihao-apps/hive-conf/

步骤二:启动Flink SQL

重要
  • 务必将DLF的依赖包放置在Hive依赖包的前面,其中DLF依赖包中嵌入了Hudi的依赖。

  • 无需关注DataLake集群中的Hive版本,Hive依赖均使用2.3.6版本的。

  1. 执行以下命令,启动Flink YARN会话。

    yarn-session.sh -d -qu default
  2. 执行以下命令,启动Flink SQL。

    sql-client.sh \
    -j /opt/apps/FLINK/flink-current/opt/catalogs/dlf/ververica-connector-dlf-1.15-vvr-6.0.4-SNAPSHOT-jar-with-dependencies.jar \
    -j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.15-vvr-6.0.4-SNAPSHOT-jar-with-dependencies.jar
    说明

    请根据实际情况替换上述JAR包中的版本号。

  3. 测试时需设置以下配置。

    -- 启用详细日志输出。
    set sql-client.verbose=true;
    -- 设置结果显示模式为表格形式。
    set sql-client.execution.result-mode=tableau;
    -- 设置checkpoint时间间隔为1秒,确保数据在checkpoint触发后才可见。主要用于步骤四中Source数据的生成。
    set execution.checkpointing.interval=1000;

步骤三:创建Catalog

进入Flink SQL后,执行以下命令,创建DLF Catalog,用于读取Hudi表。

CREATE CATALOG dlf_catalog WITH (
     'type' = 'dlf',
     'access.key.id' = '<yourAccessKeyId>', --您阿里云账号的AccessKey ID。
     'access.key.secret' = '<yourAccessKeySecret>', --您阿里云账号的AccessKey Secret。
     'warehouse' = 'oss://<bucket>/<object>', -- bucket:表示您创建的OSS Bucket名称。object:表示您存放数据的路径。您可以在OSS管理控制台上查看。
     'oss.endpoint' = '<oss.endpoint>', --从${HADOOP_CONF_DIR}/core-site.xml中获取fs.oss.endpoint的值。
     'dlf.endpoint' = '<dlf.endpoint>', --从/etc/taihao-apps/hive-conf/hive-site.xml中获取dlf.catalog.endpoint的值。
     'dlf.region-id' = '<dlf.region-id>' --从/etc/taihao-apps/hive-conf/hive-site.xml中获取dlf.catalog.region的值。
 );

Catalog创建成功后,会返回以下信息。

[INFO] Execute statement succeed.

步骤四:Flink SQL写入Hudi

使用Datagen Connector随机生成上游Source数据,入湖Hudi表。

-- 构建上游Source数据
CREATE TABLE datagen_source (
  uuid int,
  age int,
  ts bigint
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10'
);

-- 创建Hudi库表
CREATE database dlf_catalog.testdb;
CREATE TABLE dlf_catalog.testdb.hudi_tbl1(
  id int NOT NULL,
  age int,
  ts bigint
)
WITH(
  'connector'='hudi',
  'path' = 'oss://<bucket>/<object>/testdb/hudi_tbl1',   -- oss://<bucket>/<object> 为创建dlf_catalog时的warehouse,testdb为创建的数据库名,hudi_tbl1为表名。
  'table.type'='COPY_ON_WRITE',
  'hoodie.datasource.write.recordkey.field'='id',
  'hive_sync.enable'='true',
  'hive_sync.table'='hudi_tbl1',    -- required, Hive新建的表名。
  'hive_sync.db'='testdb',            -- required, Hive新建的数据库名。
  'hive_sync.mode' = 'hms'          -- required, 将hive sync mode设置为hms, 默认jdbc。
);

--入湖
INSERT INTO dlf_catalog.testdb.hudi_tbl1
SELECT uuid AS id, age, ts
FROM default_catalog.default_database.datagen_source;

-- 查询验证
SELECT * FROM dlf_catalog.testdb.hudi_tbl1;

步骤五:DataLake集群查询Hudi

登录DataLake集群查询Hudi数据。登录集群详情请参见登录集群

  • Spark查询

    Spark查询详情,请参见HudiSpark SQL集成

    1. 执行以下命令,启动spark-sql。

      spark-sql \
      --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
      --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

      如果您集群的SparkSpark3,且Hudi0.11及以上版本,则需额外添加以下配置。

      --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
    2. 执行以下命令,验证表信息。

      SELECT * FROM testdb.hudi_tbl1;
  • Hudi查询

    1. 执行以下命令,启动Hive CLI。

      hive
    2. 执行以下命令,验证表信息。

      SELECT * FROM testdb.hudi_tbl1;

  • 本页导读 (1)
  • 前提条件
  • 使用限制
  • 操作流程
  • 步骤一:环境准备
  • 步骤二:启动Flink SQL
  • 步骤三:创建Catalog
  • 步骤四:Flink SQL写入Hudi
  • 步骤五:DataLake集群查询Hudi
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等