DataFlow集群通过Hive Catalog连接数据湖元数据DLF

EMR-3.38.3及后续版本的EMR集群可以使用数据湖元数据DLF(Data Lake Formation)服务对集群数据进行统一管理,EMR中的Flink组件在开源Flink基础上增加了与DLF适配的功能。本文为您介绍如何在EMR集群上通过Flink SQL创建Hive Catalog连接到DLF,并读取Hive全量数据。

前提条件

  • 已在E-MapReduce控制台上创建DataFlow集群和DataLake集群,详情请参见创建集群

    重要

    创建DataLake集群时,元数据需为DLF 统一元数据

  • 已开通数据湖构建DLF,详情请参见快速入门

使用限制

  • DataFlow集群和DataLake集群需要在同一VPC下。

  • 创建的DataFlow集群需要为EMR-3.38.3后续版本。

操作流程

  1. 步骤一:数据准备

  2. 步骤二:DataFlow集群连接DLF读取Hive全量数据

步骤一:数据准备

  1. 下载Hive作业需要的测试数据至OSS任意空目录,数据上传目录将作为后续的外表地址使用。

    本示例中上传目录为oss://<yourBucketName>/hive/userdata/,其中<yourBucketName>为您在OSS控制台上创建的Bucket名称。上传文件详细信息,请参见控制台上传文件

  2. 在DLF控制台创建元数据库,详情请参见创建元数据库

    本示例中创建的元数据库名称为flink_dlf_hive,选择路径为oss://<yourBucketName>/flink_dlf_hive/db

  3. 在DataLake集群中,查看已经创建的元数据库。

    1. 通过SSH方式登录DataLake集群,详情请参见登录集群

    2. 执行以下命令,切换为hadoop用户并进入Hive命令行。

      su - hadoop
      hive
    3. 执行以下命令,查看库信息。

      desc database flink_dlf_hive;
      说明

      命令中的flink_dlf_hive为上一步骤中创建的数据库的名称。

      OK
      flink_dlf_hive          oss://aliyu****/flink_dlf_hive/db    acs:ram::125046002175****:user/29915368510086****       USER
      Time taken: 0.069 seconds, Fetched: 1 row(s)
  4. 创建Hive的外表并验证。

    1. 执行以下命令,创建Hive的外表。

       USE flink_dlf_hive;
       set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
       set hive.stats.autogather=false;
       DROP TABLE IF EXISTS emrusers;
       CREATE EXTERNAL TABLE emrusers (
         userid INT,
         movieid INT,
         rating INT,
         unixtime STRING )
        ROW FORMAT DELIMITED
        FIELDS TERMINATED BY '\t'
        STORED AS TEXTFILE
        LOCATION 'oss://<yourBucketName>/<yourTableDir>/';
      说明

      请替换命令中的<yourBucketName>为您实际在OSS控制台上创建的Bucket名称,<yourTableDir>为您的数据实际存储目标,本示例中的地址为oss://<yourBucketName>/hive/userdata/

    2. 在Hive命令行中,查询数据进行验证。

      • 示例1

        SELECT userid,movieid,rating,unix_timestamp() from emrusers limit 10;
      • 示例2

        SELECT movieid,count(userid) as usercount from emrusers group by movieid order by usercount desc limit 50;
  5. 可选:在DLF控制台上,验证表信息。

    1. 登录数据湖构建控制台

    2. 在左侧导航栏,选择元数据 > 元数据管理,单击数据表

    3. 数据表页面,通过库名过滤,可以查看已创建的表信息。

