本文为您介绍Hudi与Spark SQl集成后,支持的建表语句。
背景信息
Spark SQL创建Hudi表时,可以通过options设置表配置信息,options参数如下表所示。
0.10版本之后options被替换为tblproperties。
参数 | 描述 | 是否必选 |
primaryKey | 指定主键列,多个主键时使用逗号(,)隔开。 | 必选 |
type | 表类型,支持以下两种类型:
| 可选 |
preCombineField | 版本字段。 对应Hudi的DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY字段。 | 建议设置,否则upsert场景无法支持 |
payloadClass | 默认值为DefaultHoodieRecordPayload。 对应Hudi的DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY字段。 | 可选 |
前提条件
已创建包含Spark和Hudi服务的集群,详情请参见创建集群。
使用限制
EMR-3.36.0及后续版本和EMR-5.2.0及后续版本,支持Spark SQL对Hudi进行读写操作。
启动方式
- Spark2和Spark3 hudi0.11以下版本
spark-sql \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
- Spark3 hudi0.11及以上版本
spark-sql \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
创建非分区表
options通过primaryKey指定主键列,多个字段时使用逗号(,)隔开。创建非分区表的示例如下所示:
创建表类型为cow,主键为id的非分区表。
create table if not exists h0( id bigint, name string, price double ) using hudi options ( type = 'cow', primaryKey = 'id' );
创建表类型为mor,主键为id和name的非分区表。
create table if not exists h0( id bigint, name string, price double ) using hudi options ( type = 'mor', primaryKey = 'id,name' );
创建表类型为cow的非分区表。
create table if not exists h0( id bigint, name string, price double ) using hudi options ( type = 'cow' );
创建分区表
创建分区表的示例如下所示。
create table if not exists h_p0 (
id bigint,
name string,
dt string,
hh string
) using hudi
location 'oss://xxx/h_p0'
options (
type = 'cow',
primaryKey = 'id',
preCombineField = 'id'
)
partitioned by (dt, hh);
本文代码示例中的location为表所在的路径,可以是OSS路径,也可以是HDFS路径。主键为id,分区字段为dt和hh,版本字段为id。
创建外表
支持在已经存在的Hudi表之上创建外表。创建外表示例如下所示。
create table h0
using hudi
location '/xx/xx/h0';
CTAS语法
通过以下示例为您介绍如何使用CTAS语法。
示例1:
create table if not exists h1 using hudi as select 1 as id, 'a1' as name, 10 as price;
示例2:
create table if not exists h2 using hudi partitioned by (dt) location '/xx/xx/h2' options ( type = 'mor', primaryKey = 'id,name' ) as select 1 as id, 'a1' as name, 20 as price, '2021-01-03' as dt;