流对象(Stream)

Stream是MaxCompute自动管理Delta Table增量查询数据版本的流对象,记录对增量表所做的数据操作语言(DML)更改,包括插入、更新和删除,以及有关每次更改的元数据,以便您可以使用更改的数据采取操作。本文为您详细介绍Stream操作相关命令。

创建Stream

语法

CREATE STREAM [IF NOT EXISTS] <stream_name> 
ON TABLE <delta_table_name> <timestamp as of t | VERSION as of v>
strmproperties ("read_mode"="append" | "cdc") 
[comment <stream_comment>];

名称

功能说明

IF NOT EXISTS

可选。如果不指定IF NOT EXISTS选项而存在同名Stream,会报错。如果指定IF NOT EXISTS,只要存在同名Stream,即使原Stream结构与要创建的目标Stream结构不一致,均返回成功。已存在的同名Stream的元数据信息不会被改动。

stream_name

必填。待创建的Stream名。

ON TABLE <delta_table_name>

代表Stream对象关联的Delta Table源表,一个Stream对象只能关联一张源表。

timestamp as of t

代表Stream对象创建时VersionOffset初始化数据时间戳为t ,查询范围为(t, 最新增量数据时间戳]

version as of v

代表Stream对象创建时VersionOffset初始化数据版本为v,查询范围为(v, 最新增量数据版本]

strmproperties

同表属性一样以key/value字符类型值对的形式出现,目前stream只支持了一个属性read_mode,可选值有两个:append模式用来消费Delta Table的表数据,CDC模式用来消费Delta Table的cdc数据。

stream_comment

可选。Stream注释内容且为长度不超过1024字节的有效字符串,否则报错。

系统列

对于"read_mode" = "cdc",会额外输出三个系统列: __meta_timestamp代表数据写入时间,__meta_op_type (包含INSERT | DELETE)和__meta_is_update (包含TRUE | FALSE),可组合成四种情况: INSERT + FALSE代表新记录,INSERT + TRUE代表Update后的值,DELETE + TRUE代表Update前的值,DELETE+FALSE代表删除。但当前输出的系统列__meta_op_type__meta_is_update的值为tinyint类型,下面是对应的组合值和对应的含义:

+--------------------+--------------+-----------------+
|     表示的操作       |__meta_op_type|__meta_is_update |
+--------------------+--------------+-----------------+
|       INSERT       |    INSERT(1) |      FALSE(0)   |
|       DELETE       |    DELETE(0) |      FALSE(0)   |
|   UDPATE_BEFORE    |    DELETE(0) |      TRUE(1)    |
|   UPDATE_AFTER     |    INSERT(1) |      TRUE(1)    |
+--------------------+--------------+-----------------+

示例说明

创建Delta Table源表,再创建一个Stream关联Delta Table表。

CREATE TABLE delta_table_src (
  pk bigint NOT NULL PRIMARY KEY, 
  val bigint
) tblproperties ("transactional"="true");

CREATE STREAM delta_table_stream 
ON TABLE delta_table_src version as of 1 
strmproperties('read_mode'='append') 
comment 'stream demo';

查看Stream信息

语法

DESC STREAM <stream_name>;

示例

创建Delta Table源表,再创建一个Stream关联Delta Table表,查询Stream对象delta_table_stream

CREATE TABLE delta_table_src (pk BIGINT NOT NULL PRIMARY key, 
val BIGINT) TBLPROPERTIES ("transactional"="true");

CREATE STREAM delta_table_stream ON TABLE delta_table_src 
version AS OF 1 strmproperties('read_mode'='append') 
comment 'stream demo';

DESC STREAM delta_table_stream;

输出结果

Name                                    delta_table_stream                      
Project                                 sql_optimizer                           
Create Time                             2024-09-06 17:03:32                     Last Modified Time                      2024-09-06 17:03:32                     
Offset Version                          1                                       
Reference Table Project                 sql_optimizer                           
Reference Table Name                    delta_table_src                         
Reference Table Id                      5e19a67eb97b4477b7fbce0c7bbcebca        
Reference Table Version                 1                                       
Parameters                              {
    "comment": "stream demo",
    "read_mode": "append"}

