创建数据湖表

更新时间:
复制为 MD 格式

AnalyticDB for MySQL深度集成Apache Iceberg开源湖表格式,基于自研高性能XIHE引擎以及托管Spark引擎,提供开放、多引擎兼容的数据湖(Lakehouse)能力。用户创建的湖表数据以标准Parquet 格式持久化至对象存储(阿里云OSS),任何支持Iceberg的计算引擎(如 Spark、Flink、Trino)均可直接读取,避免厂商锁定,保障数据资产长期可用性。本文以Apache IcebergDelta Lake格式为例,介绍数据湖表的创建方法。

存储托管模式

模式选择

在创建数据湖表前,您需要决定数据的存储位置。AnalyticDB for MySQL提供两种灵活的存储管理模式,兼顾控制权与便捷性满足不同安全与运维需求:

  • 用户自有OSS Bucket

    数据完全存放在用户指定的同地域OSS Bucket中,满足强合规与数据主权要求。建库、建表时需显式声明存储路径,实现细粒度管控。

  • AnalyticDB for MySQL托管湖存储

    数据由AnalyticDB for MySQL自动管理底层存储桶,该存储桶在用户账号下不可见。用户通过标准SQL无缝读写湖表,无需关心文件系统、权限配置或生命周期管理,极大降低使用门槛。详情请参见湖存储

核心优势

所有湖表的元数据(包括数据库、表结构、列定义、分区信息等)均由AnalyticDB for MySQL内置Catalog服务统一管理,无需用户部署、扩缩容或维护独立的元数据集群。

使用开放数据湖如同操作传统数据库一样简单

  • 用户只需关注CREATE TABLESQL查询逻辑。

  • 底层存储、元数据、文件格式、压缩编码等基础设施由平台全托管。

  • 保留对开源生态的完全开放性,实现 “简单如数据库,开放如数据湖” 的理想Lakehouse体验。

前提条件

  • 通过XIHE引擎创建Apache Iceberg表,集群内核版本需为3.2.7及以上版本。

    说明

    云原生数据仓库AnalyticDB MySQL控制台集群信息页面,配置信息区域,查看和升级内核版本。若集群已是最新默认基线版本但仍需升级,请通过钉钉联系阿里云服务支持处理(钉钉账号:x5v_rm8wqzuqf)。

  • AnalyticDB for MySQL托管湖存储中创建表,需提交工单联系技术支持开通湖存储功能,并新建湖存储

创建Apache Iceberg

在用户自有OSS bucket中创建表

创建非分区表

适用场景:小型维度表(如国家、地区、产品类别等)、全表扫描频繁但数据量小、无需按时间或高基数字段裁剪的静态数据。

  • XIHE SQL

    CREATE DATABASE db_iceberg;
    
    -- 创建非分区的 nation 表(小维表,通常 < 100 行)
    CREATE TABLE db_iceberg.nation (
        n_nationkey INT,
        n_name      STRING,
        n_regionkey INT,
        n_comment   STRING
    )
    STORED AS ICEBERG
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
  • Spark SQL

    CREATE DATABASE db_iceberg;
    
    CREATE TABLE db_iceberg.nation (
        n_nationkey INT,
        n_name      STRING,
        n_regionkey INT,
        n_comment   STRING
    )
    USING iceberg
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';

创建分区表

AnalyticDB for MySQL支持使用分区转换函数定义分区。分区表(Partitioned Tables) 是构建高性能、可扩展、易管理的现代数据湖(Lakehouse)的核心实践之一。AnalyticDB for MySQL支持的Apache Iceberg分区转换规则如下:

转换函数

语法示例

说明

适用类型

identity

PARTITIONED BY (id)

原值分区(等同于Hive分区)

所有类型(但不推荐用于高基数字段)

year

PARTITIONED BY (years(ts))

按年分区

Timestamp, Date

month

PARTITIONED BY (months(ts))

按月分区

Timestamp, Date

day

PARTITIONED BY (days(ts))

按天分区(最常用)

Timestamp, Date

hour

PARTITIONED BY (hours(ts))

按小时分区

Timestamp

bucket[N]

PARTITIONED BY (bucket(16, user_id))

哈希分桶(N 为桶数)

所有类型(常用于高基数字段如 ID)

truncate[len]

PARTITIONED BY (truncate(10, email))

截断字符串前 len 位

String

XIHE、Spark引擎为例,不同场景适合的分区策略,示例如下:

