Create data lake tables

AnalyticDB for MySQL is deeply integrated with open-source lake table formats such as Apache Iceberg. Powered by a self-developed high-performance XIHE engine and a managed Spark engine, it provides open, multi-engine compatible data lake (lakehouse) capabilities. Data in lake tables is persisted to object storage (Alibaba Cloud OSS) in the standard Parquet format. Any compute engine that supports Iceberg, such as Spark, Flink, or Trino, can directly read the data. This helps you avoid vendor lock-in and ensures the long-term availability of your data assets. This topic uses the Apache Iceberg and Delta Lake formats as examples to describe how to create data lake tables.

Storage modes

Mode selection

Before you create a data lake table, you need to decide where to store the data. AnalyticDB for MySQL provides two flexible storage management modes that balance control and convenience, and meet different security and O&M requirements:

  • Customer-managed OSS bucket

    Data is stored entirely in a customer-specified Alibaba Cloud OSS bucket within the same region. This approach meets strict compliance and data sovereignty requirements. You must explicitly declare the storage path when creating databases and tables for fine-grained control.

  • AnalyticDB for MySQL-managed lake storage

    AnalyticDB for MySQL automatically manages the underlying storage buckets, which are not visible in your account. You can use standard SQL to seamlessly read from and write to lake tables without having to manage file systems, permission configurations, or lifecycle management. This greatly lowers the barrier to entry. For more information, see lake storage.

Key advantages

All metadata of lake tables, such as databases, table structures, column definitions, and partition information, is centrally managed by the AnalyticDB for MySQL built-in catalog service. You do not need to deploy, scale, or maintain a separate metadata cluster.

Working with an open data lake is as simple as using a traditional database:

  • You only need to focus on CREATE TABLE and SQL query logic.

  • The platform fully manages the underlying infrastructure, including storage, metadata, file formats, and compression codecs.

  • Complete openness to the open-source ecosystem is maintained, delivering the ideal lakehouse experience: "as simple as a database, as open as a data lake."

Prerequisites

  • To create Apache Iceberg tables by using the XIHE engine, your cluster must be on minor engine version 3.2.7 or later.

    Note

    To view and update the minor version, go to the Configuration Information section on the Cluster Information page in the AnalyticDB for MySQL console. To upgrade a cluster that is already on the latest default baseline version, contact Alibaba Cloud Service Support on DingTalk (DingTalk ID: x5v_rm8wqzuqf).

  • To create a table in AnalyticDB for MySQL-managed lake storage, submit a ticket to ask technical support to enable the lake storage feature and create a lake storage.

Default table format

You can set a default table format at the database level. This ensures that new tables created in the database automatically use the specified format without an explicit declaration in each CREATE TABLE statement.

  1. In a Job resource group, Interactive resource group, or a submitted job, set the following parameter:

    spark.sql.adb.sources.extractProviderFromDBProperties.enabled true
  2. When you create a database, use DBPROPERTIES to specify 'storage.format' as one of the following formats: delta, iceberg, parquet, or orc. The following is an example:

    CREATE DATABASE IF NOT EXISTS db_storage_format 
    LOCATION 'oss://path/to/db/' 
    WITH DBPROPERTIES ('storage.format'='delta');

    After you execute the preceding statement, tables that you create in the db_storage_format database use the delta format by default. If you explicitly specify a format with the USING ${tableFormat} clause when you create a table, the explicitly specified format takes precedence.
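The precedence rule described above can be modeled in a few lines of Python. This is only an illustration of the documented behavior, not AnalyticDB for MySQL code: the helper name resolve_table_format and the choice to return None when no default is set are assumptions made for this sketch.

```python
from typing import Optional

# Formats that this topic lists as valid values of 'storage.format'.
SUPPORTED_FORMATS = {"delta", "iceberg", "parquet", "orc"}


def resolve_table_format(db_properties: dict, using_clause: Optional[str] = None) -> Optional[str]:
    """Return the format a new table would get (hypothetical helper).

    An explicit USING clause wins. Otherwise the database-level
    'storage.format' property applies. If neither is present, None is returned.
    """
    if using_clause is not None:
        return using_clause.lower()
    default = db_properties.get("storage.format")
    if default is None:
        return None
    default = default.lower()
    if default not in SUPPORTED_FORMATS:
        raise ValueError(f"unsupported storage.format: {default}")
    return default


db_props = {"storage.format": "delta"}
print(resolve_table_format(db_props))             # delta
print(resolve_table_format(db_props, "iceberg"))  # iceberg
```

The second call mirrors a CREATE TABLE statement that contains USING iceberg inside the db_storage_format database: the table-level declaration overrides the database default.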

Create Apache Iceberg tables

In customer-managed OSS buckets

Non-partitioned tables

Use case: Ideal for small dimension tables such as countries, regions, or product categories, small static datasets that are frequently scanned, or data that does not require pruning by time or high-cardinality fields.

  • XIHE SQL

    CREATE DATABASE db_iceberg;
    
    -- Create a non-partitioned table named nation.
    -- This is a small dimension table, typically with fewer than 100 rows.
    CREATE TABLE db_iceberg.nation (
        n_nationkey INT,
        n_name      STRING,
        n_regionkey INT,
        n_comment   STRING
    )
    STORED AS ICEBERG
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
  • Spark SQL

    CREATE DATABASE db_iceberg;
    
    CREATE TABLE db_iceberg.nation (
        n_nationkey INT,
        n_name      STRING,
        n_regionkey INT,
        n_comment   STRING
    )
    USING iceberg
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';

Partitioned tables

AnalyticDB for MySQL supports defining partitions by using partition transformation functions. Partitioned tables are a core practice for building high-performance, scalable, and easy-to-manage modern data lakes (lakehouses). AnalyticDB for MySQL supports the following Apache Iceberg partition transformation rules:

| Transformation function | Syntax example | Description | Supported data types |
| --- | --- | --- | --- |
| identity | PARTITIONED BY (id) | Identity partitioning, equivalent to Hive-style partitioning. | All types, but not recommended for high-cardinality fields. |
| year | PARTITIONED BY (years(ts)) | Partition by year. | Timestamp, Date |
| month | PARTITIONED BY (months(ts)) | Partition by month. | Timestamp, Date |
| day | PARTITIONED BY (days(ts)) | Partition by day. This is the most common approach. | Timestamp, Date |
| hour | PARTITIONED BY (hours(ts)) | Partition by hour. | Timestamp |
| bucket[N] | PARTITIONED BY (bucket(16, user_id)) | Hash bucketing, where N is the number of buckets. | All types. This function is often used for high-cardinality fields such as IDs. |
| truncate[len] | PARTITIONED BY (truncate(10, email)) | Truncate to the first len characters of a string. | String |
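To make the time and truncate transformations concrete, the following Python sketch computes the partition value each one would derive from a row. It assumes the definitions in the Apache Iceberg table specification, where time transformations produce an ordinal counted from 1970-01-01. Whether the engines expose exactly these ordinals or a formatted value is not stated in this topic, and the function names simply mirror the Spark SQL spellings.

```python
from datetime import date, datetime

EPOCH_DATE = date(1970, 1, 1)
EPOCH_TS = datetime(1970, 1, 1)


def years(d: date) -> int:
    """Whole years since 1970."""
    return d.year - 1970


def months(d: date) -> int:
    """Whole months since 1970-01."""
    return (d.year - 1970) * 12 + (d.month - 1)


def days(d: date) -> int:
    """Whole days since 1970-01-01."""
    return (d - EPOCH_DATE).days


def hours(ts: datetime) -> int:
    """Whole hours since 1970-01-01 00:00:00 (timezone-naive input)."""
    delta = ts - EPOCH_TS
    return delta.days * 24 + delta.seconds // 3600


def truncate(width: int, s: str) -> str:
    """First `width` characters of a string."""
    return s[:width]


order_date = date(1995, 3, 15)
print(years(order_date))                         # 25
print(months(order_date))                        # 302
print(days(order_date))                          # 9204
print(hours(datetime(1995, 3, 15, 13, 45)))      # 220909
print(truncate(10, "alice.smith@example.com"))   # alice.smit
```

Every row whose source column maps to the same derived value lands in the same partition, which is why a filter on the source column can prune whole partitions without a separate partition column in the table.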

The following examples show how to use the XIHE engine and Spark SQL to apply partitioning strategies for various use cases.

Categorical fields

  • XIHE

    -- Partition by c_mktsegment (market segment).
    -- This field has only five unique values such as 'AUTOMOBILE', 'BUILDING', and 'FURNITURE', and is ideal for identity partitioning.
    CREATE TABLE db_iceberg.customer (
        c_custkey     BIGINT,
        c_name        STRING,
        c_address     STRING,
        c_nationkey   INT,
        c_phone       STRING,
        c_acctbal     DECIMAL(15,2),
        c_mktsegment  STRING,
        c_comment     STRING
    )
    PARTITIONED BY (c_mktsegment)
    STORED AS ICEBERG
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
    
  • Spark SQL

    CREATE TABLE db_iceberg.customer (
        c_custkey     BIGINT,
        c_name        STRING,
        c_address     STRING,
        c_nationkey   INT,
        c_phone       STRING,
        c_acctbal     DECIMAL(15,2),
        c_mktsegment  STRING,
        c_comment     STRING
    )
    USING iceberg
    PARTITIONED BY (c_mktsegment)
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';

Time-based partitioning (year, month, or day)

  • XIHE

    -- Partition by day on the order date. This is the most common approach and balances query performance with management overhead.
    CREATE TABLE db_iceberg.orders (
        o_orderkey      BIGINT,
        o_custkey       BIGINT,
        o_orderstatus   STRING,
        o_totalprice    DECIMAL(15,2),
        o_orderdate     DATE,        -- Native TPC-H field
        o_orderpriority STRING,
        o_clerk         STRING,
        o_shippriority  INT,
        o_comment       STRING
    )
    PARTITIONED BY (day(o_orderdate))  -- Recommended for balancing partition count and pruning efficiency.
    STORED AS ICEBERG
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
  • Spark SQL

    CREATE TABLE db_iceberg.orders (
        o_orderkey      BIGINT,
        o_custkey       BIGINT,
        o_orderstatus   STRING,
        o_totalprice    DECIMAL(15,2),
        o_orderdate     DATE,
        o_orderpriority STRING,
        o_clerk         STRING,
        o_shippriority  INT,
        o_comment       STRING
    )
    USING iceberg -- Note: Iceberg in Spark uses the days() transformation function and does not require an explicitly created column.
    PARTITIONED BY (days(o_orderdate))
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';

Hourly partitioning

  • XIHE

    -- Assume l_receiptdate is extended to a TIMESTAMP type to simulate a real-time receipt timestamp.
    CREATE TABLE db_iceberg.lineitem_realtime (
        l_orderkey      BIGINT,
        l_partkey       BIGINT,
        l_suppkey       BIGINT,
        l_linenumber    INT,
        l_quantity      DECIMAL(15,2),
        l_extendedprice DECIMAL(15,2),
        l_discount      DECIMAL(15,2),
        l_tax           DECIMAL(15,2),
        l_returnflag    STRING,
        l_linestatus    STRING,
        l_shipdate      DATE,
        l_commitdate    DATE,
        l_receipttime   TIMESTAMP,   -- Simulated: Receipt time accurate to the second.
        l_shipmode      STRING
    )
    PARTITIONED BY (hour(l_receipttime))  -- Partition by hour to support near-real-time monitoring.
    STORED AS ICEBERG
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
    
  • Spark SQL

    CREATE TABLE db_iceberg.lineitem_realtime (
        l_orderkey      BIGINT,
        l_partkey       BIGINT,
        l_suppkey       BIGINT,
        l_linenumber    INT,
        l_quantity      DECIMAL(15,2),
        l_extendedprice DECIMAL(15,2),
        l_discount      DECIMAL(15,2),
        l_tax           DECIMAL(15,2),
        l_returnflag    STRING,
        l_linestatus    STRING,
        l_shipdate      DATE,
        l_commitdate    DATE,
        l_receipttime   TIMESTAMP,
        l_shipmode      STRING
    )
    USING iceberg -- Iceberg feature: Use the hours() transformation function for hidden partitioning without adding new columns.
    PARTITIONED BY (hours(l_receipttime))
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';

Hash bucketing

  • XIHE

    -- Apply hash bucketing to the high-cardinality foreign key l_partkey to prevent small files and improve JOIN performance.
    CREATE TABLE db_iceberg.lineitem (
        l_orderkey      BIGINT,
        l_partkey       BIGINT,      -- High cardinality (approx. 20 million unique values)
        l_suppkey       BIGINT,
        l_linenumber    INT,
        l_quantity      DECIMAL(15,2),
        l_extendedprice DECIMAL(15,2),
        l_discount      DECIMAL(15,2),
        l_tax           DECIMAL(15,2),
        l_returnflag    STRING,
        l_linestatus    STRING,
        l_shipdate      DATE,
        l_commitdate    DATE,
        l_receiptdate   DATE,
        l_shipmode      STRING
    )
    PARTITIONED BY (bucket(l_partkey,64))  -- Distribute data evenly into 64 buckets.
    STORED AS ICEBERG
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
  • Spark SQL

    CREATE TABLE db_iceberg.lineitem (
        l_orderkey      BIGINT,
        l_partkey       BIGINT,      -- High Cardinality
        l_suppkey       BIGINT,
        l_linenumber    INT,
        l_quantity      DECIMAL(15,2),
        l_extendedprice DECIMAL(15,2),
        l_discount      DECIMAL(15,2),
        l_tax           DECIMAL(15,2),
        l_returnflag    STRING,
        l_linestatus    STRING,
        l_shipdate      DATE,
        l_commitdate    DATE,
        l_receiptdate   DATE,
        l_shipmode      STRING
    )
    USING iceberg
    -- Iceberg syntax: bucket(number_of_buckets, column_name)
    -- This generates an implicit partition column that distributes data into 64 logical buckets based on a hash value.
    PARTITIONED BY (bucket(64, l_partkey))
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
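The following Python sketch shows how a bucket number could be derived from a BIGINT key. It assumes the rule in the Apache Iceberg table specification: a 32-bit Murmur3 hash of the value's 8-byte little-endian encoding, with the sign bit cleared, modulo the number of buckets. This topic does not confirm that the XIHE engine uses this exact hash, so treat the code as an illustration of the technique rather than the engine's implementation.

```python
import struct


def _rotl32(x: int, r: int) -> int:
    """Rotate a 32-bit unsigned integer left by r bits."""
    return ((x << r) | (x >> (32 - r))) & 0xFFFFFFFF


def murmur3_32(data: bytes, seed: int = 0) -> int:
    """32-bit Murmur3 (x86 variant); returns an unsigned 32-bit integer."""
    c1, c2 = 0xCC9E2D51, 0x1B873593
    h = seed & 0xFFFFFFFF
    n_blocks = len(data) // 4
    # Mix each complete 4-byte block into the hash state.
    for i in range(n_blocks):
        k = int.from_bytes(data[4 * i:4 * i + 4], "little")
        k = (k * c1) & 0xFFFFFFFF
        k = _rotl32(k, 15)
        k = (k * c2) & 0xFFFFFFFF
        h ^= k
        h = _rotl32(h, 13)
        h = (h * 5 + 0xE6546B64) & 0xFFFFFFFF
    # Mix the remaining 1 to 3 bytes, if any.
    tail = data[4 * n_blocks:]
    if tail:
        k = int.from_bytes(tail, "little")
        k = (k * c1) & 0xFFFFFFFF
        k = _rotl32(k, 15)
        k = (k * c2) & 0xFFFFFFFF
        h ^= k
    # Finalization mix.
    h ^= len(data)
    h ^= h >> 16
    h = (h * 0x85EBCA6B) & 0xFFFFFFFF
    h ^= h >> 13
    h = (h * 0xC2B2AE35) & 0xFFFFFFFF
    h ^= h >> 16
    return h


def bucket(num_buckets: int, value: int) -> int:
    """Bucket number in [0, num_buckets) for a 64-bit integer key."""
    data = struct.pack("<q", value)  # 8-byte little-endian signed integer
    return (murmur3_32(data) & 0x7FFFFFFF) % num_buckets


# Distribute 100,000 sequential part keys across 64 buckets.
counts = [0] * 64
for part_key in range(1, 100_001):
    counts[bucket(64, part_key)] += 1
print(min(counts), max(counts))
```

The smallest and largest bucket sizes printed at the end give a quick view of how evenly sequential keys spread, which is the property that makes bucketing suitable for high-cardinality columns such as l_partkey.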

String prefix truncation (truncate[len])

  • XIHE

    -- Partition by the country code prefix of the phone number. For example, '13-' is the prefix of '13-888-999-1234'.
    CREATE TABLE db_iceberg.customer_by_phone (
        c_custkey     BIGINT,
        c_name        STRING,
        c_phone       STRING,        -- Format: '13-888-999-1234'
        c_acctbal     DECIMAL(15,2),
        c_mktsegment  STRING
    )
    PARTITIONED BY (truncate(c_phone,3))  -- Truncate to the first 3 characters: '13-'.
    STORED AS ICEBERG
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
  • Spark SQL

    CREATE TABLE db_iceberg.customer_by_phone (
        c_custkey     BIGINT,
        c_name        STRING,
        c_phone       STRING,
        c_acctbal     DECIMAL(15,2),
        c_mktsegment  STRING
    )
    USING iceberg
    -- Iceberg natively supports the truncate(width, column_name) transformation.
    -- When data is written, Iceberg automatically calculates the first 3 characters of c_phone and stores the data in the corresponding directory.
    PARTITIONED BY (truncate(3, c_phone))
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
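The XIHE and Spark SQL examples above spell the same transformations differently: XIHE uses day and hour with the column as the first argument, while Spark SQL uses days and hours with the numeric argument first. The Python helper below renders a partition expression for either engine. It is a hypothetical convenience function whose rules are inferred only from the examples in this topic. The year and month transformations are left out because no XIHE example for them appears here.

```python
from typing import Optional


def partition_expr(engine: str, transform: str, column: str, arg: Optional[int] = None) -> str:
    """Render one PARTITIONED BY expression (hypothetical helper).

    engine:    "xihe" or "spark"
    transform: "identity", "day", "hour", "bucket", or "truncate"
    arg:       bucket count or truncate width, where required
    """
    if engine not in ("xihe", "spark"):
        raise ValueError(f"unknown engine: {engine}")
    if transform == "identity":
        return column
    if transform in ("day", "hour"):
        # XIHE examples use the singular name; Spark SQL uses the plural.
        name = transform if engine == "xihe" else transform + "s"
        return f"{name}({column})"
    if transform in ("bucket", "truncate"):
        if arg is None:
            raise ValueError(f"{transform} requires a numeric argument")
        if engine == "xihe":
            return f"{transform}({column},{arg})"   # column first
        return f"{transform}({arg}, {column})"      # number first
    raise ValueError(f"unknown transform: {transform}")


print(partition_expr("xihe", "bucket", "l_partkey", 64))   # bucket(l_partkey,64)
print(partition_expr("spark", "bucket", "l_partkey", 64))  # bucket(64, l_partkey)
print(partition_expr("xihe", "truncate", "c_phone", 3))    # truncate(c_phone,3)
print(partition_expr("spark", "hour", "l_receipttime"))    # hours(l_receipttime)
```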

Composite partitioning

Multi-level partitioning balances partition count to avoid over-partitioning while ensuring even data distribution to prevent data skew. This strategy is ideal for querying massive datasets.

The following example shows how to create a generic log table:

  • XIHE

    -- User event log table: A three-level composite partition strategy that uses time, high-cardinality ID bucketing, and region truncation.
    CREATE TABLE db_iceberg.user_event_log (
        event_id      BIGINT,
        user_id       BIGINT,          -- High-cardinality user ID
        session_id    STRING,
        event_type    STRING,
        event_time    TIMESTAMP,       -- Timestamp accurate to the second
        country_code  STRING,          -- Country code, such as 'CN', 'US', or 'DE'
        device_type   STRING,
        payload       STRING
    )
    PARTITIONED BY (
        day(event_time),            -- Level 1: Partition by day for efficient time-based pruning.
        bucket(user_id,64),         -- Level 2: Apply hash bucketing with 64 buckets to the high-cardinality user_id to prevent small files.
        truncate(country_code,2)    -- Level 3: Truncate to the first 2 characters of the country code for regional aggregation.
    )
    STORED AS ICEBERG
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
    -- TPC-H lineitem table: An optimized fact table partitioned by ship date and bucketed by part ID.
    CREATE TABLE db_iceberg.lineitem_multiple_part (
        l_orderkey      BIGINT,
        l_partkey       BIGINT,        -- High-cardinality foreign key (approx. 20 million unique values)
        l_suppkey       BIGINT,
        l_linenumber    INT,
        l_quantity      DECIMAL(15, 2),
        l_extendedprice DECIMAL(15, 2),
        l_discount      DECIMAL(15, 2),
        l_tax           DECIMAL(15, 2),
        l_returnflag    STRING,
        l_linestatus    STRING,
        l_shipdate      DATE,          -- Core time field in TPC-H
        l_commitdate    DATE,
        l_receiptdate   DATE,
        l_shipinstruct  STRING,
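        l_shipmode      STRING,
        l_comment       STRING
    )
    PARTITIONED BY (
        day(l_shipdate),        -- Partition by day.
        bucket(l_partkey,32)    -- Hash into 32 buckets.
    )
    STORED AS ICEBERG
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';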
  • Spark SQL

    CREATE TABLE db_iceberg.user_event_log (
        event_id      BIGINT,
        user_id       BIGINT,
        session_id    STRING,
        event_type    STRING,
        event_time    TIMESTAMP,
        country_code  STRING,
        device_type   STRING,
        payload       STRING
    )
    USING iceberg
    PARTITIONED BY (
        days(event_time),           -- Automatic transformation: partitions by day
        bucket(64, user_id),        -- Automatic hashing: hashes user_id into 64 buckets
        truncate(2, country_code)   -- Automatic truncation: truncates to the first 2 characters
    )
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
    CREATE TABLE db_iceberg.lineitem_multiple_part (
        l_orderkey      BIGINT,
        l_partkey       BIGINT,
        l_suppkey       BIGINT,
        l_linenumber    INT,
        l_quantity      DECIMAL(15, 2),
        l_extendedprice DECIMAL(15, 2),
        l_discount      DECIMAL(15, 2),
        l_tax           DECIMAL(15, 2),
        l_returnflag    STRING,
        l_linestatus    STRING,
        l_shipdate      DATE,
        l_commitdate    DATE,
        l_receiptdate   DATE,
        l_shipinstruct  STRING,
        l_shipmode      STRING,
        l_comment       STRING
    )
    USING iceberg
    PARTITIONED BY (
        days(l_shipdate),      -- Partition by day.
        bucket(32, l_partkey)  -- Hash into 32 buckets. Note: The first parameter is the number of buckets.
    )
    LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/SUBPATH/';
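Because the levels of a composite partition multiply, it can help to estimate the upper bound on the partition count before settling on a strategy. The Python sketch below does this arithmetic for the user_event_log layout. The cardinalities (one year of data and 200 distinct country codes) are hypothetical inputs chosen for illustration, not figures from this topic.

```python
from math import prod


def max_partitions(level_cardinalities) -> int:
    """Upper bound on partition count: the product of per-level cardinalities."""
    return prod(level_cardinalities)


# Hypothetical cardinalities for the three partition levels of user_event_log.
by_day = {
    "days(event_time)": 365,            # one year of daily partitions
    "bucket(64, user_id)": 64,          # fixed by the bucket count
    "truncate(2, country_code)": 200,   # assumed number of country codes
}

# Same layout, but with hourly instead of daily partitions at level 1.
by_hour = dict(by_day)
del by_hour["days(event_time)"]
by_hour["hours(event_time)"] = 365 * 24

print(max_partitions(by_day.values()))   # 4672000
print(max_partitions(by_hour.values()))  # 112128000
```

The bound is only reached if every combination of values occurs, so real tables usually hold fewer partitions. If the estimate is large relative to the data volume, a coarser time transformation or a smaller bucket count reduces it.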

In AnalyticDB for MySQL-managed lake storage

  1. Create an external database associated with the managed lake storage.

    • XIHE

      CREATE EXTERNAL DATABASE test_db 
      WITH DBPROPERTIES ('adb_lake_bucket' = '<YOUR_ADB_BUCKET>');
    • Spark SQL

      CREATE DATABASE test_db 
      WITH DBPROPERTIES ('adb_lake_bucket' = '<YOUR_ADB_BUCKET>');
  2. Create a table in the database. Use TBLPROPERTIES to specify the managed data lake bucket where the table is stored.

    Note

    The partitioning strategy is the same as that for tables created in your own Object Storage Service (OSS) bucket.

    • XIHE

      CREATE TABLE test_db.test_iceberg_tbl (
        `id` int,
        `name` string
      ) 
      STORED AS ICEBERG
      TBLPROPERTIES (
        'catalog_type' = 'ADB', 
        'adb_lake_bucket' = '<YOUR_ADB_BUCKET>'
      );
    • Spark SQL

      Job resource group

      SET spark.adb.lakehouse.enabled=true;  -- Enable lake storage.
      
      CREATE TABLE test_db.test_iceberg_tbl (
        `id` int,
        `name` string
      ) 
      USING iceberg
      TBLPROPERTIES ( 'adb_lake_bucket' = '<YOUR_ADB_BUCKET>' );

      Interactive resource group

      1. Enable lake storage. Modify the resource group and add the Spark configuration spark.adb.lakehouse.enabled. Set the value to true.

      2. Run the SQL statement.

        CREATE TABLE test_db.test_iceberg_tbl (
          `id` int,
          `name` string
        ) 
        USING iceberg
        TBLPROPERTIES ( 'adb_lake_bucket' = '<YOUR_ADB_BUCKET>' );

Create a Delta Lake table

Currently, only Spark SQL and PySpark support creating, reading, and writing Delta Lake tables. The XIHE engine does not support these operations.

The following is an example. For more information about the syntax, see How to Create Delta Lake Tables | Delta Lake.

CREATE DATABASE db_delta LOCATION 'oss://<YOUR_BUCKET>/db_delta/';

CREATE TABLE IF NOT EXISTS db_delta.delta_lake_comprehensive_test (
    transaction_id BIGINT NOT NULL COMMENT 'Globally unique transaction ID',
    user_id STRING COMMENT 'User ID',
    device_info STRUCT<
        os: STRING, 
        model: STRING, 
        app_version: STRING
    > COMMENT 'Nested structure for device details',
    tags MAP<STRING, STRING> COMMENT 'Map of user tags',
    item_list ARRAY<STRING> COMMENT 'List of purchased items',
    event_ts TIMESTAMP COMMENT 'Time when the event occurred',
    revenue DECIMAL(18, 2) COMMENT 'Revenue amount',
    event_date DATE COMMENT 'Automatically generated date partition key'
)
USING DELTA
PARTITIONED BY (event_date);