名称

说明

Name

当前Stream的名称。

Project

当前Stream所在的项目名称。

Create Time

当前Stream的创建时间。

Offset Version

当前Stream的初始化数据版本。

Reference Table Project

关联的源表的项目名称。

Reference Table Name

关联的源表名称。

Reference Table Id

关联的源表的唯一标识ID。

Reference Table Version

关联的源表的数据版本。

Parameters

当前Stream对象的属性信息。

说明

特别要注意Offset VersionReference Table Version的信息,Offset Version的值为1表示当前Stream已经消费的关联Delta Table数据的版本,Reference Table Version的值为1表示当前关联的Delta Table的最新的数据版本。由于关联的Delta Table是空表,所以两者的值都是1。创建Stream对象后如果其关联的Delta Table执行了DML操作,Reference Table Version的值会随之更新改变。读取Stream时会转换成对关联表的增量查询,读取的数据版本范围区间为左开右闭区间,为(Offset Version, Reference Table Version],从而确保Offset Version和Reference Table Version之间的增量数据被读到。如果(Offset Version, Reference Table Version]版本的增量数据被DML操作消费,消费后Offset Version会等于 Reference Table Version,即都为关联Delta Table的最新数据版本,表示没有新的增量数据。

修改Stream

修改Stream属性

ALTER STREAM <stream_name> SET stmproperties ("key"="value");
说明
  • tream_name:必填。待描述的Stream名。

  • strmproperties:Stream的属性,同表属性一样以key/value字符类型值对的形式出现,目前Stream只支持属性read_mode,并且当前不支持修改。

修改Stream的初始化数据版本

ALTER STREAM <stream_name> ON TABLE <delta_table_name> 
<timestamp as of t  |  version as of v > ;
说明
  • stream_name:必填。待修改的Stream名。

  • ON TABLE <delta_table_name>: 代表Stream对象关联的Delta Table源表,源表为修改前的源表,目前还不支持修改源表。

  • timestamp as of t: 代表修改Stream对象VersionOffset初始化数据版本为t,查询范围(t, 最新增量数据版本]

  • version as of v: 代表修改Stream对象VersionOffset初始化数据版本为v,查询范围(v, 最新增量数据版本]

示例

-- 1. 创建Delta Table源表。
CREATE TABLE delta_table_src (pk bigint not null primary key, 
val bigint) tblproperties ("transactional"="true");

-- 2. 创建一个Stream关联Delta Table表。
CREATE STREAM delta_table_stream on table delta_table_src 
version as of 1 strmproperties('read_mode'='append') 
comment 'stream demo';

-- 3. 查看新建的stream信息,当前Offset Version和Reference Table Version都为1。
DESC STREAM delta_table_stream;
-- 输出结果。
Name                                    delta_table_stream                      
Project                                 sql_optimizer                           
Create Time                             2024-09-07 10:26:56                     Last Modified Time                      2024-09-07 10:26:56                     
Offset Version                          1                                       
Reference Table Project                 sql_optimizer                           
Reference Table Name                    delta_table_src                         
Reference Table Id                      8605276ce0034b20af761bf4761ba62e        
Reference Table Version                 1                                       
Parameters                              {
    "comment": "stream demo",
    "read_mode": "append"}

-- 4. stream关联的Delet Table插入一条数据,目的是为了使得Delta Table的lsn递增,
-- 之后我们将引用Delta Table的version修改为递增后的version.
INSERT INTO delta_table_src VALUES ('1', '1');

-- 5. 查看当前Delta Table的数据版本信息
SHOW history FOR TABLE delta_table_src;

ObjectType	ObjectId                        	ObjectName          	VERSION(LSN)    	Time               	Operation       
TABLE     	8605276ce0034b20af761bf4761ba62e	delta_table_src     	0000000000000001	2024-09-07 10:25:59	CREATE          
TABLE     	8605276ce0034b20af761bf4761ba62e	delta_table_src     	0000000000000002	2024-09-07 10:28:19	APPEND      