基于低基数分类字段进行分区

  • XIHE

    -- 按市场细分(mktsegment)原值分区,该字段仅5个枚举值,适合 identity
    CREATE TABLE db_iceberg.customer (
        c_custkey     BIGINT,
        c_name        STRING,
        c_address     STRING,
        c_nationkey   INT,
        c_phone       STRING,
        c_acctbal     DECIMAL(15,2),
        c_mktsegment  STRING,   -- 低基数:'AUTOMOBILE', 'BUILDING', 'FURNITURE' 等
        c_comment     STRING
    )
    PARTITIONED BY (c_mktsegment)
    STORED AS ICEBERG
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
    
  • Spark SQL

    CREATE TABLE db_iceberg.customer (
        c_custkey     BIGINT,
        c_name        STRING,
        c_address     STRING,
        c_nationkey   INT,
        c_phone       STRING,
        c_acctbal     DECIMAL(15,2),
        c_mktsegment  STRING,
        c_comment     STRING
    )
    USING iceberg
    PARTITIONED BY (c_mktsegment)
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';

时间维度(years/months/days)分层分区

  • XIHE

    -- 按订单日期按天分区(最常用),兼顾查询性能与管理成本
    CREATE TABLE db_iceberg.orders (
        o_orderkey      BIGINT,
        o_custkey       BIGINT,
        o_orderstatus   STRING,
        o_totalprice    DECIMAL(15,2),
        o_orderdate     DATE,        -- TPC-H 原生字段
        o_orderpriority STRING,
        o_clerk         STRING,
        o_shippriority  INT,
        o_comment       STRING
    )
    PARTITIONED BY (day(o_orderdate))  -- 推荐:平衡分区数量与裁剪效率
    STORED AS ICEBERG
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
  • Spark SQL

    CREATE TABLE db_iceberg.orders (
        o_orderkey      BIGINT,
        o_custkey       BIGINT,
        o_orderstatus   STRING,
        o_totalprice    DECIMAL(15,2),
        o_orderdate     DATE,
        o_orderpriority STRING,
        o_clerk         STRING,
        o_shippriority  INT,
        o_comment       STRING
    )
    USING iceberg -- 注意:Iceberg 在 Spark 中使用的是 days() 转换函数,且不需要显式创建新列
    PARTITIONED BY (days(o_orderdate))
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';

高频事件按小时分区

  • XIHE

    -- 假设 l_receiptdate 扩展为 TIMESTAMP(模拟收货时间戳)
    CREATE TABLE db_iceberg.lineitem_realtime (
        l_orderkey      BIGINT,
        l_partkey       BIGINT,
        l_suppkey       BIGINT,
        l_linenumber    INT,
        l_quantity      DECIMAL(15,2),
        l_extendedprice DECIMAL(15,2),
        l_discount      DECIMAL(15,2),
        l_tax           DECIMAL(15,2),
        l_returnflag    STRING,
        l_linestatus    STRING,
        l_shipdate      DATE,
        l_commitdate    DATE,
        l_receipttime   TIMESTAMP,   -- 模拟:精确到秒的收货时间
        l_shipmode      STRING
    )
    PARTITIONED BY (hour(l_receipttime))  -- 按小时分区,支持近实时监控
    STORED AS ICEBERG
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
    
  • Spark SQL

    CREATE TABLE db_iceberg.lineitem_realtime (
        l_orderkey      BIGINT,
        l_partkey       BIGINT,
        l_suppkey       BIGINT,
        l_linenumber    INT,
        l_quantity      DECIMAL(15,2),
        l_extendedprice DECIMAL(15,2),
        l_discount      DECIMAL(15,2),
        l_tax           DECIMAL(15,2),
        l_returnflag    STRING,
        l_linestatus    STRING,
        l_shipdate      DATE,
        l_commitdate    DATE,
        l_receipttime   TIMESTAMP,
        l_shipmode      STRING
    )
    USING iceberg -- Iceberg 特性:使用 hours() 转换函数进行隐藏分区,无需新增列
    PARTITIONED BY (hours(l_receipttime))
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';

