通过Spark SQL读写Delta外表

更新时间:

Delta是一种可以基于OSS对象存储的数据湖表格式,支持UPDATE、DELETE和INSERT操作。云原生数据仓库 AnalyticDB MySQL 版和Delta表格式进行了整合,您可以通过Spark SQL读写Delta外表。本文主要介绍如何通过Spark SQL读写Delta外表。

前提条件

  • 集群的产品系列为企业版、基础版或湖仓版

  • 已在企业版、基础版或湖仓版集群中创建Job型资源组。具体操作,请参见新建资源组

  • 已创建企业版、基础版或湖仓版集群的数据库账号。

注意事项

  • Xihe引擎不支持读写Delta表。

  • AnalyticDB for MySQL Spark侧负责集成Spark对应版本的Delta版本,并对内置Delta版本进行升级,不负责Delta内核问题排查,以及不同Delta版本之前兼容性问题。

读写Delta外表

AnalyticDB for MySQL内置Delta包可满足通过Spark SQL读写Delta外表数据。若内置Delta包版本(2.0.2)无法满足读写需求时,可自编译Delta包版本读写Delta外表数据。

使用AnalyticDB for MySQL内置Delta包

步骤一:进入数据开发

  1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。

  2. 在左侧导航栏,单击作业开发 > SQL开发

  3. SQLConsole窗口,选择Spark引擎和Job型资源组。

步骤二:创建外库和Delta外表

说明

您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。详情请参见Spark SQL执行方式

  1. 执行以下语句,创建数据库。如果已有数据库,可跳过本步骤。

    CREATE DATABASE if not exists external_delta_db
    location "oss://<bucket_name>/test/";      /*用于在该路径中创建表,请替换为自己的OSS路径。*/
  2. 执行以下语句,创建Delta外表。

    CREATE TABLE if not exists external_delta_db.delta_test_tbl (
      id int, 
      name string, 
      age int
    ) using delta 
    partitioned by (age) 
    location "oss://<bucket_name>/test/delta_test_tbl";

步骤三:写入Delta外表数据

说明

您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。详情请参见Spark SQL执行方式

INSERT

执行以下语句,写入数据。您可以选择以下任意一种方式向Delta外表中写入数据。

  • 方式一:INSERT INTO写入

    INSERT INTO external_delta_db.delta_test_tbl values(1, 'lisa', 10),(2, 'jams', 10);
  • 方式二:INSERT OVERWRITE全表写入

    INSERT OVERWRITE external_delta_db.delta_test_tbl values (2, 'zhangsan', 10), (4, 'lisi', 30);
  • 方式三:INSERT OVERWRITE静态分区写入

    INSERT OVERWRITE external_delta_db.delta_test_tbl partition(age=17) values(3, 'anna');
  • 方式四:INSERT OVERWRITE动态分区写入

    INSERT OVERWRITE external_delta_db.delta_test_tbl partition (age) values (1, 'bom', 10);

UPDATE

执行以下语句更新数据,本文以将id=1的name列更新为box为例。

UPDATE external_delta_db.delta_test_tbl set name = 'box' where id = 1;

DELETE

执行以下语句删除数据,本文以删除id列为1的数据为例。

DELETE FROM external_delta_db.delta_test_tbl where id = 1;

步骤四:查询数据

说明
  • 您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。详情请参见Spark SQL执行方式

  • 执行Spark SQL语句,只返回执行成功或者失败,不返回数据。您可以在Spark Jar开发页面应用列表页签中的日志查看表数据。详情请参见查看Spark应用信息

执行以下语句,查询Delta外表数据。

SELECT * FROM external_delta_db.delta_test_tbl;

使用自定义Delta包

重要

需要使用和AnalyticDB for MySQL Spark内核版本(3.2.0)匹配的Delta版本,以免出现版本不兼容问题。

步骤一:进入数据开发

  1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。

  2. 在左侧导航栏,单击作业开发 > SQL开发

  3. SQLConsole窗口,选择Spark引擎和Job型资源组。

步骤二:创建外库和Delta外表

说明

您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。详情请参见Spark SQL执行方式

  1. 执行以下语句,创建数据库。如果已有数据库,可跳过本步骤。

    add jar oss://<bucket_name>/path/to/delta-core_xx.jar;    /*自编译Delta包,需要手动上传至OSS中。*/
    
    add jar oss://<bucket_name>/path/to/delta-storage-xx.jar; /*自编译Delta包,需要手动上传至OSS中。*/
    
    SET spark.adb.connectors=oss;   /*启用AnalyticDB MySQL版Spark内置的连接器OSS。*/ 
    
    SET spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension;  /*开源Spark参数。*/
    
    SET spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog;   /*开源Spark参数。*/
    
    CREATE DATABASE if not exists external_delta_db
    location "oss://<bucket_name>/test/";          /*用于在该路径中创建表,请替换为自己的OSS路径。*/
  2. 执行以下语句,创建Delta外表。

    CREATE TABLE if not exists external_delta_db.delta_test_tbl (
      id int, 
      name string, 
      age int
    ) using delta 
    partitioned by (age) 
    location "oss://<bucket_name>/test/delta_test_tbl";

步骤三:写入Delta外表数据

说明

您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。详情请参见Spark SQL执行方式

INSERT

执行以下语句,写入数据。您可以选择以下任意一种方式向Delta外表中写入数据。

  • 方式一:INSERT INTO写入

    INSERT INTO external_delta_db.delta_test_tbl values(1, 'lisa', 10),(2, 'jams', 10);
  • 方式二:INSERT OVERWRITE全表写入

    INSERT OVERWRITE external_delta_db.delta_test_tbl values (2, 'zhangsan', 10), (4, 'lisi', 30);
  • 方式三:INSERT OVERWRITE静态分区写入

    INSERT OVERWRITE external_delta_db.delta_test_tbl partition(age=17) values(3, 'anna');
  • 方式四:INSERT OVERWRITE动态分区写入

    INSERT OVERWRITE external_delta_db.delta_test_tbl partition (age) values (1, 'bom', 10);

UPDATE

执行以下语句更新数据,本文以将id=1的name列更新为box为例。

UPDATE external_delta_db.delta_test_tbl set name = 'box' where id = 1;

DELETE

执行以下语句删除数据,本文以删除id列为1的数据为例。

DELETE FROM external_delta_db.delta_test_tbl where id = 1;

步骤四:查询数据

说明
  • 您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。详情请参见Spark SQL执行方式

  • 执行Spark SQL语句,只返回执行成功或者失败,不返回数据。您可以在Spark Jar开发页面应用列表页签中的日志查看表数据。详情请参见查看Spark应用信息

执行以下语句,查询Delta外表数据。

SELECT * FROM external_delta_db.delta_test_tbl;