-- 6. 修改stream关联的Delta Table的version为2
ALTER STREAM delta_table_stream ON TABLE delta_table_src version as of 2;

-- 7. 查看修改后的stream信息,stream以及关联的Delta Table的version都变成了2。
DESC STREAM delta_table_stream;

-- 输出结果。
Name                                    delta_table_stream                      
Project                                 sql_optimizer                           
Create Time                             2024-09-07 10:26:56                     Last Modified Time                      2024-09-07 10:29:12                     
Offset Version                          2                                       
Reference Table Project                 sql_optimizer                           
Reference Table Name                    delta_table_src                         
Reference Table Id                      8605276ce0034b20af761bf4761ba62e        
Reference Table Version                 2                                       
Parameters                              {
    "comment": "stream demo",
    "read_mode": "append"}

列出项目下的所有Stream

语法

SHOW STREAMS;

示例

-- 列出当前项目下的所有stream对象
SHOW STREAMS;
-- 输出结果。
delta_table_stream

删除Stream

语法

DROP STREAM [IF EXISTS] <stream_name>;

示例

-- 1. 查看当前项目中存在的所有stream对象
SHOW STREAMS;
-- 输出显示。
delta_table_stream

-- 2. 删除delta_table_stream stream对象
DROP STREAM IF EXISTS delta_table_stream;

-- 3. 再次查看当前项目中存在的所有stream对象;结果为空
SHOW STREAMS;

查询Stream

语法

SELECT * FROM <stream_name>;
说明
  • 查询Stream时,单纯执行DQL,并不会改变Stream的状态,即Stream的增量起始的Offset Version不会改变,但其关联的Delta Table的Reference Table Version会随着Delta Table状态的改变而改变,保持为关联Delta Table最新的Version数据版本。单纯执行DQL,表示这份增量数据没有被真正消费,只是进行了数据的探查。

  • 查询Stream并且执行了DML操作,表示真正消费了Stream所表示的增量数据,会修改Stream的状态,将关联的数据版本迁移到这次DML操作查询的最新增量数据版本,即Stream的Offset Version会等于关联Delta Table的Reference Table Version,此时表示没有新的增量数据,在当前状态下如果Stream再被读取,读取的数据为空。

CDC模式查询输出示例

Delta Table CDC模式的使用,详情请参见CDC

  1. 创建Delta Table源表。

    CREATE TABLE delta_table_src (
      pk bigint NOT NULL PRIMARY KEY, 
      val bigint
    ) tblproperties (
      "transactional"="true", 
      'acid.cdc.mode.enable'='true', 
      'cdc.insert.into.passthrough.enable'='true'
    );
  2. 创建目标表。

    CREATE TABLE delta_table_dest (
      pk bigint NOT NULL PRIMARY KEY, 
      val bigint
    ) tblproperties ("transactional"="true");
  3. 创建CDC模式的Stream。

    CREATE STREAM delta_table_stream 
    ON TABLE delta_table_src version as of 1 
    strmproperties('read_mode'='cdc') 
    comment 'stream cdc mode';
  4. 插入两条数据至Delta Table源表。

    INSERT INTO delta_table_src VALUES (1, 1), (2, 2);
  5. 查询delta_table_stream输出CDC格式数据。但执行单纯的DQL,并不会改变delta_table_stream状态,如下语句执行多次,返回结果一样。

    SELECT * FROM  delta_table_stream;
    
    -- 输出结果
    +------------+------------+------------------+----------------+------------------+
    | pk         | val        | __meta_timestamp | __meta_op_type | __meta_is_update |
    +------------+------------+------------------+----------------+------------------+
    | 2          | 2          | 2024-09-07 11:03:53 | 1             | 0                |
    | 1          | 1          | 2024-09-07 11:03:53 | 1              | 0                |
    +------------+------------+------------------+----------------+------------------+
  6. 读取delta_table_stream表的增量数据,并插入到目标表delta_table_dest。同时将delta_table_stream的Offset Version修改为关联表delta_table_src最新的数据版本。此操作真正消费了delta_table_stream表的增量数据。

    INSERT INTO  delta_table_dest SELECT pk, val FROM delta_table_stream;
  7. 查询目标表。表中保存的是步骤6操作消费的Stream表的增量数据。

    SELECT * FROM delta_table_dest; 
    
    -- 输出结果
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    | 1          | 1          |
    | 2          | 2          |
    +------------+------------+
  8. 再次查询delta_table_stream表,输出为空。由于delta_table_stream表示的增量数据已经被消费,没有新的增量数据。

    SELECT * FROM delta_table_stream;
    
    -- 输出结果
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    +------------+------------+
  9. 执行Update操作将源表pk为1记录的val设为10。

    UPDATE delta_table_src SET val = 10 WHERE pk = 1;
  10. 由于源表由Update操作产生的新的增量数据,查询delta_table_stream,可输出Update操作的CDC数据。

    SELECT * FROM delta_table_stream;  
    
    --  输出结果
    +------------+------------+------------------+----------------+------------------+
    | pk         | val        | __meta_timestamp | __meta_op_type | __meta_is_update |
    +------------+------------+------------------+----------------+------------------+
    | 1          | 1          | 2024-09-07 11:10:21 | 0              | 1                |
    | 1          | 10         | 2024-09-07 11:10:21 | 1              | 1                |
    +------------+------------+------------------+----------------+------------------+

