创建数据湖表
AnalyticDB for MySQL深度集成Apache Iceberg等开源湖表格式,基于自研高性能XIHE引擎以及托管Spark引擎,提供开放、多引擎兼容的数据湖(Lakehouse)能力。用户创建的湖表数据以标准Parquet 格式持久化至对象存储(阿里云OSS),任何支持Iceberg的计算引擎(如 Spark、Flink、Trino)均可直接读取,避免厂商锁定,保障数据资产长期可用性。本文以Apache Iceberg和Delta Lake格式为例,介绍数据湖表的创建方法。
存储托管模式
模式选择
在创建数据湖表前,您需要决定数据的存储位置。AnalyticDB for MySQL提供两种灵活的存储管理模式,兼顾控制权与便捷性,满足不同安全与运维需求:
用户自有OSS Bucket
数据完全存放在用户指定的同地域OSS Bucket中,满足强合规与数据主权要求。建库、建表时需显式声明存储路径,实现细粒度管控。
AnalyticDB for MySQL托管湖存储
数据由AnalyticDB for MySQL自动管理底层存储桶,该存储桶在用户账号下不可见。用户通过标准SQL无缝读写湖表,无需关心文件系统、权限配置或生命周期管理,极大降低使用门槛。详情请参见湖存储。
核心优势
所有湖表的元数据(包括数据库、表结构、列定义、分区信息等)均由AnalyticDB for MySQL内置Catalog服务统一管理,无需用户部署、扩缩容或维护独立的元数据集群。
使用开放数据湖如同操作传统数据库一样简单:
用户只需关注
CREATE TABLE和SQL查询逻辑。底层存储、元数据、文件格式、压缩编码等基础设施由平台全托管。
保留对开源生态的完全开放性,实现 “简单如数据库,开放如数据湖” 的理想Lakehouse体验。
前提条件
通过XIHE引擎创建Apache Iceberg表,集群内核版本需为3.2.7及以上版本。
说明请在云原生数据仓库AnalyticDB MySQL控制台集群信息页面,配置信息区域,查看和升级内核版本。若集群已是最新默认基线版本但仍需升级,请通过钉钉联系阿里云服务支持处理(钉钉账号:
x5v_rm8wqzuqf)。在AnalyticDB for MySQL托管湖存储中创建表,需提交工单联系技术支持开通湖存储功能,并新建湖存储。
创建Apache Iceberg表
在用户自有OSS bucket中创建表
创建非分区表
适用场景:小型维度表(如国家、地区、产品类别等)、全表扫描频繁但数据量小、无需按时间或高基数字段裁剪的静态数据。
XIHE SQL
CREATE DATABASE db_iceberg; -- 创建非分区的 nation 表(小维表,通常 < 100 行) CREATE TABLE db_iceberg.nation ( n_nationkey INT, n_name STRING, n_regionkey INT, n_comment STRING ) STORED AS ICEBERG LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';Spark SQL
CREATE DATABASE db_iceberg; CREATE TABLE db_iceberg.nation ( n_nationkey INT, n_name STRING, n_regionkey INT, n_comment STRING ) USING iceberg LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
创建分区表
AnalyticDB for MySQL支持使用分区转换函数定义分区。分区表(Partitioned Tables) 是构建高性能、可扩展、易管理的现代数据湖(Lakehouse)的核心实践之一。AnalyticDB for MySQL支持的Apache Iceberg分区转换规则如下:
转换函数 | 语法示例 | 说明 | 适用类型 |
|
| 原值分区(等同于Hive分区) | 所有类型(但不推荐用于高基数字段) |
|
| 按年分区 | Timestamp, Date |
|
| 按月分区 | Timestamp, Date |
|
| 按天分区(最常用) | Timestamp, Date |
|
| 按小时分区 | Timestamp |
|
| 哈希分桶(N 为桶数) | 所有类型(常用于高基数字段如 ID) |
|
| 截断字符串前 len 位 | String |
以XIHE、Spark引擎为例,不同场景适合的分区策略,示例如下:
基于低基数分类字段进行分区
XIHE
-- 按市场细分(mktsegment)原值分区,该字段仅5个枚举值,适合 identity CREATE TABLE db_iceberg.customer ( c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INT, c_phone STRING, c_acctbal DECIMAL(15,2), c_mktsegment STRING, -- 低基数:'AUTOMOBILE', 'BUILDING', 'FURNITURE' 等 c_comment STRING ) PARTITIONED BY (c_mktsegment) STORED AS ICEBERG LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';Spark SQL
CREATE TABLE db_iceberg.customer ( c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INT, c_phone STRING, c_acctbal DECIMAL(15,2), c_mktsegment STRING, c_comment STRING ) USING iceberg PARTITIONED BY (c_mktsegment) LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
按时间维度(years/months/days)分层分区
XIHE
-- 按订单日期按天分区(最常用),兼顾查询性能与管理成本 CREATE TABLE db_iceberg.orders ( o_orderkey BIGINT, o_custkey BIGINT, o_orderstatus STRING, o_totalprice DECIMAL(15,2), o_orderdate DATE, -- TPC-H 原生字段 o_orderpriority STRING, o_clerk STRING, o_shippriority INT, o_comment STRING ) PARTITIONED BY (day(o_orderdate)) -- 推荐:平衡分区数量与裁剪效率 STORED AS ICEBERG LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';Spark SQL
CREATE TABLE db_iceberg.orders ( o_orderkey BIGINT, o_custkey BIGINT, o_orderstatus STRING, o_totalprice DECIMAL(15,2), o_orderdate DATE, o_orderpriority STRING, o_clerk STRING, o_shippriority INT, o_comment STRING ) USING iceberg -- 注意:Iceberg 在 Spark 中使用的是 days() 转换函数,且不需要显式创建新列 PARTITIONED BY (days(o_orderdate)) LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
高频事件按小时分区
XIHE
-- 假设 l_receiptdate 扩展为 TIMESTAMP(模拟收货时间戳) CREATE TABLE db_iceberg.lineitem_realtime ( l_orderkey BIGINT, l_partkey BIGINT, l_suppkey BIGINT, l_linenumber INT, l_quantity DECIMAL(15,2), l_extendedprice DECIMAL(15,2), l_discount DECIMAL(15,2), l_tax DECIMAL(15,2), l_returnflag STRING, l_linestatus STRING, l_shipdate DATE, l_commitdate DATE, l_receipttime TIMESTAMP, -- 模拟:精确到秒的收货时间 l_shipmode STRING ) PARTITIONED BY (hour(l_receipttime)) -- 按小时分区,支持近实时监控 STORED AS ICEBERG LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';Spark SQL
CREATE TABLE db_iceberg.lineitem_realtime ( l_orderkey BIGINT, l_partkey BIGINT, l_suppkey BIGINT, l_linenumber INT, l_quantity DECIMAL(15,2), l_extendedprice DECIMAL(15,2), l_discount DECIMAL(15,2), l_tax DECIMAL(15,2), l_returnflag STRING, l_linestatus STRING, l_shipdate DATE, l_commitdate DATE, l_receipttime TIMESTAMP, l_shipmode STRING ) USING iceberg -- Iceberg 特性:使用 hours() 转换函数进行隐藏分区,无需新增列 PARTITIONED BY (hours(l_receipttime)) LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
基于高基数字段哈希分桶
XIHE
-- 对高基数外键 l_partkey 哈希分桶,避免小文件 & 提升 JOIN 性能 CREATE TABLE db_iceberg.lineitem ( l_orderkey BIGINT, l_partkey BIGINT, -- 高基数(约 2000 万唯一值) l_suppkey BIGINT, l_linenumber INT, l_quantity DECIMAL(15,2), l_extendedprice DECIMAL(15,2), l_discount DECIMAL(15,2), l_tax DECIMAL(15,2), l_returnflag STRING, l_linestatus STRING, l_shipdate DATE, l_commitdate DATE, l_receiptdate DATE, l_shipmode STRING ) PARTITIONED BY (bucket(l_partkey,64)) -- 分64桶,均匀分布 STORED AS ICEBERG LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';Spark SQL
CREATE TABLE db_iceberg.lineitem ( l_orderkey BIGINT, l_partkey BIGINT, -- High Cardinality l_suppkey BIGINT, l_linenumber INT, l_quantity DECIMAL(15,2), l_extendedprice DECIMAL(15,2), l_discount DECIMAL(15,2), l_tax DECIMAL(15,2), l_returnflag STRING, l_linestatus STRING, l_shipdate DATE, l_commitdate DATE, l_receiptdate DATE, l_shipmode STRING ) USING iceberg -- Iceberg 语法:bucket(桶数, 列名) -- 这会生成一个隐式分区列,根据哈希值将数据分散到 64 个逻辑桶中 PARTITIONED BY (bucket(64, l_partkey)) LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
truncate[len]字符串前缀截断分区
XIHE
-- 按电话号码国家代码前缀分区(如 '13-' 代表中国某运营商) CREATE TABLE db_iceberg.customer_by_phone ( c_custkey BIGINT, c_name STRING, c_phone STRING, -- 格式:'13-888-999-1234' c_acctbal DECIMAL(15,2), c_mktsegment STRING ) PARTITIONED BY (truncate(c_phone,3)) -- 截取前3字符 '13-' STORED AS ICEBERG LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';Spark SQL
CREATE TABLE db_iceberg.customer_by_phone ( c_custkey BIGINT, c_name STRING, c_phone STRING, c_acctbal DECIMAL(15,2), c_mktsegment STRING ) USING iceberg -- Iceberg 原生支持 truncate(宽度, 列名) 转换 -- 数据写入时,Iceberg 会自动计算 c_phone 的前3位并据此存入对应目录 PARTITIONED BY (truncate(3, c_phone)) LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
多级复合分区
两级分区在分区数量(避免过细)与数据均匀性(避免倾斜)之间取得平衡,适用于海量数据的查询场景。
以通用日志表为例,创建语句如下:
XIHE
-- 用户行为日志表:时间 + 高基数ID分桶 + 区域截断,三层复合分区 CREATE TABLE db_iceberg.user_event_log ( event_id BIGINT, user_id BIGINT, -- 高基数用户ID session_id STRING, event_type STRING, event_time TIMESTAMP, -- 精确到秒的时间戳 country_code STRING, -- 国家代码,如 'CN', 'US', 'DE' device_type STRING, payload STRING ) PARTITIONED BY ( day(event_time), -- 第一级:按天分区(高效时间裁剪) bucket(user_id,64), -- 第二级:对高基数 user_id 哈希分64桶(防小文件) truncate(country_code,2) -- 第三级:截取国家代码前2位(区域聚合) ) STORED AS ICEBERG LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';-- TPC-H lineitem 表:按发货日期天分区 + 零件ID分桶(经典事实表优化) CREATE TABLE db_iceberg.lineitem_multiple_part ( l_orderkey BIGINT, l_partkey BIGINT, -- 高基数外键(~20M 唯一值) l_suppkey BIGINT, l_linenumber INT, l_quantity DECIMAL(15, 2), l_extendedprice DECIMAL(15, 2), l_discount DECIMAL(15, 2), l_tax DECIMAL(15, 2), l_returnflag STRING, l_linestatus STRING, l_shipdate DATE, -- TPC-H 核心时间字段 l_commitdate DATE, l_receiptdate DATE, l_shipinstruct STRING, l_shipmode STRING, l_comment STRING ) PARTITIONED BY ( day(l_shipdate), -- 第一级:按发货日期分区(TPC-H 查询高频过滤条件) bucket(l_partkey,32) -- 第二级:对零件ID哈希分32桶(提升 JOIN 和点查性能) ) STORED AS ICEBERG LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';Spark SQL
CREATE TABLE db_iceberg.user_event_log ( event_id BIGINT, user_id BIGINT, session_id STRING, event_type STRING, event_time TIMESTAMP, country_code STRING, device_type STRING, payload STRING ) USING iceberg PARTITIONED BY ( days(event_time), -- 自动转换:按天分区 bucket(64, user_id), -- 自动哈希:对 user_id 分64个桶 truncate(2, country_code) -- 自动截取:前2位字符 ) LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';CREATE TABLE db_iceberg.lineitem_multiple_part ( l_orderkey BIGINT, l_partkey BIGINT, l_suppkey BIGINT, l_linenumber INT, l_quantity DECIMAL(15, 2), l_extendedprice DECIMAL(15, 2), l_discount DECIMAL(15, 2), l_tax DECIMAL(15, 2), l_returnflag STRING, l_linestatus STRING, l_shipdate DATE, l_commitdate DATE, l_receiptdate DATE, l_shipinstruct STRING, l_shipmode STRING, l_comment STRING ) USING iceberg PARTITIONED BY ( days(l_shipdate), -- 显式指定按天分区 bucket(32, l_partkey) -- 哈希分32桶 (注意参数顺序:桶数在前) ) LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
在AnalyticDB for MySQL中托管湖存储中创建表
创建一个关联到托管存储的外部库。
XIHE
CREATE EXTERNAL DATABASE test_db WITH DBPROPERTIES ('adb_lake_bucket' = '<YOUR_ADB_BUCKET>');Spark SQL
CREATE DATABASE test_db WITH DBPROPERTIES ('adb_lake_bucket' = '<YOUR_ADB_BUCKET>');
在该数据库下创建表,并通过
TBLPROPERTIES定义表所位于的托管数据湖bucket。说明分区策略与在用户自有OSS bucket中创建一致。
XIHE
CREATE TABLE test_db.test_iceberg_tbl ( `id` int, `name` string ) STORED AS ICEBERG TBLPROPERTIES ( 'catalog_type' = 'ADB', 'adb_lake_bucket' = '<YOUR_ADB_BUCKET>' );Spark SQL
Job型资源组
SET spark.adb.lakehouse.enabled=true; -- 开启湖存储 CREATE TABLE test_db.test_iceberg_tbl ( `id` int, `name` string ) USING iceberg TBLPROPERTIES ( 'adb_lake_bucket' = '<YOUR_ADB_BUCKET>' );Interactive型资源组
开启湖存储。修改资源组,添加Spark配置
spark.adb.lakehouse.enabled,值为true。执行SQL。
CREATE TABLE test_db.test_iceberg_tbl ( `id` int, `name` string ) USING iceberg TBLPROPERTIES ( 'adb_lake_bucket' = '<YOUR_ADB_BUCKET>' );
创建Delta Lake表
目前仅支持通过Spark SQL、PySpark创建和读写Delta Lake表,不支持通过XIHE引擎创建和读写。
创建示例如下,语法说明及更多信息,请参见How to Create Delta Lake Tables | Delta Lake。
CREATE DATABASE db_delta LOCATION 'oss://<YOUR_BUCKET>/db_delta/';
CREATE TABLE IF NOT EXISTS db_delta.delta_lake_comprehensive_test (
transaction_id BIGINT NOT NULL COMMENT '全局唯一交易ID',
user_id STRING COMMENT '用户ID',
device_info STRUCT<
os: STRING,
model: STRING,
app_version: STRING
> COMMENT '设备详情嵌套结构',
tags MAP<STRING, STRING> COMMENT '用户标签Map',
item_list ARRAY<STRING> COMMENT '购买商品列表',
event_ts TIMESTAMP COMMENT '事件发生时间',
revenue DECIMAL(18, 2) COMMENT '营收金额',
event_date DATE COMMENT '自动生成的日期分区键'
)
USING DELTA
PARTITIONED BY (event_date);