EMR-3.38.3及后续版本的DataFlow集群,可以通过数据湖元数据DLF(Data Lake Formation)作为元数据读取DataLake集群或自定义集群中的数据。本文为您介绍Dataflow集群如何连接DLF,并读取Hudi全量数据。

前提条件

  • 已在E-MapReduce控制台上创建DataFlow集群和DataLake集群,详情请参见创建集群
    重要 创建DataLake集群时,元数据需为DLF统一元数据
  • 已开通数据湖构建DLF,详情请参见快速入门

使用限制

DataFlow集群和DataLake集群需要在同一VPC下。

操作流程

  1. 步骤一:环境准备
  2. 步骤二:启动Flink SQL
  3. 步骤三:创建并验证Catalog
  4. 步骤四:Flink SQL写入Hudi
  5. 步骤五:DataLake集群查询Hudi

步骤一:环境准备

拷贝DataLake集群中${HIVE_CONF_DIR}下的hive-site.xml到DataFlow集群。

例如,${HIVE_CONF_DIR}/etc/taihao-apps/hive-conf/

mkdir /etc/taihao-apps/hive-conf
scp root@<master-1-1节点内网的IP地址>:/etc/taihao-apps/hive-conf/hive-site.xml /etc/taihao-apps/hive-conf/

步骤二:启动Flink SQL

重要
  • 务必将DLF的依赖包放置在Hive依赖包的前面,其中DLF依赖包中嵌入了Hudi的依赖。
  • 无需关注Datalake集群中的Hive版本,Hive依赖均使用2.3.6版本的。
执行以下命令,启动Flink SQL。
sql-client.sh \
-j /opt/apps/FLINK/flink-current/opt/catalogs/dlf/ververica-connector-dlf-1.13-vvr-4.0.15-SNAPSHOT-jar-with-dependencies.jar \
-j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.13-vvr-4.0.15-SNAPSHOT-jar-with-dependencies.jar
测试时可设置以下配置。
set sql-client.verbose=true;
set sql-client.execution.result-mode=tableau;
set execution.checkpointing.interval=1000;

步骤三:创建并验证Catalog

进入Flink SQL后,分别创建DLF Catalog和Hive Catalog用于读取Hudi表和Hive表。

  1. 执行以下命令,创建Catalog。
    • 创建DLF Catalog
      CREATE CATALOG dlf_catalog WITH (
           'type' = 'dlf',
           'access.key.id' = '<yourAccessKeyId>', --您阿里云账号的AccessKey ID。
           'access.key.secret' = '<yourAccessKeyId>', --您阿里云账号的AccessKey Secret。
           'warehouse' = 'oss://oss-bucket/warehouse/test.db',
           'oss.endpoint' = '<oss.endpoint>', --从${HADOOP_CONF_DIR}/core-site.xml中获取。
           'dlf.endpoint' = '<dlf.endpoint>', --从${HIVE_CONF_DIR}/hive-site.xml中获取。
           'dlf.region-id' = '<dlf.region-id>' --从${HIVE_CONF_DIR}/hive-site.xml中获取。
       );
    • 创建Hive Catalog
      重要 无需关注Datalake集群中的Hive版本,hive-version均使用2.3.6。
      CREATE CATALOG hive_catalog WITH (
           'type' = 'hive',
           'default-database' = 'default',
           'hive-version' = '2.3.6',
           'hive-conf-dir' = '/etc/taihao-apps/hive-conf/',
           'hadoop-conf-dir' = '/etc/taihao-apps/hadoop-conf/'
       );
    Catalog创建成功后,均会返回以下信息。
    [INFO] Execute statement succeed.
  2. 执行以下命令,验证Catalog。
    • 验证DLF Catalog
      select * from dlf_catalog.test.hudi_table;
    • 验证Hive Catalog
      select * from hive_catalog.test.hive_table;

步骤四:Flink SQL写入Hudi

  • 场景一:数据入湖
    使用Datagen Connector随机生成上游Source数据,入湖Hudi表。
    -- 构建上游Source数据
    CREATE TABLE datagen_source (
      uuid int,
      age int,
      ts bigint,
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10'
    );
    
    -- 创建Hudi表
    CREATE TABLE dlf_catalog.test.hudi_tbl1(
      id int not null,
      age int,
      ts bigint
    )
    with(
      'connector'='hudi',
      'path' = 'oss://oss-bucket/warehouse/test.db/hudi_tbl1',
      'table.type'='COPY_ON_WRITE',
      'hoodie.datasource.write.recordkey.field'='id',
      'hive_sync.enable'='true',
      'hive_sync.table'='hudi_tbl1',    -- required, Hive新建的表名。
      'hive_sync.db'='test',            -- required, Hive新建的数据库名。
      'hive_sync.mode' = 'hms'          -- required, 将hive sync mode设置为hms, 默认jdbc。
    );
    
    --入湖
    insert into dlf_catalog.test.hudi_tbl1
    select uuid as id, age, ts
    from default_catalog.default_database.datagen_source;
    
    -- 查询验证
    select * from dlf_catalog.test.hudi_tbl1;
  • 场景二:维度打宽入湖

    使用ODS层的Hudi数据和Hive的维度表关联打宽填充维度字段,最后写入新的Hudi表。

    例如,已有表dlf_catalog.test.hive_dim_tbl,表结构如下。
    id                      int
    name                    string
    详细示例如下。
    -- 创建目标Hudi表
    CREATE TABLE dlf_catalog.test.hudi_tbl2(
      id int not null,
      name string,
      age int,
      ts bigint
    )
    with(
      'connector'='hudi',
      'path' = 'oss://oss-bucket/warehouse/test.db/hudi_tbl2',
      'table.type'='COPY_ON_WRITE',
      'hoodie.datasource.write.recordkey.field'='id',
      'hive_sync.enable'='true',
      'hive_sync.table'='hudi_tbl2',    -- required, Hive新建的表名。
      'hive_sync.db'='test',            -- required, Hive新建的数据库名。
      'hive_sync.mode' = 'hms'          -- required, 将hive sync mode设置为hms, 默认jdbc。
    );
    
    -- 关联Hive维度表后入湖
    insert into dlf_catalog.test.hudi_tbl2
    select s.id, name, age, ts
    from dlf_catalog.test.hudi_tbl1 s join hive_catalog.test.hive_dim_tbl t on s.id = t.id;

步骤五:DataLake集群查询Hudi

登录DataLake集群查询Hudi数据。登录集群详情请参见登录集群

  • Spark查询

    Spark查询详情,请参见Hudi与Spark SQL集成

    1. 执行以下命令,启动spark-sql。
      spark-sql \
      --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
      --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
      如果您集群的Spark是Spark 3,且Hudi为0.11及以上版本,则需额外添加以下配置。
      --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
    2. 执行以下命令,验证表信息。
      • 查询hudi_tbl1
        select * from test.hudi_tbl1;
      • 查询hudi_tbl2
        select * from test.hudi_tbl2;
  • Hudi查询
    1. 执行以下命令,启动Hive CLI。
      hive
    2. 执行以下命令,验证表信息。
      • 查询hudi_tbl1
        select * from test.hudi_tbl1;
      • 查询hudi_tbl2
        select * from test.hudi_tbl2;