由上述示例可见,CDC输出模式会跟踪Delta Table源表的记录变化,输出所有变化状态的记录,可有效用于增量计算的逻辑。

Append模式查询输出示例

  1. 创建Delta Table源表。

    CREATE TABLE delta_table_src (
      pk bigint NOT NULL PRIMARY KEY, 
      val bigint
    ) tblproperties ("transactional"="true");
  2. 创建目标表。

    CREATE TABLE delta_table_dest (
      pk bigint NOT NULL PRIMARY KEY, 
      val bigint
    ) tblproperties ("transactional"="true");
  3. 创建Append模式的Stream,并关联Delta Table。

    CREATE STREAM delta_table_stream 
    ON TABLE delta_table_src version as of 1 
    strmproperties ('read_mode'='append') 
    comment 'stream append mode';
  4. 插入两条数据到Delta Table源表。

    INSERT INTO delta_table_src VALUES (1, 1), (2, 2);
  5. 查询delta_table_stream

    SELECT * FROM delta_table_stream; 
    
    -- 输出结果,不包含系统字段。
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    | 1          | 1          |
    | 2          | 2          |
    +------------+------------+
  6. 读取delta_table_stream表的增量数据,并插入到目标表,同时将delta_table_stream的Offset Version修改为关联表delta_table_src最新的数据版本,此操作真正消费delta_table_stream表的增量数据。

    INSERT INTO delta_table_dest SELECT pk, val FROM delta_table_stream;
  7. 查询目标表。表中保存的是步骤6 操作消费的delta_table_stream表的增量数据。

    SELECT * FROM delta_table_dest; 
    
    -- 输出结果
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    | 1          | 1          |
    | 2          | 2          |
    +------------+------------+
  8. 再次查询delta_table_stream表,输出为空。由于delta_table_stream表示的增量数据已经被消费,没有新的增量数据。

    SELECT * FROM delta_table_stream; 
    
    -- 输出结果
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    +------------+------------+
  9. 执行Update操作将源表pk为1的记录val设为10。

    UPDATE delta_table_src SET val = 10 WHERE pk = 1;
  10. 执行删除操作,将源表中pk为2的记录删除。

    DELETE FROM delta_table_src WHERE pk = 2;
  11. 查询delta_table_stream,只输出了Update操作的结果记录 (1, 10),Delete操作的记录不可见,也不会输出。

    SELECT * FROM delta_table_stream; 
    
    -- 输出结果
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    | 1          | 10         |
    +------------+------------+

由上述示例可见,Append输出模式并不会显示数据的操作状态,只会输出一条记录的最终状态,Delete记录也不会输出。因此使用的场景有限,通常可用于一些典型的ETL场景,不断对增量插入的数据进行清洗。