文档

Time travel查询与增量查询

更新时间:

对于Delta Table类型的表,MaxCompute支持查询回溯到源表某个历史时间或者版本进行历史Snapshot查询(Time travel查询),也支持指定源表某个历史时间区间或者版本区间进行历史增量查询(Incremental查询)。本文为您介绍Delta Table的查询使用说明和使用限制。

命令格式

[with <cte>[, ...] ]
select [all | distinct] <select_expr>[, <except_expr>)][, <replace_expr>] ...
       from <table_reference>
       [timestamp | version as of expr]
       [timestamp | version between start_expr and end_expr] 
       [where <where_condition>]
       [group by {<col_list>|rollup(<col_list>)}]
           [having <having_condition>]
       [order by <order_condition>]
       [distribute by <distribute_condition> [sort by <sort_condition>]|[ cluster by <cluster_condition>] ]
       [limit <number>]
       [window <window_clause>]
说明

SQL DQL语法基本支持了查询Delta Table的所有场景,并基本遵循MaxCompute DQL语法以及对应的约束。只增加了From Table 子句的语法增强,可以通过固定格式的表达式指定源表某个历史时间或者版本进行历史Snapshot查询,也可以指定源表某个历史时间区间或者版本区间进行历史增量查询。

Time travel查询参数与使用限制

Time travel查询Delta Table,回溯到源表某个历史时间或者版本进行历史Snapshot查询。您可通过[timestamp | version as of expr]来配置具体的查询方式。

timestamp as of expr

  • 参数说明

    • timestamp as of为固定语法格式,代表根据历史时间查询历史snapshot数据 。

    • expr的取值类型可为MaxCompute标准的timestmap、datetime或date类型,目前可支持的形式有:

      • 日期字符串常量:

        timestmap | datetime | date类型的字符串常量,示例如下。

        数据类型

        示例

        timestmap

        '2023-01-01 00:00:00.123'

        datetime

        '2023-01-01 00:00:00'

        date

        '2023-01-01'

      • MaxCompute内置时间函数:

        current_timestamp() | getDate() + N: current_timestamp() | getDate()

        其中N的单位是秒,N为负数,表示当前时间往前N秒,N为正数,表示当前时间往后N秒。

      • Delta Table特殊语法:

        get_latest_timestamp(string tablename [, bigint <number>])

        • 如果是跨project访问,tablename需写为projectName.tableName,

        • 如果是三层模型,tablename需写为projectName.schemaName.tableName;

        • number可不填,默认值为1,表示时间从后往前序列中第number次数据操作的Commit时间,比如1表示最后一次操作,其中数据操作包括用户侧主动发送的数据修改操作和系统内部发起的数据排布操作。不同的number参数返回的timestamp可能相同。

  • 使用限制

    • 查询的历史快照数据范围为[CreateTableTimestamp, expr],比较对象为每次DML操作生成的Commit时间,CreateTableTimestamp为表创建操作生成的Commit时间。

    • expr返回的时间早于time travel时间周期(即创建Delta Table时配置的acid.data.retain.hours),或早于Delta Table表创建时间,会直接报错,因为对应的历史数据状态可能不存在了,比如acid.data.retain.hours=72小时,exprs为72小时之前的时间,就会报错。

    • expr返回的时间如果正好处于time travel时间周期(即创建Delta Table时配置的acid.data.retain.hours)的下限,由于内部系统之间交互也有延时,所以有概率出现秒级的误差,导致报错,所以尽量不要使用类似于 (timestamp as of current_timestamp() - time travel时间周期) 的语法,容易触发报错。

