全部产品
云市场

Spark对接OSS快速入门

更新时间:2019-10-29 10:59:09

简介

对象存储服务(Object Storage Service,OSS)是一种海量、安全、低成本、高可靠的云存储服务,适合存放任意类型的文件。容量和处理能力弹性扩展,多种存储类型供选择,全面优化存储成本。本文主要介绍通过Spark操作OSS数据的常见方式,代码以Scala为例。本文的代码可以通过“数据工作台”提交。

前置条件

  1. OSS已经创建bucket,假设名称为:test_spark
  2. 已创建具备读写OSS bucket:test_spark权限的用户。假设用户名为test_oss,访问OSS的AccessKeyID和AccessKeySecret分别为:accessId,accessKey。
  3. OSS的路径格式为:oss://${AccessKeyID}:${AccessKeySecret}@${bucketName}.${endPoint}/${ossKeyPath}。例如:oss://accessId:accessKey@test_spark.oss-cn-shenzhen-internal.aliyuncs.com/user/spark-table/test.csv

说明: OSS Endpoint 需使用内网域名,具体请参见OSS Endpoint

使用Spark读写OSS文件样例

假设有如下内容的文本数据已经存在OSS中,路径为:oss://accessId:accessKey@test_spark.oss-cn-shenzhen-internal.aliyuncs.com/user/spark-table/test.csv ,内容为:

  1. 101, name_101, 0.52
  2. 102, name_102, 0.78
  3. 103, name_103, 0.76
  4. 104, name_104, 0.78
  5. 105, name_105, 0.02
  6. 106, name_106, 0.29
  7. 107, name_107, 0.63
  8. 108, name_108, 0.20
  9. 109, name_109, 0.07
  10. 110, name_110, 0.33

通过Spark读取文件,常用两种方法

一、 使用DataFrame 读取,实例代码如下:

  1. val conf = new SparkConf().setAppName("spark sql test")
  2. val sparkSession = SparkSession
  3. .builder()
  4. .config(conf)
  5. .enableHiveSupport()
  6. .getOrCreate()
  7. val ossCsvPath = s"oss://accessId:accessKey@test_spark.oss-cn-shenzhen-internal.aliyuncs.com/user/spark-table/test.csv"
  8. //读取test.csv并生产DataFrame
  9. val fileDF = sparkSession.read.csv(ossCsvPath)
  10. //打印fileDF内容
  11. fileDF.show()
  12. //也可以把fileDF 注册是Spark表
  13. fileDF.createOrReplaceTempView(“test_table")
  14. sparkSession.sql("select * from test_table").show()

二、 创建Spark Sql表指向test.csv,实例代码如下:

  1. val sql =
  2. s"""create table test_table(
  3. | id int,
  4. | name string,
  5. | value float
  6. | ) row format delimited fields terminated by ','
  7. | location 'oss://accessId:accessKey@test_spark.oss-cn-shenzhen-internal.aliyuncs.com/user/spark-table/'
  8. | """.stripMargin
  9. //创建spark 表
  10. sparkSession.sql(sql)
  11. //查询表数据
  12. sparkSession.sql("select * from test_table").show()

通过Spark写文件,常用DataFrame写文件。

示例代码如下:

  1. val conf = new SparkConf().setAppName("spark sql test")
  2. val sparkSession = SparkSession
  3. .builder()
  4. .config(conf)
  5. .enableHiveSupport()
  6. .getOrCreate()
  7. val ossCsvPath = s"oss://accessId:accessKey@test_spark.oss-cn-shenzhen-internal.aliyuncs.com/user/spark-table/test.csv"
  8. //读取test.csv并生产DataFrame
  9. val fileDF = sparkSession.read.csv(ossCsvPath)
  10. //打印fileDF内容
  11. fileDF.show()
  12. val writeOssParquetPath = "oss://accessId:accessKey@test_spark.oss-cn-shenzhen-internal.aliyuncs.com/user/parquet-table/"
  13. //写parquet格式文件
  14. fileDF.write.parquet(writeOssParquetPath)
  15. val writeCsvParquetPath = "oss://accessId:accessKey@test_spark.oss-cn-shenzhen-internal.aliyuncs.com/user/csv-table/"
  16. //写csv格式文件
  17. fileDF.write.csv(writeCsvParquetPath)

小结

本文给出Spark操作OSS数据的基本用法,更多用法会陆续推出。