本节介绍Schema的校验、合并以及重建。

Schema校验

默认情况下Schema校验是打开的。当所写的数据没有包含表定义的字段时,这些字段会设置为null;当所写数据的字段在表中没有定义时,会抛出异常,提示Schema不匹配。

import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema=new StructTupe().add("f1", LongType)
val df = spark.createDataFrame(List(Row(1L)).asJava, schema)
df.write.format("delta").mode("append").save("/tmp/delta_table")

org.apache.spark.sql.AnalysisException: A schema mismatch detected when writing to the Delta table.
To enable schema migration, please set:
'.option("mergeSchema", "true")'.

Table schema:
root
-- id: long (nullable = true)
-- date: date (nullable = true)
-- name: string (nullable = true)
-- sales: string (nullable = true)


Data schema:
root
-- f1: long (nullable = true)

Delta定义了三种Schema校验规则:

  • 数据字段中包含表中未定义的字段。
  • 数据字段类型与表中该字段的类型定义不同。
  • 数据中包含不区分大小写的同名字段,例如:Foo和foo。

Schema合并

如果需要将Data写入目标表,同时更新Schema,需要将自动mergeSchema的功能打开,这样,当数据和表的Schema不一致且满足自动合并Schema的条件时,Schema将会合并。

  • Scala
    df.write.option("mergeSchema", "true")
  • SQL

    暂不支持。

自动合并Schema的条件包括:
  • 增加列。
  • 数据类型的兼容式转换,包括:
    • NullType -> 其他任何类型。
    • ByteType -> ShortType -> IntegerType。
      说明 如果新写入数据的Schema对分区做了改动,只能Schema重建,目前不支持合并。

Schema重建

某些情况下,Schema变动不属于Schema合并的范围,例如:删除某个列,或者将列的类型从一个类型转换为另一个不兼容的类型,这个时候需要重建Schema。

  • Scala
    df.write.option("overwriteSchema", "true")

    重建Schema也需要对数据进行重写,否则会出现数据和元数据Schema不一致的情况。如果要重建Schema,需要使用df.write.mode("overwrite").option("overwriteSchema", "true")对原有数据和Schema进行重写。其中.option("overwriteSchema", "true")是必须的,因为df.write.mode("overwrite")不会重写Schema,只会重写数据。

  • SQL

    暂不支持。