EMR on ECS Spark对接DLF Paimon Catalog

本文为您详细介绍了如何在EMR on ECS Spark环境中对接DLF Paimon Catalog,帮助您实现高效的数据湖查询与分析。

前提条件

已创建EMR集群,组件选择Spark3, Paimon。EMR版本 >= 5.12.0。如有其他版本诉求,请加入钉钉群(106575000021)联系DLF研发人员。

创建DLF Paimon Catalog

详情请参见DLF 快速入门

授予角色DLF权限

  1. 授予AliyunECSInstanceForEMRRole角色RAM权限(EMR产品化集成后可以省略该步骤)。

    1. 使用阿里云账号或RAM管理员登录RAM控制台

    2. 单击身份管理 > 角色,查询AliyunECSInstanceForEMRRole角色。

    3. 单击操作列的新增授权,进入新增授权页面。

    4. 权限策略中,查询并勾选AliyunDLFFullAccess,单击确认新增授权

    image

  2. 授予AliyunECSInstanceForEMRRole角色DLF权限。

    1. 登录数据湖构建控制台

    2. Catalog列表页面,单击Catalog名称,进入Catalog详情页。

    3. 单击权限页签,单击授权

    4. 在授权页面,配置以下信息,单击确定

      • 用户/角色:选择RAM用户/RAM角色

      • 选择授权对象:在下拉列表中选择AliyunECSInstanceForEMRRole

        说明

        如果用户下拉列表中未找到AliyunECSInstanceForEMRRole,可以在用户管理页面单击同步。

      • 预置权限类型:选择Data Editor。

升级EMR集群Paimon依赖

请到Maven仓库下载1.1+ 版本的两个JAR:paimon-jindo-*.jar, paimon-spark-3.x-*.jar,并根据EMR集群的Spark版本选取对应依赖。

  1. 导入Paimon依赖。

    1. 将依赖的两个JARpaimon-jindo-*.jar, paimon-spark-3.x-*.jar上传至OSS,并设置文件读写权限为公共读。请参见简单上传

    2. 将以下脚本修改后上传至OSS。

      #!/bin/bash
      
      echo 'clean up paimon-dlf-2.5 exists file'
      rm -rf /opt/apps/PAIMON/paimon-dlf-2.5
      rm -rf /opt/apps/PAIMON/paimon-dlf-2.5.tar.gz.*
      cd /opt/apps/PAIMON/paimon-current/lib/spark3
      mkdir -p /opt/apps/PAIMON/paimon-dlf-2.5/lib/spark3
      cd /opt/apps/PAIMON/paimon-dlf-2.5/lib/spark3
      wget ${paimon-jindo-1.1.0.jar}
      wget ${paimon-spark-3.x-1.1.0.jar}
      
      echo 'link paimon-current to paimon-dlf-2.5'
      rm -f /opt/apps/PAIMON/paimon-current
      ln -sf /opt/apps/PAIMON/paimon-dlf-2.5 /opt/apps/PAIMON/paimon-current
      重要

      脚本中的占位符 ${paimon-jindo-1.1.0.jar} 和 ${paimon-spark-3.x-1.1.0.jar} 需替换为对应的OSS可下载路径,例如:https://{bucket}.oss-cn-hangzhou-internal.aliyuncs.com/jars/paimon-jindo-1.1.0.jar

  2. 通过EMR集群引导脚本执行。详情请参见手动执行脚本

    1. EMR集群中,选择脚本操作 > 手动执行页签,单击创建并执行

    2. 在弹出的对话框中,配置以下信息,单击确定

      • 名称:自定义脚本名称。

      • 脚本位置:选择上传到OSS的升级脚本。脚本路径格式必须是oss://**/*.sh格式。

      • 执行范围:选择集群

  3. 执行完成后,需重启Spark服务以生效。

Spark读写数据

连接Paimon Catalog

Terminal中执行以下spark-sql命令。

重要

需替换命令中的${regionID}为实际region,如cn-hangzhou;${catalog}替换成在DLF中创建好的catalog名称。

spark-sql --master yarn \
  --conf spark.driver.memory=5g \
  --conf spark.sql.defaultCatalog=paimon \
  --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
  --conf spark.sql.catalog.paimon.metastore=rest \
  --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions \
  --conf spark.sql.catalog.paimon.uri=http://${regionID}-vpc.dlf.aliyuncs.com \
  --conf spark.sql.catalog.paimon.warehouse=${catalog} \
  --conf spark.sql.catalog.paimon.token.provider=dlf \
  --conf spark.sql.catalog.paimon.dlf.token-loader=ecs

创建数据表

执行以下SQL,创建数据表。

CREATE TABLE users_samples
(
    user_id INT,             
    age_level STRING,           
    final_gender_code STRING,    
    clk BOOLEAN
);

CREATE TABLE user_samples_di (
    user_id INT,             
    age_level STRING,           
    final_gender_code STRING,    
    clk BOOLEAN
)
USING CSV
OPTIONS(
'path'='oss://${bucket}/user/user_samples_di'
);
说明
  • 不指定数据库时,创建数据表会默认建在Catalog下的default数据库中,也可创建并指定其他数据库。

  • di的含义是day incremental,代表user_samples的每日增量数据。

  • 目录/user/user_samples_di需要提前在OSS中创建。

插入数据

运行以下SQL,插入数据。

INSERT INTO users_samples VALUES
(1, '25-34', 'M', true),
(2, '18-24', 'F', false);

INSERT INTO user_samples_di VALUES
(1, '2-25-34', 'M', true),
(2, '2-18-24', 'F', false),
(3, '2-35-44', 'M', true);

查询数据

运行以下SQL,查询数据。

SELECT * FROM users_samples;
SELECT * FROM user_samples_di;

结果如下。

imageimage

合并数据

使用上述user_samples_dimerge intousers_samples:

MERGE INTO users_samples
USING user_samples_di
ON users_samples.user_id = user_samples_di.user_id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *;

此时,users_samples 表中的数据将被 user_samples_di 表中具有相同 user_id 的数据覆盖,同时会插入 user_samples 表中没有 user_id 的新数据。

image

查看DLF元数据

当运行成功之后,您可在数据湖构建控制台看到新增的库、表元数据信息。其中,test是本例中的示例Catalog名称。

image