Spark对接OSS提供了高效、灵活的数据处理和分析方式,将大数据处理和云存储相结合。本文介绍Spark如何处理和分析OSS中的数据。
背景信息
当前E-MapReduce:
支持MetaService服务。
支持通过免AccessKey方式访问OSS数据源。
支持通过显式写AccessKey和Endpoint方式访问OSS数据源。
说明OSS Endpoint需使用内网域名。域名详情信息,请参见OSS Endpoint。
Spark RDD接入OSS示例
本示例为您展示,Spark如何以免AccessKey方式读取OSS中数据,并将处理完的数据写回至OSS。
在spark-shell
中执行如下代码。
val conf = new SparkConf().setAppName("Test OSS")
val sc = new SparkContext(conf)
val pathIn = "oss://bucket/path/to/read"
val inputData = sc.textFile(pathIn)
val cnt = inputData.count
println(s"count: $cnt")
val outputPath = "oss://bucket/path/to/write"
val outpuData = inputData.map(e => s"$e has been processed.")
outpuData.saveAsTextFile(outputPath)
完整示例代码,请参见Spark对接OSS。
本文示例中以下参数请根据您的实际情况替换:
pathIn
:定义了要读取的文件路径。outputPath
:定义了要写入的文件路径。
PySpark接入OSS示例
本示例为您展示,PySpark如何以免AccessKey方式读取OSS中数据,并将处理完的数据写回至OSS。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Python Spark SQL OSS example").getOrCreate()
pathIn = "oss://bucket/path/to/read"
df = spark.read.text(pathIn)
cnt = df.count()
print(cnt)
outputPath = "oss://bucket/path/to/write"
df.write.format("parquet").mode('overwrite').save(outputPath)
Spark-SQL创建CSV表示例
本示例为您展示Spark-SQL如何创建CSV表,并存放至OSS,操作步骤如下。
在
spark-sql
中执行如下命令。CREATE DATABASE test_db LOCATION "oss://test_bucket/test_db"; USE test_db; CREATE TABLE student (id INT, name STRING, age INT) USING CSV options ("delimiter"=";", "header"="true"); INSERT INTO student VALUES(1, "ab", 12); SELECT * FROM student;
执行完以上命令,返回以下信息。
1 ab 12
命令中涉及参数如下:
test_bucket
:OSS Bucket的名称。delimiter
:指定CSV文件中数据的分隔符。header
:指定CSV文件中第一行是否是表头,可设置true
表示是,false
表示否。
查看存储在OSS上的CSV文件。
CSV文件的第一行包含表头,并使用分号(;)作为字段的分隔符。示例内容如下。
id;name;age 1;ab;12
集群显示指定访问OSS的AccessKey
本示例将为您展示集群如何显示指定访问OSS的AccessKey,操作步骤如下。
更改免密配置。
EMR集群访问OSS是免密配置。您可以通过HDFS组件core-site.xml中的fs.oss.credentials.provider配置项控制免密访问。
如果想去掉免密,使用AccessKey访问,需要把fs.oss.credentials.provider参数去掉,同时在core-site.xml中加上下面AccessKey的参数。
<property> <name>fs.oss.accessKeyId</name> <value>LTAI5tM85Z4sc****</value> </property> <property> <name>fs.oss.accessKeySecret</name> <value>HF7P1L8PS6Eqf****</value> </property>
执行以下命令验证是否生效。
hadoop fs -ls oss://test_bucket/test_db
返回以下信息,发现无法访问OSS。
ls: ERROR: without login secrets configured.
加上AccessKey配置之后,重新执行命令
hadoop fs -ls oss://test_bucket/test_db
,则能够查看OSS文件目录。drwxrwxrwx - root root 0 2022-11-30 12:51 oss://test_bucket/test_db/student
重启Spark相关服务,查看Spark相关服务是否正常。