本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
ORC是为Hadoop作业而设计的自描述,类型感知的列存储文件格式。它针对大型流式数据读取进行了优化,但集成了对快速查询所需要行的相关支持。ORC和Parquet文件格式的区别:本质上Parquet针对Spark进行了优化,而ORC针对Hive进行的优化。
前提条件
通过主账号登录阿里云 Databricks控制台。
已创建集群,具体请参见创建集群。
已使用OSS管理控制台创建非系统目录存储空间,详情请参见创建存储空间。
创建集群并通过knox账号访问Notebook。
警告
首次使用DDI产品创建的Bucket为系统目录Bucket,不建议存放数据,您需要再创建一个Bucket来读写数据。
说明
DDI访问OSS路径结构:oss://BucketName/Object
BucketName为您的存储空间名称。
Object为上传到OSS上的文件的访问路径。
例:读取在存储空间名称为databricks-demo-hangzhou文件路径为demo/The_Sorrows_of_Young_Werther.txt的文件
// 从oss地址读取文本文档
val dataRDD = sc.textFile("oss://databricks-demo-hangzhou/demo/The_Sorrows_of_Young_Werther.txt"
实例
1.写入ORC数据到OSS 并读取数据
%spark
val inputPath="oss://databricks-fjl-test/datas/orc_data"
case class MyCaseClass(key: String, group: String, value: Int, someints: Seq[Int], somemap: Map[String, Int])
val dataframe = sc.parallelize(Array(MyCaseClass("a", "vowels", 1, Array(1), Map("a" -> 1)),
MyCaseClass("b", "consonants", 2, Array(2, 2), Map("b" -> 2)),
MyCaseClass("c", "consonants", 3, Array(3, 3, 3), Map("c" -> 3)),
MyCaseClass("d", "consonants", 4, Array(4, 4, 4, 4), Map("d" -> 4)),
MyCaseClass("e", "vowels", 5, Array(5, 5, 5, 5, 5), Map("e" -> 5)))
).toDF()
dataframe.coalesce(1).write.mode("overwrite").orc(inputPath)
2.读取orc数据
%spark
val inputPath="oss://databricks-data-source/datas/orc_data"
val dataDF= spark.read.format("orc")
.option("header", "true")
.option("inferSchema","true")
.option("mode", "FAILFAST")
.load(inputPath)
dataDF.show(3)
dataDF.printSchema()
SQL 方式
%sql
CREATE TABLE orcTable
USING orc
OPTIONS (path "oss://databricks-data-source/datas/orc_data")
%sql
select * from orcTable limit 3
3.写入ORC数据到OSS
dataDF.write.format("orc").mode("overwrite").save("oss://databricks-data-source/datas/out/orc")
文档内容是否对您有帮助?