本文为您介绍E-MapReduce中DeltaLake的配置信息及其常用命令的示例。
DeltaLake配置信息
EMR中DeltaLake的默认配置信息如下:
- Spark 2.X环境 - spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
- Spark 3.X环境 - spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
常用命令示例
- 创建表 - CREATE TABLE delta_table (id INT) USING delta;
- 插入数据 - INSERT INTO delta_table VALUES 0,1,2,3,4;
- 覆盖写数据 - INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
- 查询数据 - SELECT * FROM delta_table;
- 更新数据 - 给偶数 - id加100。- UPDATE delta_table SET id = id + 100 WHERE mod(id, 2) = 0;
- 删除数据 - 删除偶数 - id的记录。- DELETE FROM delta_table WHERE mod(id, 2) = 0;
Merge操作
- 创建Source表用于Merge操作。 - CREATE TABLE newData(id INT) USING delta;
- 向表中插入数据。 - INSERT INTO newData VALUES 0,1,2,3,4,5,6,7,8,9;
- 使用newData作为Source Merge进delta_table表。如果匹配到相同 - id的记录,则该- id加100;如果没有匹配到,则直接插入新记录。- MERGE INTO delta_table AS target USING newData AS source ON target.id = source.id WHEN MATCHED THEN UPDATE SET target.id = source.id + 100 WHEN NOT MATCHED THEN INSERT *;
- 流式读数据 - 创建流式目标表。 - CREATE TABLE stream_debug_table (id INT) USING delta;
- 创建流。 - CREATE SCAN stream_delta_table on delta_table USING STREAM;说明- 本文示例中的delta_table为您已存在的delta表。 
- 流式写入目标表。 - CREATE STREAM job options ( triggerType='ProcessingTime', checkpointLocation = '/tmp/streaming_read_cp' ) INSERT INTO stream_debug_table SELECT * FROM stream_delta_table;
 
流式代码示例
- 通过SSH方式连接集群,详情请参见登录集群。 
- 执行以下命令,启动streaming-sql。 - streaming-sql说明- 如果您已添加DeltaLake组件,则可以直接执行 - streaming-sql命令。如果集群内没有默认配置,您可以通过以下配置来使用Delta Lake。- streaming-sql --jars /path/to/delta-core_2.11-0.6.1.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
- 执行以下命令,创建流式目标表。 - CREATE TABLE stream_debug_table (id INT) USING DELTA;
- 执行以下命令,创建流。 - CREATE SCAN stream_delta_table on delta_table USING STREAM;
- 执行以下命令,以delta_table作为Source,流式写入目标表。 - CREATE STREAM job options ( triggerType='ProcessingTime', checkpointLocation = '/tmp/streaming_read_cp' ) INSERT INTO stream_debug_table SELECT * FROM stream_delta_table;
- 您可以新开启一个streaming-sql客户端,向Source中插入新数据,并查询目标表的数据。 - 执行以下命令,验证Source存量写入。 - SELECT * FROM stream_debug_table;
- 执行以下命令,插入新数据。 - INSERT INTO delta_table VALUES 801, 802;
- 执行以下命令,查询插入的数据。 - SELECT * FROM stream_debug_table;
- 执行以下命令,插入新数据。 - INSERT INTO delta_table VALUES 901, 902;
- 执行以下命令,查询插入的数据。 - SELECT * FROM stream_debug_table;
 
该文章对您有帮助吗?