基础使用

本文为您介绍E-MapReduce中DeltaLake的配置信息及其常用命令的示例。

DeltaLake配置信息

EMR中DeltaLake的默认配置信息如下:

  • Spark 2.X环境

    spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
  • Spark 3.X环境

    spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
    spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog

常用命令示例

  • 创建表

    CREATE TABLE delta_table (id INT) USING delta;
  • 插入数据

    INSERT INTO delta_table VALUES 0,1,2,3,4;
  • 覆盖写数据

    INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
  • 查询数据

    SELECT * FROM delta_table;
  • 更新数据

    给偶数id加100。

    UPDATE delta_table SET id = id + 100 WHERE mod(id, 2) = 0;
  • 删除数据

    删除偶数id的记录。

    DELETE FROM delta_table WHERE mod(id, 2) = 0;

Merge操作

  1. 创建Source表用于Merge操作。

    CREATE TABLE newData(id INT) USING delta;
  2. 向表中插入数据。

    INSERT INTO newData VALUES 0,1,2,3,4,5,6,7,8,9;
  3. 使用newData作为Source Merge进delta_table表。如果匹配到相同id的记录,则该id加100;如果没有匹配到,则直接插入新记录。

    MERGE INTO delta_table AS target
      USING newData AS source
      ON target.id = source.id
      WHEN MATCHED THEN UPDATE SET target.id = source.id + 100
      WHEN NOT MATCHED THEN INSERT *;
  4. 流式读数据

    1. 创建流式目标表。

      CREATE TABLE stream_debug_table (id INT) USING delta;
    2. 创建流。

      CREATE SCAN stream_delta_table on delta_table USING STREAM;
      说明

      本文示例中的delta_table为您已存在的delta表。

    3. 流式写入目标表。

      CREATE STREAM job
      options (
        triggerType='ProcessingTime',
        checkpointLocation = '/tmp/streaming_read_cp'
      )
      INSERT INTO stream_debug_table
      SELECT * FROM stream_delta_table;

流式代码示例

  1. 通过SSH方式连接集群,详情请参见登录集群

  2. 执行以下命令,启动streaming-sql。

    streaming-sql
    说明

    如果您已添加DeltaLake组件,则可以直接执行streaming-sql命令。如果集群内没有默认配置,您可以通过以下配置来使用Delta Lake。

    streaming-sql --jars /path/to/delta-core_2.11-0.6.1.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
  3. 执行以下命令,创建流式目标表。

    CREATE TABLE stream_debug_table (id INT) USING DELTA;
  4. 执行以下命令,创建流。

    CREATE SCAN stream_delta_table on delta_table USING STREAM;
  5. 执行以下命令,以delta_table作为Source,流式写入目标表。

    CREATE STREAM job
    options (
      triggerType='ProcessingTime',
      checkpointLocation = '/tmp/streaming_read_cp'
    )
    INSERT INTO stream_debug_table
    SELECT * FROM stream_delta_table;
  6. 您可以新开启一个streaming-sql客户端,向Source中插入新数据,并查询目标表的数据。

    1. 执行以下命令,验证Source存量写入。

      SELECT * FROM stream_debug_table;
    2. 执行以下命令,插入新数据。

      INSERT INTO delta_table VALUES 801, 802;
    3. 执行以下命令,查询插入的数据。

      SELECT * FROM stream_debug_table;
    4. 执行以下命令,插入新数据。

      INSERT INTO delta_table VALUES 901, 902;
    5. 执行以下命令,查询插入的数据。

      SELECT * FROM stream_debug_table;