通过集成Spark与OSS,阿里云EMR实现了对云端数据湖的高效处理与分析。EMR支持通过免AccessKey和显式AccessKey两种方式读写OSS数据。本文主要介绍Spark如何处理和分析OSS中的数据。
免AccessKey方式读写OSS
通过Spark RDD读写OSS数据
本示例为您展示,Spark如何以免AccessKey方式读取OSS中数据,并将处理完的数据写回至OSS。
通过SSH方式连接集群的Master节点,具体操作请参见登录集群Master节点。
执行以下命令,启动Spark Shell。
spark-shell
根据实际情况修改下面代码中的参数后,在Spark Shell中运行以下Scala代码读写OSS数据。
import org.apache.spark.{SparkConf, SparkContext} val conf = new SparkConf().setAppName("Test OSS") val sc = new SparkContext(conf) val pathIn = "oss://<yourBucket>/path/to/read" val inputData = sc.textFile(pathIn) val cnt = inputData.count println(s"count: $cnt") val outputPath = "oss://<yourBucket>/path/to/write" val outputData = inputData.map(e => s"$e has been processed.") outputData.saveAsTextFile(outputPath)
本文示例中以下参数请根据您的实际情况替换:
yourBucket
:OSS Bucket的名称。pathIn
:要读取文件的路径。outputPath
:要写入的文件路径。
完整示例代码,请参见Spark对接OSS。
通过PySpark读写OSS数据
本示例为您展示,PySpark如何以免AccessKey方式读取OSS中数据,并将处理完的数据写回至OSS。
通过SSH方式连接集群的Master节点,具体操作请参见登录集群Master节点。
执行以下命令,进入PySpark交互式环境。
pyspark
根据实际情况修改下面代码中的参数后,在PySpark环境中运行代码。
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("Python Spark SQL OSS example").getOrCreate() pathIn = "oss://<yourBucket>/path/to/read" df = spark.read.text(pathIn) cnt = df.count() print(cnt) outputPath = "oss://<yourBucket>/path/to/write" df.write.format("parquet").mode('overwrite').save(outputPath)
本文示例中以下参数请根据您的实际情况替换:
yourBucket
:OSS Bucket的名称。pathIn
:要读取文件的路径。outputPath
:要写入的文件路径。
通过Spark-SQL创建CSV表写入OSS
本示例为您展示Spark-SQL如何创建CSV表,并存放至OSS,操作步骤如下。
通过SSH方式连接集群的Master节点,具体操作请参见登录集群Master节点。
执行以下命令,进入Spark SQL命令行。
spark-sql
根据实际情况修改下面代码中的参数后,在Spark SQL环境中运行代码。
CREATE DATABASE test_db LOCATION "oss://<yourBucket>/test_db"; USE test_db; CREATE TABLE student (id INT, name STRING, age INT) USING CSV options ("delimiter"=";", "header"="true"); INSERT INTO student VALUES(1, "ab", 12); SELECT * FROM student;
命令中涉及的参数如下:
yourBucket
:OSS Bucket的名称。delimiter
:指定CSV文件中数据的分隔符。header
:指定CSV文件中第一行是否是表头,可设置true
表示是,false
表示否。
执行完以上命令,返回以下信息。
1 ab 12
查看存储在OSS上的CSV文件。
CSV文件的第一行包含表头,并使用分号(;)作为字段的分隔符。示例内容如下。
id;name;age 1;ab;12
显式AccessKey方式读写OSS
本示例将为您展示集群如何显式指定访问OSS的AccessKey,操作步骤如下。
删除免密配置。
EMR集群默认是通过免密配置访问OSS。如果要取消免密访问,您需要删除Hadoop-Common服务core-site.xml中的fs.oss.credentials.provider配置项。
执行以下命令验证是否可免密访问。
hadoop fs -ls oss://<yourBucket>/test_db
返回以下信息,发现无法访问OSS。
ls: ERROR: not found login secrets, please configure the accessKeyId and accessKeySecret.
显式指定访问OSS的AccessKey。
去掉免密配置后,使用AccessKey访问,您需要在Hadoop-Common服务的core-site.xml中增加下面AccessKey的配置项。
Key
Value
描述
Key
Value
描述
fs.oss.accessKeyId
yourAccessKeyID
阿里云账号或RAM用户的AccessKey ID。
fs.oss.accessKeySecret
yourAccessKeySecret
阿里云账号或RAM用户的AccessKey Secret。
显式指定访问OSS的AccessKey后,执行以下命令验证是否生效。
hadoop fs -ls oss://<yourBucket>/test_db
返回以下信息,则能够查看OSS文件目录。
drwxrwxrwx - root root 0 2025-02-24 11:45 oss://<yourBucket>/test_db/student
重启Spark相关服务,待Spark相关服务正常后即可通过Spark RDD、PySpark、Spark-SQL等方式读写OSS数据。
常见问题
- 本页导读 (1)
- 免AccessKey方式读写OSS
- 通过Spark RDD读写OSS数据
- 通过PySpark读写OSS数据
- 通过Spark-SQL创建CSV表写入OSS
- 显式AccessKey方式读写OSS
- 常见问题