本文介绍Schema的校验、合并以及重建。

Schema校验

Schema校验默认开启。当所写的数据没有包含表定义的字段时,该字段会设置为null;当所写数据的字段在表中没有定义时,抛出异常,并提示Schema不匹配。

Delta定义了三种Schema校验规则:
  • 数据字段中包含表中未定义的字段。
  • 数据字段类型与表中该字段的类型定义不同。
  • 数据中包含不区分大小写的同名字段,例如:Foo和foo。
import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema=new StructTupe().add("f1", LongType)
val df = spark.createDataFrame(List(Row(1L)).asJava, schema)
df.write.format("delta").mode("append").save("/tmp/delta_table")

org.apache.spark.sql.AnalysisException: A schema mismatch detected when writing to the Delta table.
To enable schema migration, please set:
'.option("mergeSchema", "true")'.

Table schema:
root
-- id: long (nullable = true)
-- date: date (nullable = true)
-- name: string (nullable = true)
-- sales: string (nullable = true)


Data schema:
root
-- f1: long (nullable = true)

Schema合并

如果需要将数据写入目标表,同时更新Schema,则开启mergeSchema功能。

开启mergeSchema后,如果数据和表的Schema不一致,且满足自动合并Schema的条件,将被自动合并。
  • Scala
    df.write.option("mergeSchema", "true")
  • SQL

    不支持。

自动合并Schema的条件如下:
  • 增加列。
  • 数据类型的兼容式转换:
    • NullType -> 其他任何类型。
    • ByteType -> ShortType -> IntegerType。
      说明 当新写入数据的Schema变更了分区,请重建Schema。

Schema重建

当Schema变动不属于Schema合并的范围,则需要重建Schema。例如:删除某个列或者将列从一个类型转换为另一个不兼容的类型。
  • Scala
    df.write.option("overwriteSchema", "true")

    重建Schema也需要对数据进行重写,以防数据和元数据Schema不一致。

    如果需要重建Schema,请使用df.write.mode("overwrite").option("overwriteSchema", "true")对原有数据和Schema进行重写。其中.option("overwriteSchema", "true")是必配项。

  • SQL

    不支持。