本文为您介绍E-MapReduce中DeltaLake的配置信息及其常用命令的示例。

DeltaLake配置信息

EMR中DeltaLake的默认配置信息如下:
  • Spark 2.X环境
    spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
  • Spark 3.X环境
    spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
    spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog

常用命令

  • 创建表
    CREATE TABLE delta_table (id INT) USING delta;
  • 插入数据
    INSERT INTO delta_table VALUES 0,1,2,3,4;
  • 覆盖写数据
    INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
  • 查询数据
    SELECT * FROM delta_table;
  • 更新数据

    给偶数id加100。

    UPDATE delta_table SET id = id + 100 WHERE mod(id, 2) = 0;
  • 删除数据

    删除偶数id的记录。

    DELETE FROM delta_table WHERE mod(id, 2) = 0;
  • Merge
    1. 创建Source表用于Merge操作。
      CREATE TABLE newData(id INT) USING delta;
    2. 向表中插入数据。
      INSERT INTO newData VALUES 0,1,2,3,4,5,6,7,8,9;
    3. 使用newData作为Source Merge进delta_table表,如果匹配到相同id的记录,id加100,没有匹配直接插入;
      MERGE INTO delta_table AS target
        USING newData AS source
        ON target.id = source.id
        WHEN MATCHED THEN UPDATE SET target.id = source.id + 100
        WHEN NOT MATCHED THEN INSERT *;
  • 流式读数据
    1. 创建流式目标表。
      CREATE TABLE stream_debug_table (id INT);
    2. 创建流。
      CREATE SCAN stream_delta_table on delta_table USING STREAM;
      说明 本文示例中的delta_table为您已存在的delta表。
    3. 流式写入目标表。
      CREATE STREAM job
      options (
        triggerType='ProcessingTime',
        checkpointLocation = '/tmp/streaming_read_cp'
      )
      INSERT INTO stream_debug_table
      SELECT *
      FROM stream_delta_table;
  • 流式写数据
    1. 创建Kafka的管道表。
      CREATE TABLE IF NOT EXISTS kafka_topic
      USING kafka
      OPTIONS (
      kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}",
      subscribe = "${TOPIC_NAME}"
      );
      说明 上述代码中的kafka.bootstrap.servers为Kafka集群中任一Kafka Broker组件的内网IP地址和端口。subscribe 为Topic名称。
    2. 创建流。
      CREATE SCAN stream_kafka_topic on kafka_topic USING STREAM;
    3. 流式写入delta表。
      CREATE STREAM job
      OPTIONS(
      checkpointLocation='/tmp/'
      )
      INSERT INTO delta_table
      SELECT CAST(value AS STRING) AS id FROM stream_kafka_topic;

流式代码示例

  1. 通过SSH方式连接集群,详情请参见登录集群
  2. 执行以下命令,启动streaming-sql。
    streaming-sql
    说明 如果您已添加DeltaLake组件,则可以直接执行streaming-sql命令。如果集群内没有默认配置,您可以通过以下配置来使用Delta Lake。
    streaming-sql --jars /path/to/delta-core_2.11-0.6.1.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
  3. 执行以下命令,创建流式目标表。
    CREATE TABLE stream_debug_table (id INT) USING DELTA;
  4. 执行以下命令,创建流。
    CREATE SCAN stream_delta_table on delta_table USING STREAM;
  5. 执行以下命令,以delta_table作为Source,流式写入目标表。
    CREATE STREAM job
    options (
      triggerType='ProcessingTime',
      checkpointLocation = '/tmp/streaming_read_cp'
    )
    INSERT INTO stream_debug_table
    SELECT *
    FROM stream_delta_table;
  6. 您可以新开启一个streaming-sql客户端,向Source中插入新数据,并查询目标表的数据。
    1. 执行以下命令,验证Source存量写入。
      SELECT * FROM stream_debug_table;
    2. 执行以下命令,插入新数据。
      INSERT INTO delta_table VALUES 801, 802;
    3. 执行以下命令,查询插入的数据。
      SELECT * FROM stream_debug_table;
    4. 执行以下命令,插入新数据。
      INSERT INTO delta_table VALUES 901, 902;
    5. 执行以下命令,查询插入的数据。
      SELECT * FROM stream_debug_table;