EMR-3.38.3及后续版本的DataFlow集群以及安装了Flink组件的EMR自定义集群,可以使用数据湖元数据DLF(Data Lake Formation)作为元数据服务读取集群中的数据。本文为您介绍DataFlow集群如何通过Flink的Hive Catalog连接DLF,并读取Hive全量数据。

前提条件

  • 已在E-MapReduce控制台上创建DataFlow集群和DataLake集群,详情请参见创建集群
    重要 创建DataLake集群时,元数据需为 DLF 统一元数据
  • 已开通数据湖构建DLF,详情请参见快速入门

使用限制

  • DataFlow集群和DataLake集群需要在同一VPC下。
  • 创建的DataFlow集群需要为EMR-3.38.3后续版本。

操作流程

  1. 步骤一:数据准备
  2. 步骤二:DataFlow集群连接DLF
  3. 步骤三:验证读取Hive全量数据

步骤一:数据准备

  1. 下载Hive作业需要的测试数据至OSS对应的目录。
    例如,上传目录为 oss://<yourBucketName>/hive/userdata/,其中<yourBucketName>为您在OSS控制台上创建的Bucket名称。上传文件详细信息,请参见 上传文件
  2. 在DLF控制台创建元数据库,详情请参见创建元数据库
    例如,创建的元数据库名称为flink_dlf_hive,选择路径为 oss://<yourBucketName>/flink_dlf_hive/db
  3. 在DataLake集群中,查看已经创建的元数据库。
    1. 通过SSH方式登录DataLake集群,详情请参见登录集群
    2. 执行以下命令,切换为hadoop用户。
      su hadoop
    3. 执行以下命令,进入Hive命令行。
      hive
    4. 执行以下命令,查看库信息。
      desc database flink_dlf_hive;
      说明 命令中的 flink_dlf_hive为上一步骤中创建的数据库的名称。
      返回信息如下。
      OK
      flink_dlf_hive          oss://aliyu****/flink_dlf_hive/db    acs:ram::125046002175****:user/29915368510086****       USER
      Time taken: 0.069 seconds, Fetched: 1 row(s)
  4. 执行以下命令,创建Hive的外表。
     USE flink_dlf_hive;
     set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
     set hive.stats.autogather=false;
     DROP TABLE IF EXISTS emrusers;
     CREATE EXTERNAL TABLE emrusers (
       userid INT,
       movieid INT,
       rating INT,
       unixtime STRING )
      ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '\t'
      STORED AS TEXTFILE
      LOCATION 'oss://<yourBucketName>/flink_dlf_hive/userdata/';
    说明 请替换命令中的 <yourBucketName>为您实际在OSS控制台上创建的Bucket名称。
  5. 在DLF控制台上,验证表信息。
    1. 登录数据湖构建控制台
    2. 在左侧导航栏,选择元数据 > 元数据管理,单击数据表
    3. 数据表页面,通过库名过滤,可以查看已创建的表信息。
  6. 可选:在Hive命令行中,查询数据。
    • 示例1
      SELECT userid,movieid,rating,unix_timestamp() from emrusers limit 10;
    • 示例2
      SELECT movieid,count(userid) as usercount from emrusers group by movieid order by usercount desc limit 50;

