Spark对接OSS

更新时间:2025-03-27 06:13:08

通过集成SparkOSS,阿里云EMR实现了对云端数据湖的高效处理与分析。EMR支持通过免AccessKey和显式AccessKey两种方式读写OSS数据。本文主要介绍Spark如何处理和分析OSS中的数据。

AccessKey方式读写OSS

通过Spark RDD读写OSS数据

本示例为您展示,Spark如何以免AccessKey方式读取OSS中数据,并将处理完的数据写回至OSS。

  1. 通过SSH方式连接集群的Master节点,具体操作请参见登录集群Master节点

  2. 执行以下命令,启动Spark Shell。

    spark-shell
  3. 根据实际情况修改下面代码中的参数后,在Spark Shell中运行以下Scala代码读写OSS数据。

    import org.apache.spark.{SparkConf, SparkContext}
    val conf = new SparkConf().setAppName("Test OSS")
    val sc = new SparkContext(conf)
    val pathIn = "oss://<yourBucket>/path/to/read"
    val inputData = sc.textFile(pathIn)
    val cnt = inputData.count
    println(s"count: $cnt")
    val outputPath = "oss://<yourBucket>/path/to/write"
    val outputData = inputData.map(e => s"$e has been processed.")
    outputData.saveAsTextFile(outputPath)

    本文示例中以下参数请根据您的实际情况替换:

    • yourBucket:OSS Bucket的名称。

    • pathIn:要读取文件的路径。

    • outputPath:要写入的文件路径。

    完整示例代码,请参见Spark对接OSS

通过PySpark读写OSS数据

本示例为您展示,PySpark如何以免AccessKey方式读取OSS中数据,并将处理完的数据写回至OSS。

  1. 通过SSH方式连接集群的Master节点,具体操作请参见登录集群Master节点

  2. 执行以下命令,进入PySpark交互式环境。

    pyspark
  3. 根据实际情况修改下面代码中的参数后,在PySpark环境中运行代码。

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("Python Spark SQL OSS example").getOrCreate()
    pathIn = "oss://<yourBucket>/path/to/read"
    df = spark.read.text(pathIn)
    cnt = df.count()
    print(cnt)
    outputPath = "oss://<yourBucket>/path/to/write"
    df.write.format("parquet").mode('overwrite').save(outputPath)
    

    本文示例中以下参数请根据您的实际情况替换:

    • yourBucket:OSS Bucket的名称。

    • pathIn:要读取文件的路径。

    • outputPath:要写入的文件路径。

通过Spark-SQL创建CSV表写入OSS

本示例为您展示Spark-SQL如何创建CSV表,并存放至OSS,操作步骤如下。

  1. 通过SSH方式连接集群的Master节点,具体操作请参见登录集群Master节点

  2. 执行以下命令,进入Spark SQL命令行。

    spark-sql
  3. 根据实际情况修改下面代码中的参数后,在Spark SQL环境中运行代码。

    CREATE DATABASE test_db LOCATION "oss://<yourBucket>/test_db";
    USE test_db;
    CREATE TABLE student (id INT, name STRING, age INT)
        USING CSV options ("delimiter"=";",  "header"="true");
    INSERT INTO student VALUES(1, "ab", 12);
    SELECT * FROM student;     

    命令中涉及的参数如下:

    • yourBucket:OSS Bucket的名称。

    • delimiter:指定CSV文件中数据的分隔符。

    • header:指定CSV文件中第一行是否是表头,可设置true表示是,false表示否。

    执行完以上命令,返回以下信息。

    1    ab    12     
  4. 查看存储在OSS上的CSV文件。

    CSV文件的第一行包含表头,并使用分号(;)作为字段的分隔符。示例内容如下。

    id;name;age
    1;ab;12

显式AccessKey方式读写OSS

本示例将为您展示集群如何显式指定访问OSSAccessKey,操作步骤如下。

  1. 删除免密配置。

    EMR集群默认是通过免密配置访问OSS。如果要取消免密访问,您需要删除Hadoop-Common服务core-site.xml中的fs.oss.credentials.provider配置项。

  2. 执行以下命令验证是否可免密访问。

    hadoop fs -ls oss://<yourBucket>/test_db

    返回以下信息,发现无法访问OSS。

    ls: ERROR: not found login secrets, please configure the accessKeyId and accessKeySecret.
  3. 显式指定访问OSSAccessKey。

    去掉免密配置后,使用AccessKey访问,您需要在Hadoop-Common服务的core-site.xml中增加下面AccessKey的配置项。

    Key

    Value

    描述

    Key

    Value

    描述

    fs.oss.accessKeyId

    yourAccessKeyID

    阿里云账号或RAM用户的AccessKey ID。

    fs.oss.accessKeySecret

    yourAccessKeySecret

    阿里云账号或RAM用户的AccessKey Secret。

  4. 显式指定访问OSSAccessKey后,执行以下命令验证是否生效。

    hadoop fs -ls oss://<yourBucket>/test_db

    返回以下信息,则能够查看OSS文件目录。

    drwxrwxrwx   - root root          0 2025-02-24 11:45 oss://<yourBucket>/test_db/student
  5. 重启Spark相关服务,待Spark相关服务正常后即可通过Spark RDD、PySpark、Spark-SQL等方式读写OSS数据。

常见问题

读取和写入的OSS Bucket不同,要如何通过Spark读写OSS?

  • 如果读取和写入的Bucket要使用不同的credential,您可以配置Bucket级别的Credential Provider,例如,您可以配置fs.oss.bucket.<BucketName>.credentials.provider,其中<BucketName>为要配置的OSS Bucket名称。具体可参考Bucket配置OSS/OSS-HDFS Credential Provider

  • 如果读取和写入的Bucket属于不同的Region,您可以通过oss://<BucketName>.<外网访问Endpoint>/方式实现Spark读写OSS。该方式会产生流量费用,同时会存在稳定性问题,使用时需注意。

Amazon S3 SDK如何访问OSS?

对象存储OSS提供了兼容Amazon S3API。当您将数据从Amazon S3迁移到OSS后,只需简单的配置修改,即可让您的客户端应用兼容OSS服务。具体请参见使用Amazon S3 SDK访问OSS

  • 本页导读 (1)
  • 免AccessKey方式读写OSS
  • 通过Spark RDD读写OSS数据
  • 通过PySpark读写OSS数据
  • 通过Spark-SQL创建CSV表写入OSS
  • 显式AccessKey方式读写OSS
  • 常见问题
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等