本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
在Spark中,我们提及的JSON文件是换行符分隔的JSON,每行必须包含一个单独的,独立有效的JSON对象。
前提条件
通过主账号登录阿里云 Databricks控制台。
已创建集群,具体请参见创建集群。
已使用OSS管理控制台创建非系统目录存储空间,详情请参见创建存储空间。
创建集群并通过knox账号访问Notebook。
首次使用DDI产品创建的Bucket为系统目录Bucket,不建议存放数据,您需要再创建一个Bucket来读写数据。
BucketName为您的存储空间名称。
Object为上传到OSS上的文件的访问路径。
例:读取在存储空间名称为databricks-demo-hangzhou文件路径为demo/The_Sorrows_of_Young_Werther.txt的文件
// 从oss地址读取文本文档
val dataRDD = sc.textFile("oss://databricks-demo-hangzhou/demo/The_Sorrows_of_Young_Werther.txt"
JSON读取程序中的可选项
read/write | Key | 取值范围 | 默认值 | 说明 |
Both | Compression或code | None,uncompressed,bzip2,defalte,gzip,lz4,snappy | none | 声明Spark应该使用什么压缩编解码来读取或写入文件 |
Both | dateFormat | 任何符合Java SimpleDateFormat格式的字符串或字符 | yyyy-MM-dd | 为日期类型的列声明日期格式 |
Both | timestampFormat | 任何符合Java SimpleDateFormat格式的字符串或字符 | yyyy-MM-dd'T'HH:mm:ss:SSSZZ | 为时间戳类型的列声明时间格式 |
Read | primitiveAsString | true,false | false | 将所有原始值推断为字符串类型 |
Read | allowComments | true,false | false | 忽略JSON记录中的Java/C++样式注解 |
Read | allowUnquoteFieldNames | true,false | false | 允许不带引号的JSON字段名 |
Read | allowSingleQuotes | true,false | true | 除双引号外还允许使用单引号 |
Read | allowNumericLeadingZeros | true,false | false | 允许数字中存在前导零(例如,00012) |
Read | allowBackslashEscAPIngAny | true,false | false | 允许反斜杠机制接受所有字符 |
Read | columnNameOfCorruptRecord | Any String | Spark.sql.column&NameOfCorruptRecord | 允许重命名,permissive模式下添加的新字段,会覆盖重写 |
Read | multiLine | true,false | false | 允许读取非换行符分隔的JSON文件 |
实例
本实例展示了如何使用notebook读取JSON文件的多种方式。
%spark
spark.read.format("json")
实例数据
{"string":"string1","int":1,"array":[1,2,3],"dict": {"key": "value1"}}
{"string":"string2","int":2,"array":[2,4,6],"dict": {"key": "value2"}}
{"string":"string3","int":3,"array":[3,6,9],"dict": {"key": "value3", "extra_key": "extra_value3"}}
数据读取
%spark
val path="oss://databricks-data-source/datas/example.json"
val data_json=spark.read .format("json").load(path)
data_json.show()
data_json.printSchema()
2. 添加mode,inferSchema选项
mode = FAILFAST
inferSchema = true
%spark
val path="oss://databricks-data-source/datas/example.json"
val data_json= spark.read.format("json")
.option("header", "true")
.option("mode","FAILFAST")
.option("inferSchema","true")
.load(path)
data_json.show()
data_json.printSchema()
3. multiLine 选项
多个对象组成的一条JSON时的数据源
{"1":{"string":"string1","int":1,"array":[1,2,3],"dict": {"key": "value1"}},
"2":{"string":"string2","int":2,"array":[2,4,6],"dict": {"key": "value2"}},
"3":{"string":"string3","int":3,"array":[3,6,9],"dict": {"key": "value3", "extra_key": "extra_value3"}}}
实例代码
%spark
val path="oss://databricks-data-source/datas/json_test_oneLine.json"
val data_json= spark.read.format("json")
.option("header", "true")
.option("inferSchema","true")
.option("mode", "DROPMALFORMED") //遇到解析不了,放弃该记录
.option("multiLine", "true") //允许读取非换行符分割的JSON文件
.load(path)
data_json.show(false)
SQL格式
%sql
CREATE TEMPORARY VIEW multiLineJsonTable
USING json
OPTIONS (path="oss://databricks-data-source/datas/example.json")
%sql
select * from multiLineJsonTable limit 3