EMR-3.38.3及后续版本的DataFlow集群,可以通过数据湖元数据DLF(Data Lake Formation)作为元数据读取DataLake集群或自定义集群中的数据。本文为您介绍Dataflow集群如何连接DLF,并读取Hudi全量数据。
前提条件
使用限制
DataFlow集群和DataLake集群需要在同一VPC下。
操作流程
步骤一:环境准备
拷贝DataLake集群中${HIVE_CONF_DIR}
下的hive-site.xml到DataFlow集群。
例如,${HIVE_CONF_DIR}
为/etc/taihao-apps/hive-conf/。
mkdir /etc/taihao-apps/hive-conf
scp root@<master-1-1节点内网的IP地址>:/etc/taihao-apps/hive-conf/hive-site.xml /etc/taihao-apps/hive-conf/
步骤二:启动Flink SQL
- 务必将DLF的依赖包放置在Hive依赖包的前面,其中DLF依赖包中嵌入了Hudi的依赖。
- 无需关注Datalake集群中的Hive版本,Hive依赖均使用2.3.6版本的。
执行以下命令,启动Flink SQL。
sql-client.sh \
-j /opt/apps/FLINK/flink-current/opt/catalogs/dlf/ververica-connector-dlf-1.13-vvr-4.0.15-SNAPSHOT-jar-with-dependencies.jar \
-j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.13-vvr-4.0.15-SNAPSHOT-jar-with-dependencies.jar
测试时可设置以下配置。
set sql-client.verbose=true;
set sql-client.execution.result-mode=tableau;
set execution.checkpointing.interval=1000;
步骤三:创建并验证Catalog
进入Flink SQL后,分别创建DLF Catalog和Hive Catalog用于读取Hudi表和Hive表。
- 执行以下命令,创建Catalog。
- 创建DLF Catalog
CREATE CATALOG dlf_catalog WITH ( 'type' = 'dlf', 'access.key.id' = '<yourAccessKeyId>', --您阿里云账号的AccessKey ID。 'access.key.secret' = '<yourAccessKeyId>', --您阿里云账号的AccessKey Secret。 'warehouse' = 'oss://oss-bucket/warehouse/test.db', 'oss.endpoint' = '<oss.endpoint>', --从${HADOOP_CONF_DIR}/core-site.xml中获取。 'dlf.endpoint' = '<dlf.endpoint>', --从${HIVE_CONF_DIR}/hive-site.xml中获取。 'dlf.region-id' = '<dlf.region-id>' --从${HIVE_CONF_DIR}/hive-site.xml中获取。 );
- 创建Hive Catalog无需关注Datalake集群中的Hive版本,
hive-version
均使用2.3.6。CREATE CATALOG hive_catalog WITH ( 'type' = 'hive', 'default-database' = 'default', 'hive-version' = '2.3.6', 'hive-conf-dir' = '/etc/taihao-apps/hive-conf/', 'hadoop-conf-dir' = '/etc/taihao-apps/hadoop-conf/' );
Catalog创建成功后,均会返回以下信息。[INFO] Execute statement succeed.
- 创建DLF Catalog
- 执行以下命令,验证Catalog。
- 验证DLF Catalog
select * from dlf_catalog.test.hudi_table;
- 验证Hive Catalog
select * from hive_catalog.test.hive_table;
- 验证DLF Catalog
步骤四:Flink SQL写入Hudi
- 场景一:数据入湖使用Datagen Connector随机生成上游Source数据,入湖Hudi表。
-- 构建上游Source数据 CREATE TABLE datagen_source ( uuid int, age int, ts bigint, ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '10' ); -- 创建Hudi表 CREATE TABLE dlf_catalog.test.hudi_tbl1( id int not null, age int, ts bigint ) with( 'connector'='hudi', 'path' = 'oss://oss-bucket/warehouse/test.db/hudi_tbl1', 'table.type'='COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field'='id', 'hive_sync.enable'='true', 'hive_sync.table'='hudi_tbl1', -- required, Hive新建的表名。 'hive_sync.db'='test', -- required, Hive新建的数据库名。 'hive_sync.mode' = 'hms' -- required, 将hive sync mode设置为hms, 默认jdbc。 ); --入湖 insert into dlf_catalog.test.hudi_tbl1 select uuid as id, age, ts from default_catalog.default_database.datagen_source; -- 查询验证 select * from dlf_catalog.test.hudi_tbl1;
- 场景二:维度打宽入湖
使用ODS层的Hudi数据和Hive的维度表关联打宽填充维度字段,最后写入新的Hudi表。
例如,已有表dlf_catalog.test.hive_dim_tbl,表结构如下。id int name string
详细示例如下。-- 创建目标Hudi表 CREATE TABLE dlf_catalog.test.hudi_tbl2( id int not null, name string, age int, ts bigint ) with( 'connector'='hudi', 'path' = 'oss://oss-bucket/warehouse/test.db/hudi_tbl2', 'table.type'='COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field'='id', 'hive_sync.enable'='true', 'hive_sync.table'='hudi_tbl2', -- required, Hive新建的表名。 'hive_sync.db'='test', -- required, Hive新建的数据库名。 'hive_sync.mode' = 'hms' -- required, 将hive sync mode设置为hms, 默认jdbc。 ); -- 关联Hive维度表后入湖 insert into dlf_catalog.test.hudi_tbl2 select s.id, name, age, ts from dlf_catalog.test.hudi_tbl1 s join hive_catalog.test.hive_dim_tbl t on s.id = t.id;
步骤五:DataLake集群查询Hudi
登录DataLake集群查询Hudi数据。登录集群详情请参见登录集群。
- Spark查询
Spark查询详情,请参见Hudi与Spark SQL集成。
- 执行以下命令,启动spark-sql。
spark-sql \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
如果您集群的Spark是Spark 3,且Hudi为0.11及以上版本,则需额外添加以下配置。--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
- 执行以下命令,验证表信息。
- 查询hudi_tbl1
select * from test.hudi_tbl1;
- 查询hudi_tbl2
select * from test.hudi_tbl2;
- 查询hudi_tbl1
- 执行以下命令,启动spark-sql。
- Hudi查询
- 执行以下命令,启动Hive CLI。
hive
- 执行以下命令,验证表信息。
- 查询hudi_tbl1
select * from test.hudi_tbl1;
- 查询hudi_tbl2
select * from test.hudi_tbl2;
- 查询hudi_tbl1
- 执行以下命令,启动Hive CLI。