通过Spark SQL读写Delta外表
Delta是一种可以基于OSS对象存储的数据湖表格式,支持UPDATE、DELETE和INSERT操作。云原生数据仓库 AnalyticDB MySQL 版和Delta表格式进行了整合,您可以通过Spark SQL读写Delta外表。本文主要介绍如何通过Spark SQL读写Delta外表。
前提条件
集群的产品系列为企业版、基础版或湖仓版。
已在企业版、基础版或湖仓版集群中创建Job型资源组。具体操作,请参见新建资源组。
已创建企业版、基础版或湖仓版集群的数据库账号。
注意事项
Xihe引擎不支持读写Delta表。
AnalyticDB for MySQL Spark侧负责集成Spark对应版本的Delta版本,并对内置Delta版本进行升级,不负责Delta内核问题排查,以及不同Delta版本之前兼容性问题。
读写Delta外表
AnalyticDB for MySQL内置Delta包可满足通过Spark SQL读写Delta外表数据。若内置Delta包版本(2.0.2)无法满足读写需求时,可自编译Delta包版本读写Delta外表数据。
使用AnalyticDB for MySQL内置Delta包
步骤一:进入数据开发
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。
在左侧导航栏,单击
。在SQLConsole窗口,选择Spark引擎和Job型资源组。
步骤二:创建外库和Delta外表
您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。详情请参见Spark SQL执行方式。
执行以下语句,创建数据库。如果已有数据库,可跳过本步骤。
CREATE DATABASE if not exists external_delta_db location "oss://<bucket_name>/test/"; /*用于在该路径中创建表,请替换为自己的OSS路径。*/
执行以下语句,创建Delta外表。
CREATE TABLE if not exists external_delta_db.delta_test_tbl ( id int, name string, age int ) using delta partitioned by (age) location "oss://<bucket_name>/test/delta_test_tbl";
步骤三:写入Delta外表数据
您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。详情请参见Spark SQL执行方式。
INSERT
执行以下语句,写入数据。您可以选择以下任意一种方式向Delta外表中写入数据。
方式一:INSERT INTO写入
INSERT INTO external_delta_db.delta_test_tbl values(1, 'lisa', 10),(2, 'jams', 10);
方式二:INSERT OVERWRITE全表写入
INSERT OVERWRITE external_delta_db.delta_test_tbl values (2, 'zhangsan', 10), (4, 'lisi', 30);
方式三:INSERT OVERWRITE静态分区写入
INSERT OVERWRITE external_delta_db.delta_test_tbl partition(age=17) values(3, 'anna');
方式四:INSERT OVERWRITE动态分区写入
INSERT OVERWRITE external_delta_db.delta_test_tbl partition (age) values (1, 'bom', 10);
UPDATE
执行以下语句更新数据,本文以将id=1的name列更新为box为例。
UPDATE external_delta_db.delta_test_tbl set name = 'box' where id = 1;
DELETE
执行以下语句删除数据,本文以删除id列为1的数据为例。
DELETE FROM external_delta_db.delta_test_tbl where id = 1;
步骤四:查询数据
您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。详情请参见Spark SQL执行方式。
执行Spark SQL语句,只返回执行成功或者失败,不返回数据。您可以在Spark Jar开发页面应用列表页签中的日志查看表数据。详情请参见查看Spark应用信息。
执行以下语句,查询Delta外表数据。
SELECT * FROM external_delta_db.delta_test_tbl;
使用自定义Delta包
需要使用和AnalyticDB for MySQL Spark内核版本(3.2.0)匹配的Delta版本,以免出现版本不兼容问题。
步骤一:进入数据开发
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。
在左侧导航栏,单击
。在SQLConsole窗口,选择Spark引擎和Job型资源组。
步骤二:创建外库和Delta外表
您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。详情请参见Spark SQL执行方式。
执行以下语句,创建数据库。如果已有数据库,可跳过本步骤。
add jar oss://<bucket_name>/path/to/delta-core_xx.jar; /*自编译Delta包,需要手动上传至OSS中。*/ add jar oss://<bucket_name>/path/to/delta-storage-xx.jar; /*自编译Delta包,需要手动上传至OSS中。*/ SET spark.adb.connectors=oss; /*启用AnalyticDB MySQL版Spark内置的连接器OSS。*/ SET spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension; /*开源Spark参数。*/ SET spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog; /*开源Spark参数。*/ CREATE DATABASE if not exists external_delta_db location "oss://<bucket_name>/test/"; /*用于在该路径中创建表,请替换为自己的OSS路径。*/
执行以下语句,创建Delta外表。
CREATE TABLE if not exists external_delta_db.delta_test_tbl ( id int, name string, age int ) using delta partitioned by (age) location "oss://<bucket_name>/test/delta_test_tbl";
步骤三:写入Delta外表数据
您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。详情请参见Spark SQL执行方式。
INSERT
执行以下语句,写入数据。您可以选择以下任意一种方式向Delta外表中写入数据。
方式一:INSERT INTO写入
INSERT INTO external_delta_db.delta_test_tbl values(1, 'lisa', 10),(2, 'jams', 10);
方式二:INSERT OVERWRITE全表写入
INSERT OVERWRITE external_delta_db.delta_test_tbl values (2, 'zhangsan', 10), (4, 'lisi', 30);
方式三:INSERT OVERWRITE静态分区写入
INSERT OVERWRITE external_delta_db.delta_test_tbl partition(age=17) values(3, 'anna');
方式四:INSERT OVERWRITE动态分区写入
INSERT OVERWRITE external_delta_db.delta_test_tbl partition (age) values (1, 'bom', 10);
UPDATE
执行以下语句更新数据,本文以将id=1的name列更新为box为例。
UPDATE external_delta_db.delta_test_tbl set name = 'box' where id = 1;
DELETE
执行以下语句删除数据,本文以删除id列为1的数据为例。
DELETE FROM external_delta_db.delta_test_tbl where id = 1;
步骤四:查询数据
您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。详情请参见Spark SQL执行方式。
执行Spark SQL语句,只返回执行成功或者失败,不返回数据。您可以在Spark Jar开发页面应用列表页签中的日志查看表数据。详情请参见查看Spark应用信息。
执行以下语句,查询Delta外表数据。
SELECT * FROM external_delta_db.delta_test_tbl;