EMR-3.38.3及后续版本的EMR集群可以使用数据湖元数据DLF(Data Lake Formation)服务对集群数据进行统一管理,EMR中的Flink组件在开源Flink基础上增加了与DLF适配的功能。本文为您介绍如何在EMR集群上通过Flink SQL创建Hive Catalog连接到DLF,并读取Hive全量数据。
前提条件
使用限制
DataFlow集群和DataLake集群需要在同一VPC下。
创建的DataFlow集群需要为EMR-3.38.3后续版本。
操作流程
步骤一:数据准备
下载Hive作业需要的测试数据至OSS任意空目录,数据上传目录将作为后续的外表地址使用。
本示例中上传目录为oss://<yourBucketName>/hive/userdata/,其中<yourBucketName>为您在OSS控制台上创建的Bucket名称。上传文件详细信息,请参见控制台上传文件。
在DLF控制台创建元数据库,详情请参见创建元数据库。
本示例中创建的元数据库名称为flink_dlf_hive,选择路径为oss://<yourBucketName>/flink_dlf_hive/db。
在DataLake集群中,查看已经创建的元数据库。
通过SSH方式登录DataLake集群,详情请参见登录集群。
执行以下命令,切换为hadoop用户并进入Hive命令行。
su - hadoop hive
执行以下命令,查看库信息。
desc database flink_dlf_hive;
说明命令中的
flink_dlf_hive
为上一步骤中创建的数据库的名称。OK flink_dlf_hive oss://aliyu****/flink_dlf_hive/db acs:ram::125046002175****:user/29915368510086**** USER Time taken: 0.069 seconds, Fetched: 1 row(s)
创建Hive的外表并验证。
执行以下命令,创建Hive的外表。
USE flink_dlf_hive; set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; set hive.stats.autogather=false; DROP TABLE IF EXISTS emrusers; CREATE EXTERNAL TABLE emrusers ( userid INT, movieid INT, rating INT, unixtime STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE LOCATION 'oss://<yourBucketName>/<yourTableDir>/';
说明请替换命令中的
<yourBucketName>
为您实际在OSS控制台上创建的Bucket名称,<yourTableDir>
为您的数据实际存储目标,本示例中的地址为oss://<yourBucketName>/hive/userdata/。在Hive命令行中,查询数据进行验证。
示例1
SELECT userid,movieid,rating,unix_timestamp() from emrusers limit 10;
示例2
SELECT movieid,count(userid) as usercount from emrusers group by movieid order by usercount desc limit 50;
可选:在DLF控制台上,验证表信息。
登录数据湖构建控制台。
在左侧导航栏,选择 ,单击数据表。
在数据表页面,通过库名过滤,可以查看已创建的表信息。
步骤二:DataFlow集群连接DLF读取Hive全量数据
通过SSH方式登录DataFlow集群,详情请参见登录集群。
执行以下命令启动Yarn Session。
yarn-session.sh --detached
上传Hive配置文件到DataFlow集群的新建路径下。
您可以执行以下命令,复制DataLake集群中的hive-site.xml文件到DataFlow集群。
scp root@<master-1-1节点内网的IP地址>:/etc/taihao-apps/hive-conf/hive-site.xml /root/test/
说明命令中的
<master-1-1节点内网的IP地址>
,您可以在EMR控制台的集群管理的节点管理页面查看,/root/test/为DataFlow集群的路径,您可以根据实际情况修改。加载集群中内置的Hive Connector,启动SQL客户端。
当您的集群是EMR-3.43.0之前版本:
在Maven仓库官网中下载依赖的JAR包,并上传至DataFlow集群中。本示例是上传到/root目录,下载的JAR包为Jackson Core、Jackson Databind和Jackson Annotations。具体版本请根据您实际情况下载,本示例下载的JAR包为jackson-core-2.12.1.jar、jackson-databind-2.12.1.jar和jackson-annotations-2.12.1.jar。
执行以下命令,启动SQL客户端。
sql-client.sh -j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.*-SNAPSHOT-jar-with-dependencies.jar -j /root/jackson-core-2.12.1.jar -j /root/jackson-databind-2.12.1.jar -j /root/jackson-annotations-2.12.1.jar
当您的集群是EMR-3.43.0及之后版本:
执行以下命令,启动SQL客户端。
sql-client.sh -j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.*-SNAPSHOT-jar-with-dependencies.jar
在Flink SQL命令行中,创建Catalog。
CREATE CATALOG hive_catalog WITH ( 'type' = 'hive', 'default-database' = 'flink_dlf_hive', 'hive-version' = '2.3.6', 'hive-conf-dir' = '/root/test', 'hadoop-conf-dir' = '/etc/taihao-apps/hadoop-conf/' );
涉及参数如下表。
参数
描述
type
固定值为hive。
default-database
步骤一:数据准备中创建的数据库的名称。
本示例为flink_dlf_hive。
hive-version
固定值为2.3.6。
hive-conf-dir
前一步骤中复制的hive-site.xml所在的目录。
本示例为/root/test。
hadoop-conf-dir
固定值为/etc/taihao-apps/hadoop-conf/。
返回信息如下,表示创建成功。
[INFO] Execute statement succeed.
在Flink SQL命令行中,查看DLF的数据库。
执行以下命令,设置当前的Catalog为刚才创建的hive_catalog。
USE CATALOG hive_catalog;
执行以下命令,显示当前Catalog下的数据库。
SHOW DATABASES;
执行以下命令,设置当前的数据库,本示例中数据库为flink_dlf_hive。
USE flink_dlf_hive;
执行以下命令,查看当前数据库中的表。
SHOW TABLES;
返回信息如下。
+------------+ | table name | +------------+ | emrusers | +------------+ 1 row in set
执行以下命令,查看表信息。
desc emrusers;
返回信息如下。
+----------+--------+------+-----+--------+-----------+ | name | type | null | key | extras | watermark | +----------+--------+------+-----+--------+-----------+ | userid | INT | true | | | | | movieid | INT | true | | | | | rating | INT | true | | | | | unixtime | STRING | true | | | | +----------+--------+------+-----+--------+-----------+ 4 rows in set
验证读取Hive全量数据。
在Flink SQL客户端执行以下命令,创建表。
create table default_catalog.default_database.datahole(userid int, movieid int, ts timestamp) with ('connector' = 'blackhole');
执行以下命令,读取Hive全量数据到blackhole。
insert into `default_catalog`.`default_database`.`datahole` select userid, movieid, CURRENT_TIMESTAMP as ts from `hive_catalog`.`flink_dlf_hive`.`emrusers`;
执行成功后,会返回已提交的Flink作业的Application ID与Job ID。返回如下类似信息。
通过Web UI查看作业状态,详情请参见通过Web UI查看作业状态。
单击目标作业的Application ID,可以查看作业运行的详情,单击Tracking URL所在行的链接,在左侧导航栏中,选择可以查看已完成的作业。 。