Parquet(推荐)

重要

本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。

Parquet是一种开源的面向列的数据存储格式,它提供了各种存储优化,尤其适合数据分析。Parquet提供列压缩从而可以节省空间,而且它支持按列读取而非整个文件的读取。作为一种文件格式,Parquet与Apache Spark配合的很好,而且实际上也是Spark的默认文件格式。我们建议将数据写到Parquet以方便长期存储,因为从Parquet文件读取始终比从JSON或者CSV文件效率更高。

前提条件

警告

首次使用DDI产品创建的Bucket为系统目录Bucket,不建议存放数据,您需要再创建一个Bucket来读写数据。

说明

DDI访问OSS路径结构:oss://BucketName/Object

  • BucketName为您的存储空间名称。

  • Object为上传到OSS上的文件的访问路径。

例:读取在存储空间名称为databricks-demo-hangzhou文件路径为demo/The_Sorrows_of_Young_Werther.txt的文件

// 从oss地址读取文本文档
val dataRDD = sc.textFile("oss://databricks-demo-hangzhou/demo/The_Sorrows_of_Young_Werther.txt"

Parquet读取程序中的可选项

read/write

Key

取值范围

默认值

说明

Write

compression

None,Uncompressed,bzip2,defalte,gzip,lz4,snappy

None

声明Spark应该使用什么样的压缩编码器来读取或写入文件

Read

Merge Schema

true,false

配置值spark.sql.parquet.mergeSchema

增量地添加列到同一表/文件夹中的parquet文件里面

实例

1.写入Parquet文件到oss

%spark

val inputPath="oss://databricks-data-source/datas/parquet_data"
case class MyCaseClass(key: String, group: String, value: Int, someints: Seq[Int], somemap: Map[String, Int])
val dataframe = sc.parallelize(Array(MyCaseClass("a", "vowels", 1, Array(1), Map("a" -> 1)),
  MyCaseClass("b", "consonants", 2, Array(2, 2), Map("b" -> 2)),
  MyCaseClass("c", "consonants", 3, Array(3, 3, 3), Map("c" -> 3)),
  MyCaseClass("d", "consonants", 4, Array(4, 4, 4, 4), Map("d" -> 4)),
  MyCaseClass("e", "vowels", 5, Array(5, 5, 5, 5, 5), Map("e" -> 5)))
).toDF()
dataframe.show()
//写入数据到oss
dataframe.write.mode("overwrite").parquet(inputPath)

2.读取Parquet数据

%spark

val inputPath="oss://databricks-data-source/datas/parquet_data"
val dtDF = spark.read.format("parquet")
        .option("mode", "FAILFAST")
        .load(inputPath)
        
dtDF.show(3)
dtDF.printSchema()

dataSQL 方式

%sql 
CREATE TABLE parquetTable
USING parquet
OPTIONS (path "oss://databricks-data-source/datas/parquet_data")

SQL查询

%sql

select * from parquetTable limit 3
data