本文为您介绍如何在Spark SQL流式处理中使用MERGE INTO语句。

语法

mergeInto
    : MERGE INTO target=tableIdentifier tableAlias
      USING (source=tableIdentifier (timeTravel)? | '(' subquery = query ')') tableAlias
      mergeCondition?
      matchedClauses*
      notMatchedClause?

mergeCondition
    : ON condition=booleanExpression

matchedClauses
    : deleteClause
    | updateClause

notMatchedClause
    : insertClause

deleteClause
    : WHEN MATCHED (AND deleteCond=booleanExpression)? THEN deleteAction
    | WHEN deleteCond=booleanExpression THEN deleteAction

updateClause
    : WHEN MATCHED (AND updateCond=booleanExpression)? THEN updateAction
    | WHEN updateCond=booleanExpression THEN updateAction

insertClause
    : WHEN NOT MATCHED (AND insertCond=booleanExpression)? THEN insertAction
    | WHEN insertCond=booleanExpression THEN insertAction

示例

  • 源表和目标表定义。
    -- source table
    source_table
        : id int,         --主键
        | name string,
        | opType string   --数据操作类型
    
    
    -- target table
    target_table
        : id int,         --主键
        | name string
  • Merge Into Delta。
    MERGE INTO target_table t
    USING source_table s
    ON s.id = t.id
    WHEN MATCHED AND s.opType = 'delete' THEN DELETE
    WHEN MATCHED AND s.opType = 'update' THEN UPDATE SET id = s.id, name = s.name
    WHEN NOT MATCHED AND s.opType = 'insert' THEN INSERT (key, value) VALUES (key, value)
  • Merge Into Kudu。
    MERGE INTO target_table t
    USING source_table s
    WHEN s.opType = 'delete' THEN DELETE
    WHEN s.opType = 'update' THEN UPDATE SET *
    WHEN s.opType = 'insert' THEN INSERT *

说明

  • 可以有1~3个WHEN子句,最多2个WHEN MATCHED子句,最多1个WHEN NOT MATCHED子句。WHEN NOT MATCHED子句存在时必须是最后一条子句。
  • WHEN MATCHED子句。
    • 最多有一个UPDATE或者DELETE操作。
    • 每个子句可以有一个可选的condition,但如果有两个WHEN MATCHED子句,则第一个WHEN MATCHED必须写上condition。
    • WHEN MATCHED子句有顺序关系,且每条数据只能执行一个WHEN MATCHED子句。
    • 支持UPDATE SET *写法,表示UPDATE SET column1 = source.column1 [, column2 = source.column2 ...]。如果源表和目标表的字段个数不一致,则抛出解析失败异常。
  • WHEN NOT MATCHED子句。
    • 此子句只能使用INSERT操作,且可以有一个可选的condition。
    • 支持INSERT *写法,表示INSERT SET (column1 [, column2 ...]) VALUES (source.value1 [, source.value2 ...])。如果源表和目标表的字段个数不一致,则抛出解析失败异常。
  • 对于源表中没有匹配上任何规则的数据行,则不做任何操作。
  • 对于一些支持UPSERT操作的目标存储系统,可以基于Merge的变种进行实现。
    • mergeCondition:针对支持UPSERT操作的目标存储,则无需配置Merge条件。
    • 省略MATCHEDNOT MATCHED关键字,直接根据自动以condition进行操作,但依旧要求只能有1~3个WHEN子句。