version as of expr

  • 参数说明

    • version as of 为固定语法格式,代表根据历史数据操作的版本号(version)查询历史snapshot数据。

    • expr返回类型为MaxCompute的bigint整型,目前可支持的形式有:

      • 整型常量:

        比如常量3

      • Delta Table特殊语法:

        get_latest_version(string tablename [, bigint <number>])

        • 如果是跨project访问,tablename需写为projectName.tableName,

        • 如果是三层模型,tablename需写为projectName.schemaName.tableName;

        • number可不填,默认值为1,表示时间从后往前序列中第number次数据操作的版本号,比如1表示最后一次操作,其中数据操作包括用户侧主动发送的数据修改操作和系统内部发起的数据排布操作。不同的number参数返回的version不相同。

  • 使用限制

    • 每次DML操作会产生严格递增的version,您可通过show history for table / partition显示所有DML操作信息,从中获取对应操作的version。

    • 查询的历史快照数据范围为 [CreateTableVersion, expr],比较对象为每次DML操作对应的version。 CreateTableVersion为表创建操作产生的version,默认为1。

    • expr返回的version,系统会获取它对应的DML Commit时间,如果早于配置的time travel时间周期(即创建Delta Table时配置的acid.data.retain.hours),或者version值小于1, 会直接报错。

    • expr返回的version大于最近一次DML操作的version,直接报错,推荐通过get_latest_version函数来获取所需的版本号。

Incremental查询参数与使用限制

Incremental查询Delta Table,指定源表某个历史时间区间或者版本区间进行历史增量查询。您可通过[timestamp | version between start_expr and end_expr]来配置具体的查询方式。

timestamp between start_expr and end_expr

  • 参数说明

    • timestamp between and:为固定语法格式,代表根据历史时间区间查询历史增量数据。

    • start_exprend_expr用法以及约束同timestamp as of语法的expr保持一致。

  • 使用限制

    • 查询的历史增量数据范围为(start_expr,end_expr] , 即左开右闭区间,比较对象为每次DML操作生成的Commit时间。

    • start_expr早于配置的time travel时间周期(acid.data.retain.hours),或者早于表创建时间,会直接报错。

    • end_expr晚于最近一次DML操作的Commit时间时,查询行为结果根据表属性(acid.incremental.query.out.of.time.range.enabled)的取值来决定:

      • 设置为false(默认值)时,会直接报错

      • 设置为true时,会查询包含(start_expr,end_expr]范围内的所有增量历史数据。

        您可通过alter table修改此参数的取值,例如:

        alter table mf_tt2 set tblproperties("acid.incremental.query.out.of.time.range.enabled"="true");

version between start_expr and end_expr

  • 参数说明

    • version between and 为固定语法格式,代表根据历史DML操作的version区间查询历史增量数据。

    • start_expr和end_expr用法以及约束同version as of 语法的expr保持一致,参考上面描述。

  • 使用限制

    • 查询的历史增量数据范围是(start_expr,end_expr], 即左开右闭区间,比较对象为每次DML操作生成的version。

    • start_expr返回的version,系统会获取它对应的DML Commit时间,如果早于配置的time travel时间周期(acid.data.retain.hours), 或者version值小于1,会直接报错。

    • end_expr返回的version晚于最近一次DML操作的version的行为通过表属性(acid.incremental.query.out.of.time.range.enabled)来决定,默认值为false,会直接报错,如果设置为true,会查询包含(start_expr,end_expr]范围内的所有增量历史数据。

其他使用说明

  • 相同Key的多行记录,只返回最近的一行记录,如果最后一行是Delete状态,直接过滤掉。 后续版本考虑支持类似CDC格式的数据更新状态查询。

  • 不支持表对象不存在的表的历史数据查询,例如对表进行drop、rename等删除操作。

    此种情况下您可先通过restore操作恢复表对象,然后再继续查询。

  • 目前只支持Delta Table的time travel/incremental查询,其他表不支持。

  • 同一个SQL中的同一张表,对于time travel和incremental查询,timestamp或者version必须相同。

  • 分区表查询最好指定partition,避免查询所有partition的历史操作日志导致耗时不稳定。

  • 读写并发事务采用MVCC模型,可保障读写隔离,相互不影响,支持ReadCommited级别。

  • Compaction操作生成数据不认为是新增数据,因此增量查询出来的数据不会包含。