基于高基数字段哈希分桶

  • XIHE

    -- 对高基数外键 l_partkey 哈希分桶,避免小文件 & 提升 JOIN 性能
    CREATE TABLE db_iceberg.lineitem (
        l_orderkey      BIGINT,
        l_partkey       BIGINT,      -- 高基数(约 2000 万唯一值)
        l_suppkey       BIGINT,
        l_linenumber    INT,
        l_quantity      DECIMAL(15,2),
        l_extendedprice DECIMAL(15,2),
        l_discount      DECIMAL(15,2),
        l_tax           DECIMAL(15,2),
        l_returnflag    STRING,
        l_linestatus    STRING,
        l_shipdate      DATE,
        l_commitdate    DATE,
        l_receiptdate   DATE,
        l_shipmode      STRING
    )
    PARTITIONED BY (bucket(l_partkey,64))  -- 分64桶,均匀分布
    STORED AS ICEBERG
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
  • Spark SQL

    CREATE TABLE db_iceberg.lineitem (
        l_orderkey      BIGINT,
        l_partkey       BIGINT,      -- High Cardinality
        l_suppkey       BIGINT,
        l_linenumber    INT,
        l_quantity      DECIMAL(15,2),
        l_extendedprice DECIMAL(15,2),
        l_discount      DECIMAL(15,2),
        l_tax           DECIMAL(15,2),
        l_returnflag    STRING,
        l_linestatus    STRING,
        l_shipdate      DATE,
        l_commitdate    DATE,
        l_receiptdate   DATE,
        l_shipmode      STRING
    )
    USING iceberg
    -- Iceberg 语法:bucket(桶数, 列名)
    -- 这会生成一个隐式分区列,根据哈希值将数据分散到 64 个逻辑桶中
    PARTITIONED BY (bucket(64, l_partkey))
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';

truncate[len]字符串前缀截断分区

  • XIHE

    -- 按电话号码国家代码前缀分区(如 '13-' 代表中国某运营商)
    CREATE TABLE db_iceberg.customer_by_phone (
        c_custkey     BIGINT,
        c_name        STRING,
        c_phone       STRING,        -- 格式:'13-888-999-1234'
        c_acctbal     DECIMAL(15,2),
        c_mktsegment  STRING
    )
    PARTITIONED BY (truncate(c_phone,3))  -- 截取前3字符 '13-'
    STORED AS ICEBERG
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
  • Spark SQL

    CREATE TABLE db_iceberg.customer_by_phone (
        c_custkey     BIGINT,
        c_name        STRING,
        c_phone       STRING,
        c_acctbal     DECIMAL(15,2),
        c_mktsegment  STRING
    )
    USING iceberg
    -- Iceberg 原生支持 truncate(宽度, 列名) 转换
    -- 数据写入时,Iceberg 会自动计算 c_phone 的前3位并据此存入对应目录
    PARTITIONED BY (truncate(3, c_phone))
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';

多级复合分区

两级分区在分区数量(避免过细)与数据均匀性(避免倾斜)之间取得平衡,适用于海量数据的查询场景。

以通用日志表为例,创建语句如下:

  • XIHE

    -- 用户行为日志表:时间 + 高基数ID分桶 + 区域截断,三层复合分区
    CREATE TABLE db_iceberg.user_event_log (
        event_id      BIGINT,
        user_id       BIGINT,          -- 高基数用户ID
        session_id    STRING,
        event_type    STRING,
        event_time    TIMESTAMP,       -- 精确到秒的时间戳
        country_code  STRING,          -- 国家代码,如 'CN', 'US', 'DE'
        device_type   STRING,
        payload       STRING
    )
    PARTITIONED BY (
        day(event_time),            -- 第一级:按天分区(高效时间裁剪)
        bucket(user_id,64),         -- 第二级:对高基数 user_id 哈希分64桶(防小文件)
        truncate(country_code,2)    -- 第三级:截取国家代码前2位(区域聚合)
    )
    STORED AS ICEBERG
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
    -- TPC-H lineitem 表:按发货日期天分区 + 零件ID分桶(经典事实表优化)
    CREATE TABLE db_iceberg.lineitem_multiple_part (
        l_orderkey      BIGINT,
        l_partkey       BIGINT,        -- 高基数外键(~20M 唯一值)
        l_suppkey       BIGINT,
        l_linenumber    INT,
        l_quantity      DECIMAL(15, 2),
        l_extendedprice DECIMAL(15, 2),
        l_discount      DECIMAL(15, 2),
        l_tax           DECIMAL(15, 2),
        l_returnflag    STRING,
        l_linestatus    STRING,
        l_shipdate      DATE,          -- TPC-H 核心时间字段
        l_commitdate    DATE,
        l_receiptdate   DATE,
        l_shipinstruct  STRING,
        l_shipmode      STRING,
        l_comment       STRING
    )
    PARTITIONED BY (
        day(l_shipdate),            -- 第一级:按发货日期分区(TPC-H 查询高频过滤条件)
        bucket(l_partkey,32)        -- 第二级:对零件ID哈希分32桶(提升 JOIN 和点查性能)
    )
    STORED AS ICEBERG
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
  • Spark SQL

    CREATE TABLE db_iceberg.user_event_log (
        event_id      BIGINT,
        user_id       BIGINT,
        session_id    STRING,
        event_type    STRING,
        event_time    TIMESTAMP,
        country_code  STRING,
        device_type   STRING,
        payload       STRING
    )
    USING iceberg
    PARTITIONED BY (
        days(event_time),           -- 自动转换:按天分区
        bucket(64, user_id),        -- 自动哈希:对 user_id 分64个桶
        truncate(2, country_code)   -- 自动截取:前2位字符
    )
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
    CREATE TABLE db_iceberg.lineitem_multiple_part (
        l_orderkey      BIGINT,
        l_partkey       BIGINT,
        l_suppkey       BIGINT,
        l_linenumber    INT,
        l_quantity      DECIMAL(15, 2),
        l_extendedprice DECIMAL(15, 2),
        l_discount      DECIMAL(15, 2),
        l_tax           DECIMAL(15, 2),
        l_returnflag    STRING,
        l_linestatus    STRING,
        l_shipdate      DATE,
        l_commitdate    DATE,
        l_receiptdate   DATE,
        l_shipinstruct  STRING,
        l_shipmode      STRING,
        l_comment       STRING
    )
    USING iceberg
    PARTITIONED BY (
        days(l_shipdate),      -- 显式指定按天分区
        bucket(32, l_partkey)  -- 哈希分32桶 (注意参数顺序:桶数在前)
    )
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';

