本文介绍Spark如何读取OSS中的数据。

背景信息

当前E-MapReduce:
  • 支持MetaService服务。
  • 支持通过免AccessKey方式访问OSS数据源。
  • 支持通过显式写AccessKey和Endpoint方式访问OSS数据源。
    说明 OSS Endpoint需使用内网域名。域名详情信息,请参见OSS Endpoint

Spark RDD接入OSS示例

本示例为您展示,Spark如何以免AccessKey方式读取OSS中数据,并将处理完的数据写回至OSS。
val conf = new SparkConf().setAppName("Test OSS")
val sc = new SparkContext(conf)
val pathIn = "oss://bucket/path/to/read"
val inputData = sc.textFile(pathIn)
val cnt = inputData.count
println(s"count: $cnt")
val outputPath = "oss://bucket/path/to/write"
val outpuData = inputData.map(e => s"$e has been processed.")
outpuData.saveAsTextFile(outputPath)

完整示例代码,请参见Spark对接OSS

PySpark接入OSS示例

本示例为您展示,PySpark如何以免AccessKey方式读取OSS中数据,并将处理完的数据写回至OSS。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Python Spark SQL OSS example").getOrCreate()
pathIn = "oss://bucket/path/to/read"
df = spark.read.text(pathIn)
cnt = df.count()
print(cnt)
outputPath = "oss://bucket/path/to/write"
df.write.format("parquet").mode('overwrite').save(outputPath)

Spark-SQL创建CSV表示例

本示例为您展示,Spark-SQL如何创建CSV表,并存放至OSS,操作步骤如下。
  1. 在Spark-SQL执行如下代码。
    create database test_db location "oss://test_bucket/test_db";
    use test_db;
    CREATE TABLE student (id INT, name STRING, age INT)
        USING CSV options ("delimiter"=";",  "header"="true");
    insert into student values(1,"ab",12);
    select * from student;
    1    ab    12                
    • bucket_test:是测试使用的OSS BucketName。
    • delimiter:指定CSV文本中分隔符是。
    • header:指定CSV中第一行是否是表头,true表示是,false表示否。
  2. 查看OSS上存储的CSV文件,可以看到如下内容,第一行存放了表头。
    id;name;age
    1;ab;12

集群显示指定访问OSS的AccessKey

本示例将为您展示集群如何显示指定访问OSS的AccessKey,操作步骤如下。
  1. 更改免密配置。

    EMR集群访问OSS是免密配置。您可以通过HDFS组件core-site.xml中的fs.oss.credentials.provider配置项控制免密访问。

    如果想去掉免密,使用AccessKey访问,需要把fs.oss.credentials.provider参数去掉,同时在core-site.xml中加上下面的参数。
    <property>
      <name>fs.oss.accessKeyId</name>
      <value>LTAI5tM85Z4sc****</value>
    </property>
    <property>
      <name>fs.oss.accessKeySecret</name>
      <value>HF7P1L8PS6Eqf****</value>
    </property>
  2. 执行以下命令验证是否生效。
    去掉fs.oss.credentials.provider配置后,通过ls命令,发现无法访问OSS。
    hadoop fs -ls oss://test_bucket/test_db
    ls: ERROR: without login secrets configured.
    加上AccessKey配置之后,能够查看OSS文件目录。
    hadoop fs -ls oss://test_bucket/test_db
    drwxrwxrwx   - root root          0 2022-11-30 12:51 oss://test_bucket/test_db/student
  3. 重启Spark相关服务,查看Spark相关服务是否正常。