本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
CSV意即逗号分隔符(comma-separated value),这是一种常见的文本文件格式,其中每行表示一条记录,用逗号分隔记录中的每个字段。
前提条件
通过主账号登录阿里云 Databricks控制台。
已创建集群,具体请参见创建集群。
已使用OSS管理控制台创建非系统目录存储空间,详情请参见创建存储空间。
首次使用DDI产品创建的Bucket为系统目录Bucket,不建议存放数据,您需要再创建一个Bucket来读写数据。
DDI访问OSS路径结构:oss://BucketName/Object
BucketName为您的存储空间名称。
Object为上传到OSS上的文件的访问路径。
例:读取在存储空间名称为databricks-demo-hangzhou文件路径为demo/The_Sorrows_of_Young_Werther.txt的文件
// 从oss地址读取文本文档
val dataRDD = sc.textFile("oss://databricks-demo-hangzhou/demo/The_Sorrows_of_Young_Werther.txt"
csv读取程序的可选项
实际应用场景中遇到的数据内容或结构并不是那么规范,所以CSV读取程序包含大量选项(option),通过这些选项可以帮助解决例如忽略特定字符等问题
read/write | Key | 取值范围 | 默认值 | 说明 |
Both | sep | 任意单个字符串字符 | , | 用作每个字段和值的分隔符的单个字符 |
Both | header | true,false | false | 一个布尔标记符,用于声明文件中的第一行是否为列的名称 |
Both | escape | 任意字符窜 | \ | 用于转译的字符 |
Both | inferSchema | true,false | false | 指定在读取文件时spark是否推断列类型 |
Both | ignoreLeadingWhiteSpace | true,false | false | 声明是否应跳过读取中的前导空格 |
Both | ignoreTrailingWhiteSpace | true,false | false | 声明是否应跳过读取中的尾部 空格 |
Both | nullValue | 任意字符串字符 | "" | 声明在文件中什么字符表示null值 |
Both | nanValue | 任意字符串字符 | NaN | 声明什么字符表示CSV文件中的NaN或缺失字符 |
Both | positiveInf | 任意字符串字符 | Inf | 声明什么字符表示正无穷大 |
Both | negativeInf | 任意字符串字符 | -Inf | 声明什么字符表示负无穷大 |
Both | Compression 或Code | None,Uncompressed,bzip2,deflate,gzip,lz4,snappy | none | 声明spark应该用什么压缩解码器来读取或写入文件 |
Both | dataFormat | 任何符合Java的SimpleDataFormat的字符串或字符 | yyyy-MM-dd | 日期类型的日期格式 |
Both | timestampFormat | 任何符合Java的SimpleDataFormat的字符串或字符 | MMdd 'T' HH:mm ss.SSSZZ | 时间戳类型,时间戳格式 |
Read | maxColumn | 任意整数 | 20480 | 声明文件中的最大列数 |
Read | maxCharsPerColumn | 任意整数 | 1000000 | 声明列中最大字符数 |
Read | escapeQuote | true,false | true | 声明spark是否应该转义在行中找到的引号 |
Read | maxMalformadLogPerPartition | 任意整数 | 10 | 设置spark将为每个分区记录错误格式的行的最大数目,超出此数目的格式错误的记录将被忽略 |
Write | QuoteAll | true,false | false | 指定是否将所有值括在引号中,而不是仅转义具有引号字符窜的值 |
Read | multiline | true,false | false | 此选项用于读取多行CSV文件,其中CSV文件中的每个逻辑行可能跨越文件本身的多行 |
实例
本实例展示了如何使用notebook读取文件的多种方式。
与读取其他格式一样,要读取CSV文件必须首先为该特定格式创建一个DataFrameReader这里我们将格式指定为CSV;
%spark
spark.read.format("csv")
1.hearder 选项
默认header = false
%spark
val path="oss://databricks-data-source/datas/input.csv"
val dtDF = spark.read.format("csv")
.option("mode", "FAILFAST")
.load(path)
dtDF.show(5)
数据展示
header = true
%spark
val path="oss://databricks-data-source/datas/input.csv"
val dtDF = spark.read.format("csv")
.option("header","true")
.option("mode", "FAILFAST")
.load(path)
dtDF.show(5)
数据展示
2.inferSchema选项
默认inferSchema = false
%spark
val path="oss://databricks-data-source/datas/input.csv"
val dtDF = spark.read.format("csv")
.option("header","true")
.option("mode", "FAILFAST")
.load(path)
dtDF.show(5)
dtDF.printSchema()
当inferSchema = true
%spark
val path="oss://databricks-data-source/datas/input.csv"
val dtDF = spark.read.format("csv")
.option("header","true")
.option("mode", "FAILFAST")
.option("inferSchema","true")
.load(path)
dtDF.show(5)
dtDF.printSchema()
当深度类型转换不是我们希望的类型是,我们可以通过自定义Schema
%spark
import org.apache.spark.sql.types._
val path="oss://databricks-data-source/datas/input.csv"
val schema = new StructType()
.add("_c0",IntegerType,true)
.add("color",StringType,true)
.add("depth",DoubleType,true)
.add("table",DoubleType,true)
.add("price",IntegerType,true)
val data_with_schema = spark.read.format("csv")
.option("header", "true")
.schema(schema)
.load(path)
data_with_schema.show(5,false)
data_with_schema.printSchema()
⚠️自定义schema里面包含一个特殊的列_corrupt_record
,该列在数据类型解析不正确时捕获没有正确解析的行,方便查看
%spark
val path="oss://databricks-data-source/datas/input.csv"
val schema = new StructType()
.add("_c0",IntegerType,true)
.add("color",StringType,true)
.add("depth",IntegerType,true) //将字段自定义成整数型
.add("table",DoubleType,true)
.add("price",IntegerType,true)
.add("_corrupt_record", StringType, true) //特殊列_corrupt_record,追踪没有解析成功的列
val data_with_schema = spark.read.format("csv")
.option("header", "true")
.schema(schema)
.load(path)
data_with_schema.show(5,false)
data_with_schema.printSchema()
3.mode 选项
mode主要有三个值,分别是PERMISSIVE
(遇到解析不了,使用系统自带转换,实在不行就转换成null)、DROPMALFORMED
(遇到解析不了,就放弃该记录)和FAILFAST
(遇到解析不了,就报错,终止代码执行)
CSV数据集
1,a,10000,11-03-2019,pune
2,b,10020,14-03-2019,pune
3,a,34567,15-03-2019,pune
tyui,a,fgh-03-2019,pune
4,b,10020,14-03-2019,pune
%spark
import org.apache.spark.sql.types._
val path="oss://databricks-data-source/datas/dataTest.csv"
val schema = StructType(
List(
StructField("id", DataTypes.IntegerType, false,Metadata.empty),
StructField("name", DataTypes.StringType, false,Metadata.empty),
StructField("salary", DataTypes.DoubleType, false,Metadata.empty),
StructField("dob", DataTypes.StringType, false,Metadata.empty),
StructField("loc", DataTypes.StringType, false,Metadata.empty)
)
)
val dtDF = spark.read.format("csv")
.schema(schema)
.option("mode", "DROPMALFORMED")
.load(path)
dtDF.show()
下面是注解以后的结果
如果使用FAILFAST
%spark
import org.apache.spark.sql.types._
val path="oss://databricks-data-source/datas/dataTest.csv"
val schema = StructType(
List(
StructField("id", DataTypes.IntegerType, false,Metadata.empty),
StructField("name", DataTypes.StringType, false,Metadata.empty),
StructField("salary", DataTypes.DoubleType, false,Metadata.empty),
StructField("dob", DataTypes.StringType, false,Metadata.empty),
StructField("loc", DataTypes.StringType, false,Metadata.empty)
)
)
val dtDF = spark.read.format("csv")
.schema(schema)
.option("mode", "FAILFAST")
.load(path)
dtDF.show()
写CSV文件
%spark
val path="oss://databricks-data-source/datas/input.csv"
val dtDF = spark.read.format("csv")
.option("header","true")
.option("mode", "FAILFAST")
.load(path)
val writeDF=dtDF.withColumnRenamed("_c0","id").filter($"depth">60)
writeDF.show(5)
//写入CSV数据到oss
writeDF.coalesce(1).write.format("csv").mode("overwrite").save("oss://databricks-data-source/datas/out")