通过Spark SQL读写Hudi外表

更新时间:

Apache Hudi(简称Hudi)是基于OSS对象存储的一种表格式,支持UPDATE、DELETEINSERT操作。云原生数据仓库 AnalyticDB MySQL 版Hudi做了深度整合,您可以通过Spark SQL读写Hudi外表。本文主要介绍如何通过Spark SQL读写Hudi外表。

前提条件

  • 集群的产品系列为企业版、基础版或湖仓版

    说明
    • 湖仓版集群存储预留资源需大于0 ACU。

    • 企业版集群预留资源需大于0 ACU。

    • 基础版集群预留资源需大于0 ACU。

  • 已在企业版、基础版或湖仓版集群中创建Job型资源组。具体操作,请参见新建资源组

  • 已创建企业版、基础版或湖仓版集群的数据库账号。

步骤一:进入数据开发

  1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。

  2. 在左侧导航栏,单击作业开发 > SQL开发

  3. SQLConsole窗口,选择Spark引擎和Job型资源组。

步骤二:创建外库与Hudi外表

说明

您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。详情请参见Spark SQL执行方式

  1. 执行以下语句,创建数据库。如果已有数据库,可跳过本步骤。

    CREATE DATABASE adb_external_db_hudi
    location 'oss://<bucket_name>/test/';    /*用于在该路径中创建表,请替换为自己的OSS路径。*/
  2. 执行以下语句,创建Hudi外表。

    CREATE TABLE adb_external_db_hudi.test_hudi_tbl (
     `id` int,
     `name` string,
     `age` int
    ) using hudi 
    tblproperties
     (primaryKey = 'id', 
      preCombineField = 'age') 
      partitioned by (age) 
    location 'oss://<bucket_name>/test/table/';  /*写入的数据会存储在该路径中,请替换为自己的OSS路径。*/
    重要
    • OSS路径中的Bucket需与创建数据库所选的Bucket相同。

    • 创建外表时选择的OSS路径需比创建数据库时选择的OSS路径至少多一层目录,且外表的路径需在数据库路径下。

    • 建表时必须定义primaryKey主键。preCombineField预聚合字段为可选,如未定义preCombineField,则在UPDATE场景会报错。

步骤三:写入Hudi外表数据

说明

您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。详情请参见Spark SQL执行方式

INSERT

执行以下语句,写入数据。您可以选择以下任意一种方式向Hudi外表中写入数据。

  • 方式一:INSERT INTO写入

    INSERT INTO adb_external_db_hudi.test_hudi_tbl values(1, 'lisa', 10),(2, 'jams', 10);
  • 方式二:INSERT OVERWRITE全表写入

    INSERT OVERWRITE adb_external_db_hudi.test_hudi_tbl values (1, 'lisa', 10), (2, 'jams', 20);
  • 方式三:INSERT OVERWRITE静态分区写入

    INSERT OVERWRITE adb_external_db_hudi.test_hudi_tbl partition(age=10) values(1, 'anna');
  • 方式四:INSERT OVERWRITE动态分区写入

    INSERT OVERWRITE adb_external_db_hudi.test_hudi_tbl partition (age) values (1, 'bom', 10);

UPDATE

执行以下语句更新数据,本文以将id=2name列更新为box为例。

UPDATE adb_external_db_hudi.test_hudi_tbl SET name = 'box' where id = 2;

DELETE

执行以下语句删除数据,本文以删除id列为1的数据为例。

DELETE FROM adb_external_db_hudi.test_hudi_tbl where id = 1;

并发控制

Hudi外表基于LockProvider的并发控制机制,避免执行DML操作时出现并发冲突。多个任务可以同时写入不同的数据范围,为了避免写入冲突,需要确保没有重叠的数据范围写入,从而保证了数据的正确性和一致性。您需要配置以下参数来启用并发控制。详细Hudi的并发控制机制请参见Apache Hudi

说明

若使用开源Hudi JAR包,暂不支持MdsBasedLockProvider实现并发控制。

set hoodie.cleaner.policy.failed.writes=LAZY;
set hoodie.write.concurrency.mode=OPTIMISTIC_CONCURRENCY_CONTROL;
set hoodie.write.lock.provider=org.apache.hudi.sync.adb.MdsBasedLockProvider;

参数说明:

参数

参数值

是否必填

描述

hoodie.cleaner.policy.failed.writes

LAZY

指定写入失败时的脏数据清理策略。

取值为LAZY,表示写入提交前不清理未完成的提交,失败的提交待心跳过期后由Clean操作统一清理,适用多个并发写入的场景。

hoodie.write.concurrency.mode

OPTIMISTIC_CONCURRENCY_CONTROL

写入操作的并发模式。取值为

OPTIMISTIC_CONCURRENCY_CONTROL,表示一个Hudi外表如果同时有多个写入任务,每个写入任务提交前都会检查是否存在提交冲突,冲突情况下本次写入失败。

hoodie.write.lock.provider

org.apache.hudi.sync.adb.MdsBasedLockProvider

锁提供程序类名,用户可以提供自己的LockProvider实现,提供的类必须是org.apache.hudi.common.lock.LockProvider的子类。

步骤四:查询数据

说明
  • 您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。详情请参见Spark SQL执行方式

  • 执行Spark SQL语句,只返回执行成功或者失败,不返回数据。您可以在Spark Jar开发页面应用列表页签中的日志查看表数据。详情请参见查看Spark应用信息

执行以下语句,查询Hudi外表数据。

SELECT * FROM adb_external_db_hudi.test_hudi_tbl;