EMR-3.38.3及后续版本的DataFlow集群以及安装了Flink组件的EMR自定义集群,可以使用数据湖元数据DLF(Data Lake Formation)作为元数据服务读取集群中的数据。本文为您介绍DataFlow集群如何通过Flink的Hive Catalog连接DLF,并读取Hive全量数据。
前提条件
使用限制
- DataFlow集群和DataLake集群需要在同一VPC下。
- 创建的DataFlow集群需要为EMR-3.38.3后续版本。
操作流程
步骤一:数据准备
- 下载Hive作业需要的测试数据至OSS对应的目录。
例如,上传目录为 oss://<yourBucketName>/hive/userdata/,其中<yourBucketName>为您在OSS控制台上创建的Bucket名称。上传文件详细信息,请参见 上传文件。
- 在DLF控制台创建元数据库,详情请参见创建元数据库。
例如,创建的元数据库名称为flink_dlf_hive,选择路径为 oss://<yourBucketName>/flink_dlf_hive/db。
- 在DataLake集群中,查看已经创建的元数据库。
- 执行以下命令,创建Hive的外表。
USE flink_dlf_hive; set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; set hive.stats.autogather=false; DROP TABLE IF EXISTS emrusers; CREATE EXTERNAL TABLE emrusers ( userid INT, movieid INT, rating INT, unixtime STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE LOCATION 'oss://<yourBucketName>/flink_dlf_hive/userdata/';
说明 请替换命令中的<yourBucketName>
为您实际在OSS控制台上创建的Bucket名称。 - 在DLF控制台上,验证表信息。
- 登录数据湖构建控制台。
- 在左侧导航栏,选择数据表。 ,单击
- 在数据表页面,通过库名过滤,可以查看已创建的表信息。
- 可选:在Hive命令行中,查询数据。
- 示例1
SELECT userid,movieid,rating,unix_timestamp() from emrusers limit 10;
- 示例2
SELECT movieid,count(userid) as usercount from emrusers group by movieid order by usercount desc limit 50;
- 示例1
步骤二:DataFlow集群连接DLF
- 通过SSH方式登录DataFlow集群,详情请参见登录集群。
- 执行以下命令,新建test目录。
mkdir /root/test
- 上传Hive配置文件到DataFlow集群的新建路径下。
您可以执行以下命令,复制DataLake集群中的 hive-site.xml文件到DataFlow集群。
scp root@<master-1-1节点内网的IP地址>:/etc/taihao-apps/hive-conf/hive-site.xml /root/test/
说明 命令中的<master-1-1节点内网的IP地址>
,您可以在EMR控制台的集群管理的 节点管理页面查看, /root/test/为DataFlow集群的路径,您可以根据实际情况修改。 - 加载集群中内置的Hive Connector,启动SQL客户端。
- 当您的集群是EMR-3.43.0之前版本:
- 在Maven仓库官网中下载依赖的JAR包,并上传至DataFlow集群中。本示例是上传到/root目录,下载的JAR包为Jackson Core、Jackson Databind和Jackson Annotations。具体版本请根据您实际情况下载,本示例下载的JAR包为jackson-core-2.12.1.jar、jackson-databind-2.12.1.jar和jackson-annotations-2.12.1.jar。
- 执行以下命令,启动SQL客户端。
sql-client.sh -j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.*-SNAPSHOT-jar-with-dependencies.jar -j /root/jackson-core-2.12.1.jar -j /root/jackson-databind-2.12.1.jar -j /root/jackson-annotations-2.12.1.jar
- 当您的集群是EMR-3.43.0及之后版本:
执行以下命令,启动SQL客户端。
sql-client.sh -j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.*-SNAPSHOT-jar-with-dependencies.jar
- 当您的集群是EMR-3.43.0之前版本:
- 在Flink SQL命令行中,创建Catalog。
CREATE CATALOG hive_catalog WITH ( 'type' = 'hive', 'default-database' = 'flink_dlf_hive', 'hive-version' = '2.3.6', 'hive-conf-dir' = '/root/test', 'hadoop-conf-dir' = '/etc/taihao-apps/hdfs-conf/' );
涉及参数如下表。参数 描述 type 固定值为hive。 default-database 步骤一:数据准备中创建的数据库的名称。 本示例为flink_dlf_hive。
hive-version 固定值为2.3.6。 hive-conf-dir 前一步骤中复制的hive-site.xml所在的目录。 本示例为/root/test。
hadoop-conf-dir 固定值为/etc/taihao-apps/hdfs-conf/。 返回信息如下,表示创建成功。[INFO] Execute statement succeed.
- 在Flink SQL命令行中,查看DLF的数据库。
步骤三:验证读取Hive全量数据
- 通过SSH方式登录DataFlow集群,详情请参见登录集群。
- 启动Yarn Session。
yarn-session.sh --detached
- 加载集群中内置的Hive Connector,启动SQL客户端。
- 当您的集群是EMR-3.43.0之前版本,执行以下命令,启动SQL客户端:
sql-client.sh -j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.*-SNAPSHOT-jar-with-dependencies.jar -j /root/jackson-core-2.12.1.jar -j /root/jackson-databind-2.12.1.jar -j /root/jackson-annotations-2.12.1.jar
- 当您的集群是EMR-3.43.0及之后版本,执行以下命令,启动SQL客户端:
sql-client.sh -j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.*-SNAPSHOT-jar-with-dependencies.jar
- 当您的集群是EMR-3.43.0之前版本,执行以下命令,启动SQL客户端:
- 在Flink SQL命令行中进行如下操作。
- 查看作业。