AnalyticDB for MySQL托管湖存储中创建表

  1. 创建一个关联到托管存储的外部库。

    • XIHE

      CREATE EXTERNAL DATABASE test_db 
      WITH DBPROPERTIES ('adb_lake_bucket' = '<YOUR_ADB_BUCKET>');
    • Spark SQL

      CREATE DATABASE test_db 
      WITH DBPROPERTIES ('adb_lake_bucket' = '<YOUR_ADB_BUCKET>');
  2. 在该数据库下创建表,并通过TBLPROPERTIES定义表所位于的托管数据湖bucket。

    说明

    分区策略与在用户自有OSS bucket中创建一致

    • XIHE

      CREATE TABLE test_db.test_iceberg_tbl (
        `id` int,
        `name` string
      ) 
      STORED AS ICEBERG
      TBLPROPERTIES (
        'catalog_type' = 'ADB', 
        'adb_lake_bucket' = '<YOUR_ADB_BUCKET>'
      );
    • Spark SQL

      Job型资源组

      SET spark.adb.lakehouse.enabled=true;       -- 开启湖存储               
      
      CREATE TABLE test_db.test_iceberg_tbl (
        `id` int,
        `name` string
      ) 
      USING iceberg
      TBLPROPERTIES ( 'adb_lake_bucket' = '<YOUR_ADB_BUCKET>' );

      Interactive型资源组

      1. 开启湖存储。修改资源组,添加Spark配置spark.adb.lakehouse.enabled,值为true。

      2. 执行SQL。

        CREATE TABLE test_db.test_iceberg_tbl (
          `id` int,
          `name` string
        ) 
        USING iceberg
        TBLPROPERTIES ( 'adb_lake_bucket' = '<YOUR_ADB_BUCKET>' );

创建Delta Lake

目前仅支持通过Spark SQL、PySpark创建和读写Delta Lake表,不支持通过XIHE引擎创建和读写。

创建示例如下,语法说明及更多信息,请参见How to Create Delta Lake Tables | Delta Lake

CREATE DATABASE db_delta LOCATION 'oss://<YOUR_BUCKET>/db_delta/';

CREATE TABLE IF NOT EXISTS db_delta.delta_lake_comprehensive_test (
    transaction_id BIGINT NOT NULL COMMENT '全局唯一交易ID',
    user_id STRING COMMENT '用户ID',
    device_info STRUCT<
        os: STRING, 
        model: STRING, 
        app_version: STRING
    > COMMENT '设备详情嵌套结构',
    tags MAP<STRING, STRING> COMMENT '用户标签Map',
    item_list ARRAY<STRING> COMMENT '购买商品列表',
    event_ts TIMESTAMP COMMENT '事件发生时间',
    revenue DECIMAL(18, 2) COMMENT '营收金额',
    event_date DATE COMMENT '自动生成的日期分区键'
)
USING DELTA
PARTITIONED BY (event_date);