This topic describes the DML statements that are supported after Hudi is integrated with Spark SQL.

Prerequisites

A Hadoop cluster is created. For more information, see Create a cluster.

Limits

Only EMR-3.36.0 and later versions and EMR-5.2.0 and later versions support reading from and writing to Hudi with Spark SQL.

Open the spark-sql CLI

  1. Log on to the cluster over SSH. For more information, see Log on to a cluster.
  2. Run the following command to open the spark-sql CLI:
    spark-sql --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
    --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
    If the output contains the following prompt, the spark-sql CLI is open:
    spark-sql>
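
The statements in the following sections operate on a table named h0 that this topic does not define. A minimal sketch of such a table is shown here. The column types, the copy-on-write table type, and the choice of id as the primary key are assumptions inferred from the later examples, not settings stated in this topic.

```sql
-- Assumed layout of h0: three columns, with id as the Hudi record key.
-- type = 'cow' (copy-on-write) is an illustrative choice.
create table if not exists h0 (
  id bigint,
  name string,
  price double
) using hudi
options (
  type = 'cow',
  primaryKey = 'id'
);
```

A primary key matters for the DML statements below because the MERGE INTO examples match source and target rows on id.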

MERGE INTO

Inserts, updates, or deletes data in a table, depending on whether each source row matches a row in the target table.

  • Syntax
    MERGE INTO tableIdentifier AS target_alias
    USING (sub_query | tableIdentifier) AS source_alias
    ON <merge_condition>
    WHEN MATCHED [ AND <condition> ] THEN <matched_action>
    [ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
    [ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ]
    
    <merge_condition> = an equality boolean condition
    <matched_action>  =
      DELETE  |
      UPDATE SET *  |
      UPDATE SET column1 = value1 [, column2 = value2 ...]
    <not_matched_action>  =
      INSERT *  |
      INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])
  • Example
    -- without delete
    merge into h0 as target
    using (
      select 1 as id, 'a1' as name, 10.0 as price
    ) source
    on target.id = source.id
    when matched then update set *
    when not matched then insert *;
    -- with delete (when there are two matched clauses, the first one must carry a condition)
    merge into h0 as target
    using (
      select 1 as id, 'a1' as name, 10.0 as price
    ) source
    on target.id = source.id
    when matched and source.name != 'delete' then update set id = source.id, name = source.name, price = source.price
    when matched and source.name = 'delete' then delete
    when not matched then insert (id,name,price) values(source.id, source.name, source.price);
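
In practice the source of a merge is often a table of incoming changes rather than an inline SELECT. The following sketch illustrates that pattern. The staging table h0_changes and its flag column are hypothetical names introduced only for this illustration, and h0 is assumed to have the columns id, name, and price with id as its primary key.

```sql
-- Hypothetical staging table of incoming changes; flag marks rows to delete.
create table if not exists h0_changes (
  id bigint,
  name string,
  price double,
  flag string
) using parquet;

insert into h0_changes values
  (1, 'a1', 12.0, 'update'),
  (2, 'a2', 0.0, 'delete'),
  (3, 'a3', 30.0, 'insert');

-- Apply the changes: update or delete matched rows, insert unmatched rows.
merge into h0 as target
using (
  select id, name, price, flag from h0_changes
) source
on target.id = source.id
when matched and source.flag != 'delete' then update set id = source.id, name = source.name, price = source.price
when matched and source.flag = 'delete' then delete
when not matched and source.flag != 'delete' then insert (id, name, price) values (source.id, source.name, source.price);
```

The conditions reference only source columns, which keeps the choice of update, delete, or insert independent of the current contents of the target row.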

INSERT INTO

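Inserts data into a partitioned or non-partitioned table.

Examples:
  • Insert data into the non-partitioned table h0.
    insert into h0 select 1, 'a1', 20;
  • Insert data into the partitioned table h_p0 by using a static partition.
    insert into h_p0 partition(dt='2021-01-02') select 1, 'a1';
  • Insert data into the partitioned table h_p0 by using a dynamic partition.
    insert into h_p0 partition(dt) select 1, 'a1', dt from s;
  • Insert data into the partitioned table h_p1 by using dynamic partitions. The partition fields must be the last columns in the SELECT list.
    insert into h_p1 select 1 as id, 'a1', '2021-01-03' as dt, '19' as hh;
  • Overwrite the data in table h0.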
    insert overwrite table h0 select 1, 'a1', 20;
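
The INSERT examples above refer to the partitioned tables h_p0 and h_p1 without showing their definitions. The following sketch gives one plausible layout. The column types, the primary key, and the table type are assumptions inferred from the values being inserted. The table s is assumed to be an existing table that has a dt column.

```sql
-- Assumed layout of h_p0: partitioned by the single field dt.
create table if not exists h_p0 (
  id bigint,
  name string,
  dt string
) using hudi
options (
  type = 'cow',
  primaryKey = 'id'
)
partitioned by (dt);

-- Assumed layout of h_p1: partitioned by dt and hh.
-- The partition fields are declared last, matching the SELECT order in the example above.
create table if not exists h_p1 (
  id bigint,
  name string,
  dt string,
  hh string
) using hudi
options (
  type = 'cow',
  primaryKey = 'id'
)
partitioned by (dt, hh);
```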

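UPDATE

Updates one or more columns of the rows in a partitioned or non-partitioned table.

  • Syntax
    UPDATE tableIdentifier SET column = EXPRESSION [, column = EXPRESSION ...] [WHERE boolExpression];
  • Example: Set the price field to 20 for the row whose id is 1 in table h0.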
    update h0 set price=20 where id=1;
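
The syntax allows several columns to be set in one statement, which the example above does not show. A minimal sketch, assuming h0 has the columns id, name, and price. The new name value is an arbitrary illustration.

```sql
-- Update two columns of the same row in a single statement.
update h0 set name = 'a1_renamed', price = price * 2 where id = 1;

-- Read the row back to check the new values.
select id, name, price from h0 where id = 1;
```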

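DELETE

Deletes one or more rows that meet the specified condition from a partitioned or non-partitioned table.

  • Syntax
    DELETE FROM tableIdentifier [WHERE BOOL_EXPRESSION];
  • Example: Delete the rows whose id is greater than 100 from table h0.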
    delete from h0 where id>100;
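
One way to confirm the effect of a delete is to count the matching rows before and after the statement. This is only a sketch of that check, reusing the condition from the example above.

```sql
-- Count the rows that the delete will target.
select count(*) from h0 where id > 100;

delete from h0 where id > 100;

-- Run the same count again to confirm that the rows are gone.
select count(*) from h0 where id > 100;
```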