DataFlow集群可以通过数据湖构建(DLF)的统一元数据服务,访问DataLake集群或自定义集群中的Hudi表数据。本文为您介绍DataFlow集群如何连接DLF并读取Hudi全量数据。
前提条件
使用限制
DataFlow集群版本需为EMR-3.38.3及以上,且不超过EMR-3.50.x或EMR-5.16.x。
操作流程
步骤一:环境准备
拷贝DataLake集群中${HIVE_CONF_DIR}
下的hive-site.xml到DataFlow集群。
例如,${HIVE_CONF_DIR}
为/etc/taihao-apps/hive-conf/。
mkdir /etc/taihao-apps/hive-conf
scp root@<master-1-1节点内网的IP地址>:/etc/taihao-apps/hive-conf/hive-site.xml /etc/taihao-apps/hive-conf/
步骤二:启动Flink SQL
务必将DLF的依赖包放置在Hive依赖包的前面,其中DLF依赖包中嵌入了Hudi的依赖。
无需关注DataLake集群中的Hive版本,Hive依赖均使用2.3.6版本的。
执行以下命令,启动Flink YARN会话。
yarn-session.sh -d -qu default
执行以下命令,启动Flink SQL。
sql-client.sh \ -j /opt/apps/FLINK/flink-current/opt/catalogs/dlf/ververica-connector-dlf-1.15-vvr-6.0.4-SNAPSHOT-jar-with-dependencies.jar \ -j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.15-vvr-6.0.4-SNAPSHOT-jar-with-dependencies.jar
请根据实际情况替换上述JAR包中的版本号。
测试时需设置以下配置。
-- 启用详细日志输出。 set sql-client.verbose=true; -- 设置结果显示模式为表格形式。 set sql-client.execution.result-mode=tableau; -- 设置checkpoint时间间隔为1秒,确保数据在checkpoint触发后才可见。主要用于步骤四中Source数据的生成。 set execution.checkpointing.interval=1000;
步骤三:创建Catalog
进入Flink SQL后,执行以下命令,创建DLF Catalog,用于读取Hudi表。
CREATE CATALOG dlf_catalog WITH (
'type' = 'dlf',
'access.key.id' = '<yourAccessKeyId>', --您阿里云账号的AccessKey ID。
'access.key.secret' = '<yourAccessKeySecret>', --您阿里云账号的AccessKey Secret。
'warehouse' = 'oss://<bucket>/<object>', -- bucket:表示您创建的OSS Bucket名称。object:表示您存放数据的路径。您可以在OSS管理控制台上查看。
'oss.endpoint' = '<oss.endpoint>', --从${HADOOP_CONF_DIR}/core-site.xml中获取fs.oss.endpoint的值。
'dlf.endpoint' = '<dlf.endpoint>', --从/etc/taihao-apps/hive-conf/hive-site.xml中获取dlf.catalog.endpoint的值。
'dlf.region-id' = '<dlf.region-id>' --从/etc/taihao-apps/hive-conf/hive-site.xml中获取dlf.catalog.region的值。
);
Catalog创建成功后,会返回以下信息。
[INFO] Execute statement succeed.
步骤四:Flink SQL写入Hudi
使用Datagen Connector随机生成上游Source数据,入湖Hudi表。
-- 构建上游Source数据
CREATE TABLE datagen_source (
uuid int,
age int,
ts bigint
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10'
);
-- 创建Hudi库表
CREATE database dlf_catalog.testdb;
CREATE TABLE dlf_catalog.testdb.hudi_tbl1(
id int NOT NULL,
age int,
ts bigint
)
WITH(
'connector'='hudi',
'path' = 'oss://<bucket>/<object>/testdb/hudi_tbl1', -- oss://<bucket>/<object> 为创建dlf_catalog时的warehouse,testdb为创建的数据库名,hudi_tbl1为表名。
'table.type'='COPY_ON_WRITE',
'hoodie.datasource.write.recordkey.field'='id',
'hive_sync.enable'='true',
'hive_sync.table'='hudi_tbl1', -- required, Hive新建的表名。
'hive_sync.db'='testdb', -- required, Hive新建的数据库名。
'hive_sync.mode' = 'hms' -- required, 将hive sync mode设置为hms, 默认jdbc。
);
--入湖
INSERT INTO dlf_catalog.testdb.hudi_tbl1
SELECT uuid AS id, age, ts
FROM default_catalog.default_database.datagen_source;
-- 查询验证
SELECT * FROM dlf_catalog.testdb.hudi_tbl1;
步骤五:DataLake集群查询Hudi
登录DataLake集群查询Hudi数据。登录集群详情请参见登录集群。
Spark查询
Spark查询详情,请参见Hudi与Spark SQL集成。
执行以下命令,启动spark-sql。
spark-sql \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
如果您集群的Spark是Spark3,且Hudi为0.11及以上版本,则需额外添加以下配置。
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
执行以下命令,验证表信息。
SELECT * FROM testdb.hudi_tbl1;
Hudi查询
执行以下命令,启动Hive CLI。
hive
执行以下命令,验证表信息。
SELECT * FROM testdb.hudi_tbl1;
该文章对您有帮助吗?
- 本页导读 (1)
- 前提条件
- 使用限制
- 操作流程
- 步骤一:环境准备
- 步骤二:启动Flink SQL
- 步骤三:创建Catalog
- 步骤四:Flink SQL写入Hudi
- 步骤五:DataLake集群查询Hudi