OSS

本文介绍如何通过Serverless Spark访问OSS数据源。您需要先配置访问OSS的权限,然后可以使用SQL的方式或者提交代码包(Python或者Jar包)的方式访问OSS。

重要

云原生数据湖分析(DLA)产品已退市,云原生数据仓库 AnalyticDB MySQL 版湖仓版支持DLA已有功能,并提供更多的功能和更好的性能。AnalyticDB for MySQL相关使用文档,请参见访问OSS数据源

操作步骤

  1. 配置DLA访问OSS的权限。

    • 如果您使用的是阿里云主账号访问OSS,则默认您拥有该账号下所有OSS数据以及DLA OSS表的访问权限,无需配置,可直接使用。

    • 如果您使用RAM子账号访问OSS并提交代码包作业,需要配置代码包和Spark代码访问OSS的权限。具体操作请参见细粒度配置RAM子账号权限

    • 如果您使用Spark SQL访问DLA OSS表的数据,需要确保您的RAM子账号关联了DLA账号,并且DLA账号拥有对应表的访问权限。如果您的RAM子账号未关联DLA账号,请进行关联操作,具体操作请参见DLA子账号绑定RAM账号。DLA账号对于表的访问权限,您可以登录DLA控制台,在Serverless Presto > SQL执行页面,使用GRANTREVOKE语法进行操作。

  2. 配置Spark OSS Connector。

    配置了OSS访问权限之后,您就可以使用Spark来访问OSS数据了。在Spark的作业配置文件中,您需要添加配置项“spark.dla.connectors” : “oss”。DLA平台内置了Spark OSS Connector相关的实现,默认不生效,需要配置该参数令其生效。如果您有Spark OSS Connector的其他实现方式,您不需要配置该参数,您只需提交您自己的实现Jar包,并添加相应的配置即可。

  3. 访问OSS数据。

    您可以通过以下两种方式访问OSS数据:

    • 通过提交Spark SQL语句的方式来访问OSS数据,具体操作请参见Spark SQL。作业示例配置如下所示:

      {
          "sqls": [
              "select * from `1k_tables`.`table0` limit 100",
              "insert into `1k_tables`.`table0` values(1, 'test')"
          ],
          "name": "sql oss test",
          "conf": {
              "spark.dla.connectors": "oss",
              "spark.driver.resourceSpec": "small",
              "spark.sql.hive.metastore.version": "dla",
              "spark.executor.instances": 10,
              "spark.dla.job.log.oss.uri": "oss://test/spark-logs",
              "spark.executor.resourceSpec": "small"
          }
      }
    • 通过Java、Scala、Python代码访问OSS数据。下面以Scala为例进行说明:

      {  
        "args": ["oss://${oss-buck-name}/data/test/test.csv"],
        "name": "spark-oss-test",
        "file": "oss://${oss-buck-name}/jars/test/spark-examples-0.0.1-SNAPSHOT.jar",
        "className": "com.aliyun.spark.oss.SparkReadOss",
        "conf": {
          "spark.driver.resourceSpec": "medium",
          "spark.executor.resourceSpec": "medium",
          "spark.executor.instances": 2,
          "spark.dla.connectors": "oss"
        }
      }
      说明

      SparkReadOss对应的源码可以参考DLA Spark OSS demo

启用OSS数据写入性能优化

当您使用自建HiveMetaStore或者DLA元数据服务访问OSS时,社区版Spark HiveClient的rename操作比较低效,DLA对此进行了优化。您只需要将参数spark.sql.hive.dla.metastoreV2.enable设置为true即可启用这项优化。示例如下:

{
  "args": ["oss://${oss-buck-name}/data/test/test.csv"],
  "name": "spark-oss-test",
  "file": "oss://${oss-buck-name}/jars/test/spark-examples-0.0.1-SNAPSHOT.jar",
  "className": "com.aliyun.spark.oss.WriteParquetFile",
  "conf": {
    "spark.driver.resourceSpec": "medium",
    "spark.executor.resourceSpec": "medium",
    "spark.executor.instances": 2,
    "spark.dla.connectors": "oss",
    "spark.sql.hive.dla.metastoreV2.enable": "true"
  }
}

OSS Connector数据写入性能优化

OSS Connector数据写入性能优化功能是DLA Spark团队基于OSS分片上传功能,针对Spark写入数据到OSS过程中大量调用OSS API导致写入性能差的问题,实现的性能优化提升。在典型场景下,性能可提升1~3倍。

您需要启用DLA Spark内置的OSS connector,并开启性能优化开关,才能使用该功能。具体配置如下:

spark.dla.connectors = oss;  //启用DLA Spark内置的OSS connector。
spark.hadoop.job.oss.fileoutputcommitter.enable = true;  //开启性能优化开关。
说明
  • 如果启用该性能优化功能,在作业被强制Kill等情况下,可能会产生一些没有被清理的文件碎片,占用您OSS的存储空间。建议对相关OSS Bucket设置碎片生命周期规则,对过期未合并的碎片自动进行清理,建议配置周期为3天以上。具体操作请参见设置生命周期规则

  • 该性能优化功能对RDD的saveAsHadoop前缀和saveAsNewAPIHadoop前缀的方法不生效。

使用示例:

{
  "args": ["oss://${oss-buck-name}/data/test/test.csv"],
  "name": "spark-oss-test",
  "file": "oss://${oss-buck-name}/jars/test/spark-examples-0.0.1-SNAPSHOT.jar",
  "className": "com.aliyun.spark.oss.WriteParquetFile",
  "conf": {
    "spark.driver.resourceSpec": "medium",
    "spark.executor.resourceSpec": "medium",
    "spark.executor.instances": 2,
    "spark.dla.connectors": "oss",
    "spark.hadoop.job.oss.fileoutputcommitter.enable": true
  }
}