使用示例

  • 示例数据

    --创建表操作,产生的version为1,执行show history for table mf_tt2, 可查询version;
    create table mf_tt2 (pk bigint not null primary key, 
                      val bigint not null) 
                      partitioned by (dd string, hh string) 
                      tblproperties ("transactional"="true"); 
    --DML操作,产生的version为2
    insert overwrite table mf_tt2 partition(dd='01', hh='01') 
                     values (1, 1), (2, 2), (3, 3);
    --DML操作,产生的version为3
    insert into table mf_tt2 partition(dd='01', hh='01') 
                values (3, 30), (4, 4), (5, 5);
  • 表相关数据查询

    • 查询表创建时间,作为后续设置查询历史时间的参考

      desc extended mf_tt2;

      返回值:

      +------------------------------------------------------------------------------------+
      | Owner: ALIYUN$****_doctest@test.aliyunid.com | Project: doc_test_prod                               |
      | TableComment:                                                                      |
      +------------------------------------------------------------------------------------+
      | CreateTime:               2023-06-26 09:31:38                                      |
      | LastDDLTime:              2023-06-26 09:31:38                                      |
      | LastModifiedTime:         2023-06-26 09:32:31                                      |
      +------------------------------------------------------------------------------------+
      | InternalTable: YES      | Size: 8541                                               |
      +------------------------------------------------------------------------------------+
      | Native Columns:                                                                    |
      +------------------------------------------------------------------------------------+
      | Field    | Type   | Label | ExtendedLabel | Nullable | DefaultValue | Comment      |
      +------------------------------------------------------------------------------------+
      | pk       | bigint |       |               | false    | NULL         |              |
      | val      | bigint |       |               | false    | NULL         |              |
      +------------------------------------------------------------------------------------+
      | Partition Columns:                                                                 |
      +------------------------------------------------------------------------------------+
      | dd              | string     |                                                     |
      | hh              | string     |                                                     |
      +------------------------------------------------------------------------------------+
      | Extended Info:                                                                     |
      +------------------------------------------------------------------------------------+
      | TableID:                  bec515a56cc9492c8f906a224c62****                         |
      | IsArchived:               false                                                    |
      | PhysicalSize:             25623                                                    |
      | FileNum:                  9                                                        |
      | StoredAs:                 AliOrc                                                   |
      | CompressionStrategy:      normal                                                   |
      | ClusterType:              hash                                                     |
      | BucketNum:                16                                                       |
      | ClusterColumns:           [pk]                                                     |
      | SortColumns:              [pk ASC]                                                 |
      +------------------------------------------------------------------------------------+
    • 查询历史数据操作的版本号

      show history for table mf_tt2 partition(dd='01',hh='01');

      查询结果

      ID = 20230626021756157ghict5k****
      ObjectType      ObjectId                                ObjectName              VERSION(LSN)            Time                    Operation
      PARTITION       4764c8e1cb634a4fb9c21f3fc850****        dd=01/hh=01             0000000000000002        2023-06-26 09:31:56     CREATE
      PARTITION       4764c8e1cb634a4fb9c21f3fc850****        dd=01/hh=01             0000000000000003        2023-06-26 09:32:32     APPEND
  • Time travel查询示例

    • 查询截止到指定时间(例如datetime格式的字符串常量)的所有历史数据

      select * from mf_tt2 timestamp as of '2023-06-26 09:33:00' 
               where dd = '01' and hh = '01';

      返回值为:

      +------------+------------+----+----+
      | pk         | val        | dd | hh |
      +------------+------------+----+----+
      | 1          | 1          | 01 | 01 |
      | 3          | 30         | 01 | 01 |
      | 4          | 4          | 01 | 01 |
      | 5          | 5          | 01 | 01 |
      | 2          | 2          | 01 | 01 |
      +------------+------------+----+----+
    • 查询截止到指定version常量的所有历史数据

      select * from mf_tt2 version as of 2 where dd = '01' and hh = '01';

      返回值为:

      +------------+------------+----+----+
      | pk         | val        | dd | hh |
      +------------+------------+----+----+
      | 1          | 1          | 01 | 01 |
      | 3          | 3          | 01 | 01 |
      | 2          | 2          | 01 | 01 |
      +------------+------------+----+----+
    • 查询截止到当前时间的所有历史数据,即全量查询

      select * from mf_tt2 timestamp as of current_timestamp() where dd = '01' and hh = '01';

      返回值为:

      +------------+------------+----+----+
      | pk         | val        | dd | hh |
      +------------+------------+----+----+
      | 1          | 1          | 01 | 01 |
      | 3          | 30         | 01 | 01 |
      | 4          | 4          | 01 | 01 |
      | 5          | 5          | 01 | 01 |
      | 2          | 2          | 01 | 01 |
      +------------+------------+----+----+
    • 查询截止到10s前的所有历史数据

      select * from mf_tt2 timestamp as of current_timestamp() - 10 where dd = '01' and hh = '01';

      返回值为:

      +------------+------------+----+----+
      | pk         | val        | dd | hh |
      +------------+------------+----+----+
      | 1          | 1          | 01 | 01 |
      | 3          | 30         | 01 | 01 |
      | 4          | 4          | 01 | 01 |
      | 5          | 5          | 01 | 01 |
      | 2          | 2          | 01 | 01 |
      +------------+------------+----+----+
    • 查询截止到最近第二次commit的所有历史数据

      • 示例1

        select * from mf_tt2 timestamp as of get_latest_timestamp('mf_tt2', 2) where dd = '01' and hh = '01';
      • 示例2

        select * from mf_tt2 version as of get_latest_version('mf_tt2', 2) where dd = '01' and hh = '01';

        返回值为

        +------------+------------+----+----+
        | pk         | val        | dd | hh |
        +------------+------------+----+----+
        | 1          | 1          | 01 | 01 |
        | 3          | 3          | 01 | 01 |
        | 2          | 2          | 01 | 01 |
        +------------+------------+----+----+
  • Incremental查询示例

    • 查询指定时间(例如datetime格式的字符串常量)区间的历史增量数据,常量值需要根据具体操作的时间来配置

      select * from mf_tt2 timestamp between '2023-06-26 09:31:40' and '2023-06-26 09:32:00' where dd = '01' and hh = '01';

      返回值为:

      +------------+------------+----+----+
      | pk         | val        | dd | hh |
      +------------+------------+----+----+
      | 1          | 1          | 01 | 01 |
      | 3          | 3          | 01 | 01 |
      | 2          | 2          | 01 | 01 |
      +------------+------------+----+----+
    • 查询指定version区间的历史增量数据

      select * from mf_tt2 version between 2 and 3 where dd = '01' and hh = '01';

      返回值为:

      +------------+------------+----+----+
      | pk         | val        | dd | hh |
      +------------+------------+----+----+
      | 3          | 30         | 01 | 01 |
      | 4          | 4          | 01 | 01 |
      | 5          | 5          | 01 | 01 |
      +------------+------------+----+----+
    • 查询最近300s内的历史增量数据

      • 示例:表acid.incremental.query.out.of.time.range.enabled属性为默认值false

        select * from mf_tt2 timestamp between current_timestamp() - 301 and current_timestamp() where dd = '01' and hh='01';

        返回值报错:

        FAILED: ODPS-0130071:[0,0] Semantic analysis exception - physical plan generation failed: 
        com.aliyun.odps.meta.exception.MetaException: com.aliyun.odps.meta.exception.MetaException: com.aliyun.odps.metadata.common.MetastoreServerException: 
        Incremental query can't exceed current version. Current version timestamp: 2023-06-26 09:32:32, input timestamp is: 2023-06-26 10:47:55
      • 示例:修改表acid.incremental.query.out.of.time.range.enabled属性为true

        alter table mf_tt2 set tblproperties("acid.incremental.query.out.of.time.range.enabled"="true");
      • 示例:再次查询

        select * from mf_tt2 timestamp between current_timestamp() - 301 and current_timestamp() where dd = '01' and hh='01';

        返回值为:

        +------------+------------+----+----+
        | pk         | val        | dd | hh |
        +------------+------------+----+----+
        +------------+------------+----+----+
    • 查询最近两次commit的历史增量数据

      • 示例

        select * from mf_tt2 timestamp between get_latest_timestamp('mf_tt2', 3) and get_latest_timestamp('mf_tt2') where dd = '01' and hh = '01';
      • 示例

        select * from mf_tt2 version between get_latest_version('mf_tt2', 3) and get_latest_version('mf_tt2') where dd = '01' and hh = '01';
      • 返回值

        +------------+------------+----+----+
        | pk         | val        | dd | hh |
        +------------+------------+----+----+
        | 1          | 1          | 01 | 01 |
        | 3          | 30         | 01 | 01 |
        | 4          | 4          | 01 | 01 |
        | 5          | 5          | 01 | 01 |
        | 2          | 2          | 01 | 01 |
        +------------+------------+----+----+