本文为您介绍E-MapReduce中DeltaLake的配置信息及其常用命令的示例。
DeltaLake配置信息
EMR中DeltaLake的默认配置信息如下:
- Spark 2.X环境
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
- Spark 3.X环境
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
常用命令
- 创建表
CREATE TABLE delta_table (id INT) USING delta;
- 插入数据
INSERT INTO delta_table VALUES 0,1,2,3,4;
- 覆盖写数据
INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
- 查询数据
SELECT * FROM delta_table;
- 更新数据
给偶数id加100。
UPDATE delta_table SET id = id + 100 WHERE mod(id, 2) = 0;
- 删除数据
删除偶数id的记录。
DELETE FROM delta_table WHERE mod(id, 2) = 0;
- Merge
- 创建Source表用于Merge操作。
CREATE TABLE newData(id INT) USING delta;
- 向表中插入数据。
INSERT INTO newData VALUES 0,1,2,3,4,5,6,7,8,9;
- 使用newData作为Source Merge进delta_table表,如果匹配到相同id的记录,id加100,没有匹配直接插入;
MERGE INTO delta_table AS target USING newData AS source ON target.id = source.id WHEN MATCHED THEN UPDATE SET target.id = source.id + 100 WHEN NOT MATCHED THEN INSERT *;
- 创建Source表用于Merge操作。
- 流式读数据
- 创建流式目标表。
CREATE TABLE stream_debug_table (id INT);
- 创建流。
CREATE SCAN stream_delta_table on delta_table USING STREAM;
说明 本文示例中的delta_table为您已存在的delta表。 - 流式写入目标表。
CREATE STREAM job options ( triggerType='ProcessingTime', checkpointLocation = '/tmp/streaming_read_cp' ) INSERT INTO stream_debug_table SELECT * FROM stream_delta_table;
- 创建流式目标表。
- 流式写数据
- 创建Kafka的管道表。
CREATE TABLE IF NOT EXISTS kafka_topic USING kafka OPTIONS ( kafka.bootstrap.servers = "${BOOTSTRAP_SERVERS}", subscribe = "${TOPIC_NAME}" );
说明 上述代码中的kafka.bootstrap.servers
为Kafka集群中任一Kafka Broker组件的内网IP地址和端口。subscribe
为Topic名称。 - 创建流。
CREATE SCAN stream_kafka_topic on kafka_topic USING STREAM;
- 流式写入delta表。
CREATE STREAM job OPTIONS( checkpointLocation='/tmp/' ) INSERT INTO delta_table SELECT CAST(value AS STRING) AS id FROM stream_kafka_topic;
- 创建Kafka的管道表。