步骤二:DataFlow集群连接DLF读取Hive全量数据

  1. 通过SSH方式登录DataFlow集群,详情请参见登录集群

  2. 执行以下命令启动Yarn Session。

     yarn-session.sh --detached
  3. 上传Hive配置文件到DataFlow集群的新建路径下。

    您可以执行以下命令,复制DataLake集群中的hive-site.xml文件到DataFlow集群。

    scp root@<master-1-1节点内网的IP地址>:/etc/taihao-apps/hive-conf/hive-site.xml /root/test/
    说明

    命令中的<master-1-1节点内网的IP地址>,您可以在EMR控制台的集群管理的节点管理页面查看,/root/test/为DataFlow集群的路径,您可以根据实际情况修改。

  4. 加载集群中内置的Hive Connector,启动SQL客户端。

    • 当您的集群是EMR-3.43.0之前版本:

      1. Maven仓库官网中下载依赖的JAR包,并上传至DataFlow集群中。本示例是上传到/root目录,下载的JAR包为Jackson CoreJackson DatabindJackson Annotations。具体版本请根据您实际情况下载,本示例下载的JAR包为jackson-core-2.12.1.jar、jackson-databind-2.12.1.jar和jackson-annotations-2.12.1.jar。

      2. 执行以下命令,启动SQL客户端。

        sql-client.sh -j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.*-SNAPSHOT-jar-with-dependencies.jar -j /root/jackson-core-2.12.1.jar -j /root/jackson-databind-2.12.1.jar -j /root/jackson-annotations-2.12.1.jar
    • 当您的集群是EMR-3.43.0及之后版本:

      执行以下命令,启动SQL客户端。

      sql-client.sh -j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.*-SNAPSHOT-jar-with-dependencies.jar
  5. 在Flink SQL命令行中,创建Catalog。

    CREATE CATALOG hive_catalog WITH (
         'type' = 'hive',
         'default-database' = 'flink_dlf_hive',
         'hive-version' = '2.3.6',
         'hive-conf-dir' = '/root/test',
         'hadoop-conf-dir' = '/etc/taihao-apps/hadoop-conf/'
     );

    涉及参数如下表。

    参数

    描述

    type

    固定值为hive

    default-database

    步骤一:数据准备中创建的数据库的名称。

    本示例为flink_dlf_hive。

    hive-version

    固定值为2.3.6

    hive-conf-dir

    前一步骤中复制的hive-site.xml所在的目录。

    本示例为/root/test。

    hadoop-conf-dir

    固定值为/etc/taihao-apps/hadoop-conf/

    返回信息如下,表示创建成功。

    [INFO] Execute statement succeed.
  6. 在Flink SQL命令行中,查看DLF的数据库。

    1. 执行以下命令,设置当前的Catalog为刚才创建的hive_catalog。

      USE CATALOG hive_catalog;
    2. 执行以下命令,显示当前Catalog下的数据库。

      SHOW DATABASES;
    3. 执行以下命令,设置当前的数据库,本示例中数据库为flink_dlf_hive。

      USE flink_dlf_hive;
    4. 执行以下命令,查看当前数据库中的表。

    5. SHOW TABLES;

      返回信息如下。

      +------------+
      | table name |
      +------------+
      |   emrusers |
      +------------+
      1 row in set
    6. 执行以下命令,查看表信息。

      desc emrusers;

      返回信息如下。

      +----------+--------+------+-----+--------+-----------+
      |     name |   type | null | key | extras | watermark |
      +----------+--------+------+-----+--------+-----------+
      |   userid |    INT | true |     |        |           |
      |  movieid |    INT | true |     |        |           |
      |   rating |    INT | true |     |        |           |
      | unixtime | STRING | true |     |        |           |
      +----------+--------+------+-----+--------+-----------+
      4 rows in set
  7. 验证读取Hive全量数据。

    1. 在Flink SQL客户端执行以下命令,创建表。

      create table default_catalog.default_database.datahole(userid int, movieid int, ts timestamp) with ('connector' = 'blackhole');
    2. 执行以下命令,读取Hive全量数据到blackhole。

      insert into `default_catalog`.`default_database`.`datahole` select userid, movieid, CURRENT_TIMESTAMP as ts from `hive_catalog`.`flink_dlf_hive`.`emrusers`;

      执行成功后,会返回已提交的Flink作业的Application ID与Job ID。返回如下类似信息。Application ID

    3. 通过Web UI查看作业状态,详情请参见通过Web UI查看作业状态

      单击目标作业的Application ID,可以查看作业运行的详情,单击Tracking URL所在行的链接,在左侧导航栏中,选择Jobs > Completed Jobs可以查看已完成的作业。Completed Jobs