本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
Parquet是一种开源的面向列的数据存储格式,它提供了各种存储优化,尤其适合数据分析。Parquet提供列压缩从而可以节省空间,而且它支持按列读取而非整个文件的读取。作为一种文件格式,Parquet与Apache Spark配合的很好,而且实际上也是Spark的默认文件格式。我们建议将数据写到Parquet以方便长期存储,因为从Parquet文件读取始终比从JSON或者CSV文件效率更高。
前提条件
通过主账号登录阿里云 Databricks控制台。
已创建集群,具体请参见创建集群。
已使用OSS管理控制台创建非系统目录存储空间,详情请参见创建存储空间。
创建集群并通过knox账号访问Notebook
首次使用DDI产品创建的Bucket为系统目录Bucket,不建议存放数据,您需要再创建一个Bucket来读写数据。
DDI访问OSS路径结构:oss://BucketName/Object
BucketName为您的存储空间名称。
Object为上传到OSS上的文件的访问路径。
例:读取在存储空间名称为databricks-demo-hangzhou文件路径为demo/The_Sorrows_of_Young_Werther.txt的文件
// 从oss地址读取文本文档
val dataRDD = sc.textFile("oss://databricks-demo-hangzhou/demo/The_Sorrows_of_Young_Werther.txt"
Parquet读取程序中的可选项
read/write | Key | 取值范围 | 默认值 | 说明 |
Write | compression | None,Uncompressed,bzip2,defalte,gzip,lz4,snappy | None | 声明Spark应该使用什么样的压缩编码器来读取或写入文件 |
Read | Merge Schema | true,false | 配置值spark.sql.parquet.mergeSchema | 增量地添加列到同一表/文件夹中的parquet文件里面 |
实例
1.写入Parquet文件到oss
%spark
val inputPath="oss://databricks-data-source/datas/parquet_data"
case class MyCaseClass(key: String, group: String, value: Int, someints: Seq[Int], somemap: Map[String, Int])
val dataframe = sc.parallelize(Array(MyCaseClass("a", "vowels", 1, Array(1), Map("a" -> 1)),
MyCaseClass("b", "consonants", 2, Array(2, 2), Map("b" -> 2)),
MyCaseClass("c", "consonants", 3, Array(3, 3, 3), Map("c" -> 3)),
MyCaseClass("d", "consonants", 4, Array(4, 4, 4, 4), Map("d" -> 4)),
MyCaseClass("e", "vowels", 5, Array(5, 5, 5, 5, 5), Map("e" -> 5)))
).toDF()
dataframe.show()
//写入数据到oss
dataframe.write.mode("overwrite").parquet(inputPath)
2.读取Parquet数据
%spark
val inputPath="oss://databricks-data-source/datas/parquet_data"
val dtDF = spark.read.format("parquet")
.option("mode", "FAILFAST")
.load(inputPath)
dtDF.show(3)
dtDF.printSchema()
SQL 方式
%sql
CREATE TABLE parquetTable
USING parquet
OPTIONS (path "oss://databricks-data-source/datas/parquet_data")
SQL查询
%sql
select * from parquetTable limit 3