步骤二:DataFlow集群连接DLF

  1. 通过SSH方式登录DataFlow集群,详情请参见登录集群
  2. 执行以下命令,新建test目录。
    mkdir /root/test
  3. 上传Hive配置文件到DataFlow集群的新建路径下。
    您可以执行以下命令,复制DataLake集群中的 hive-site.xml文件到DataFlow集群。
    scp root@<master-1-1节点内网的IP地址>:/etc/taihao-apps/hive-conf/hive-site.xml /root/test/
    说明 命令中的 <master-1-1节点内网的IP地址>,您可以在EMR控制台的集群管理的 节点管理页面查看, /root/test/为DataFlow集群的路径,您可以根据实际情况修改。
  4. 加载集群中内置的Hive Connector,启动SQL客户端。
    • 当您的集群是EMR-3.43.0之前版本:
      1. Maven仓库官网中下载依赖的JAR包,并上传至DataFlow集群中。本示例是上传到/root目录,下载的JAR包为Jackson CoreJackson DatabindJackson Annotations。具体版本请根据您实际情况下载,本示例下载的JAR包为jackson-core-2.12.1.jar、jackson-databind-2.12.1.jar和jackson-annotations-2.12.1.jar。
      2. 执行以下命令,启动SQL客户端。
        sql-client.sh -j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.*-SNAPSHOT-jar-with-dependencies.jar -j /root/jackson-core-2.12.1.jar -j /root/jackson-databind-2.12.1.jar -j /root/jackson-annotations-2.12.1.jar
    • 当您的集群是EMR-3.43.0及之后版本:
      执行以下命令,启动SQL客户端。
      sql-client.sh -j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.*-SNAPSHOT-jar-with-dependencies.jar
  5. 在Flink SQL命令行中,创建Catalog。
    CREATE CATALOG hive_catalog WITH (
         'type' = 'hive',
         'default-database' = 'flink_dlf_hive',
         'hive-version' = '2.3.6',
         'hive-conf-dir' = '/root/test',
         'hadoop-conf-dir' = '/etc/taihao-apps/hdfs-conf/'
     );
    涉及参数如下表。
    参数 描述
    type 固定值为hive
    default-database 步骤一:数据准备中创建的数据库的名称。

    本示例为flink_dlf_hive。

    hive-version 固定值为2.3.6
    hive-conf-dir 前一步骤中复制的hive-site.xml所在的目录。

    本示例为/root/test。

    hadoop-conf-dir 固定值为/etc/taihao-apps/hdfs-conf/
    返回信息如下,表示创建成功。
    [INFO] Execute statement succeed.
  6. 在Flink SQL命令行中,查看DLF的数据库。
    1. 执行以下命令,设置当前的Catalog为刚才创建的hive_catalog。
      USE CATALOG hive_catalog;
    2. 执行以下命令,显示当前Catalog下的数据库。
      SHOW DATABASES;
    3. 执行以下命令,设置当前的数据库。
      USE flink_dlf_hive;
    4. 执行以下命令,查看当前数据库中的表。
      SHOW TABLES;
      返回信息如下。
      +------------+
      | table name |
      +------------+
      |   emrusers |
      +------------+
      1 row in set
    5. 执行以下命令,查看表信息。
      desc emrusers;
      返回信息如下。
      +----------+--------+------+-----+--------+-----------+
      |     name |   type | null | key | extras | watermark |
      +----------+--------+------+-----+--------+-----------+
      |   userid |    INT | true |     |        |           |
      |  movieid |    INT | true |     |        |           |
      |   rating |    INT | true |     |        |           |
      | unixtime | STRING | true |     |        |           |
      +----------+--------+------+-----+--------+-----------+
      4 rows in set

步骤三:验证读取Hive全量数据

  1. 通过SSH方式登录DataFlow集群,详情请参见登录集群
  2. 启动Yarn Session。
     yarn-session.sh --detached
  3. 加载集群中内置的Hive Connector,启动SQL客户端。
    • 当您的集群是EMR-3.43.0之前版本,执行以下命令,启动SQL客户端:
      sql-client.sh -j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.*-SNAPSHOT-jar-with-dependencies.jar -j /root/jackson-core-2.12.1.jar -j /root/jackson-databind-2.12.1.jar -j /root/jackson-annotations-2.12.1.jar
    • 当您的集群是EMR-3.43.0及之后版本,执行以下命令,启动SQL客户端:
      sql-client.sh -j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.*-SNAPSHOT-jar-with-dependencies.jar
  4. 在Flink SQL命令行中进行如下操作。
    1. 执行以下命令,创建Catalog。
      CREATE CATALOG hive_catalog WITH (
           'type' = 'hive',
           'default-database' = 'flink_dlf_hive',
           'hive-version' = '2.3.6',
           'hive-conf-dir' = '/root/test',
           'hadoop-conf-dir' = '/etc/taihao-apps/hdfs-conf/'
       );
    2. 执行以下命令,创建表。
      create table default_catalog.default_database.datahole(userid int, movieid int, ts timestamp) with ('connector' = 'blackhole');
    3. 执行以下命令,读取Hive全量数据到blackhole。
      insert into `default_catalog`.`default_database`.`datahole` select userid, movieid, CURRENT_TIMESTAMP as ts from `hive_catalog`.`flink_dlf_hive`.`emrusers`;
      执行成功后,会返回已提交的Flink作业的Application ID与Job ID。返回如下类似信息。 Application ID
  5. 查看作业。
    1. 通过Web UI查看作业状态,详情请参见通过Web UI查看作业状态
    2. 单击目标作业的Application ID,可以查看作业运行的详情。
    3. 单击Tracking URL所在行的链接。
    4. 在左侧导航栏中,选择Jobs > Completed Jobs
      可以查看已完成的作业。 Completed Jobs