本文为您详细介绍了如何在EMR on ECS Spark环境中对接DLF Paimon Catalog,帮助您实现高效的数据湖查询与分析。
前提条件
已创建EMR集群,组件选择Spark3, Paimon。EMR版本 >= 5.12.0。如有其他版本诉求,请加入钉钉群(106575000021)联系DLF研发人员。
创建DLF Paimon Catalog
详情请参见DLF 快速入门。
授予角色DLF权限
授予AliyunECSInstanceForEMRRole角色RAM权限(EMR产品化集成后可以省略该步骤)。
使用阿里云账号或RAM管理员登录RAM控制台。
单击
,查询AliyunECSInstanceForEMRRole角色。单击操作列的新增授权,进入新增授权页面。
在权限策略中,查询并勾选AliyunDLFFullAccess,单击确认新增授权。
授予AliyunECSInstanceForEMRRole角色DLF权限。
登录数据湖构建控制台。
在Catalog列表页面,单击Catalog名称,进入Catalog详情页。
单击权限页签,单击授权。
在授权页面,配置以下信息,单击确定。
用户/角色:选择RAM用户/RAM角色。
选择授权对象:在下拉列表中选择AliyunECSInstanceForEMRRole。
说明如果用户下拉列表中未找到AliyunECSInstanceForEMRRole,可以在用户管理页面单击同步。
预置权限类型:选择Data Editor。
升级EMR集群Paimon依赖
请到Maven仓库下载1.1+ 版本的两个JAR:paimon-jindo-*.jar, paimon-spark-3.x-*.jar,并根据EMR集群的Spark版本选取对应依赖。
导入Paimon依赖。
将依赖的两个JAR包
paimon-jindo-*.jar
,paimon-spark-3.x-*.jar
上传至OSS,并设置文件读写权限为公共读。请参见简单上传。将以下脚本修改后上传至OSS。
#!/bin/bash echo 'clean up paimon-dlf-2.5 exists file' rm -rf /opt/apps/PAIMON/paimon-dlf-2.5 rm -rf /opt/apps/PAIMON/paimon-dlf-2.5.tar.gz.* cd /opt/apps/PAIMON/paimon-current/lib/spark3 mkdir -p /opt/apps/PAIMON/paimon-dlf-2.5/lib/spark3 cd /opt/apps/PAIMON/paimon-dlf-2.5/lib/spark3 wget ${paimon-jindo-1.1.0.jar} wget ${paimon-spark-3.x-1.1.0.jar} echo 'link paimon-current to paimon-dlf-2.5' rm -f /opt/apps/PAIMON/paimon-current ln -sf /opt/apps/PAIMON/paimon-dlf-2.5 /opt/apps/PAIMON/paimon-current
重要脚本中的占位符
${paimon-jindo-1.1.0.jar}
和${paimon-spark-3.x-1.1.0.jar}
需替换为对应的OSS可下载路径,例如:https://{bucket}.oss-cn-hangzhou-internal.aliyuncs.com/jars/paimon-jindo-1.1.0.jar
。
通过EMR集群引导脚本执行。详情请参见手动执行脚本。
在EMR集群中,选择
页签,单击创建并执行。在弹出的对话框中,配置以下信息,单击确定。
名称:自定义脚本名称。
脚本位置:选择上传到OSS的升级脚本。脚本路径格式必须是oss://**/*.sh格式。
执行范围:选择集群。
执行完成后,需重启Spark服务以生效。
Spark读写数据
连接Paimon Catalog
在Terminal中执行以下spark-sql命令。
需替换命令中的${regionID}为实际region,如cn-hangzhou;${catalog}替换成在DLF中创建好的catalog名称。
spark-sql --master yarn \
--conf spark.driver.memory=5g \
--conf spark.sql.defaultCatalog=paimon \
--conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon.metastore=rest \
--conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions \
--conf spark.sql.catalog.paimon.uri=http://${regionID}-vpc.dlf.aliyuncs.com \
--conf spark.sql.catalog.paimon.warehouse=${catalog} \
--conf spark.sql.catalog.paimon.token.provider=dlf \
--conf spark.sql.catalog.paimon.dlf.token-loader=ecs
创建数据表
执行以下SQL,创建数据表。
CREATE TABLE users_samples
(
user_id INT,
age_level STRING,
final_gender_code STRING,
clk BOOLEAN
);
CREATE TABLE user_samples_di (
user_id INT,
age_level STRING,
final_gender_code STRING,
clk BOOLEAN
)
USING CSV
OPTIONS(
'path'='oss://${bucket}/user/user_samples_di'
);
不指定数据库时,创建数据表会默认建在Catalog下的
default
数据库中,也可创建并指定其他数据库。di的含义是day incremental,代表user_samples的每日增量数据。
目录
/user/user_samples_di
需要提前在OSS中创建。
插入数据
运行以下SQL,插入数据。
INSERT INTO users_samples VALUES
(1, '25-34', 'M', true),
(2, '18-24', 'F', false);
INSERT INTO user_samples_di VALUES
(1, '2-25-34', 'M', true),
(2, '2-18-24', 'F', false),
(3, '2-35-44', 'M', true);
查询数据
运行以下SQL,查询数据。
SELECT * FROM users_samples;
SELECT * FROM user_samples_di;
结果如下。
合并数据
使用上述user_samples_di表merge into到users_samples:
MERGE INTO users_samples
USING user_samples_di
ON users_samples.user_id = user_samples_di.user_id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *;
此时,users_samples
表中的数据将被 user_samples_di
表中具有相同 user_id
的数据覆盖,同时会插入 user_samples
表中没有 user_id
的新数据。
查看DLF元数据
当运行成功之后,您可在数据湖构建控制台看到新增的库、表元数据信息。其中,test
是本例中的示例Catalog名称。