Stream是MaxCompute自动管理Delta Table增量查询数据版本的流对象,记录对增量表所做的数据操作语言(DML)更改,包括插入、更新和删除,以及有关每次更改的元数据,以便您可以使用更改的数据采取操作。本文为您详细介绍Stream操作相关命令。
创建Stream
语法
CREATE STREAM [IF NOT EXISTS] <stream_name> 
ON TABLE <delta_table_name> <timestamp as of t | VERSION as of v>
strmproperties ("read_mode"="append" | "cdc") 
[comment <stream_comment>];名称  | 功能说明  | 
IF NOT EXISTS  | 可选。如果不指定  | 
stream_name  | 必填。待创建的Stream名。  | 
ON TABLE <delta_table_name>  | 代表Stream对象关联的Delta Table源表,一个Stream对象只能关联一张源表。  | 
timestamp as of t  | 代表Stream对象创建时VersionOffset初始化数据时间戳为t ,查询范围为  | 
version as of v  | 代表Stream对象创建时VersionOffset初始化数据版本为v,查询范围为  | 
strmproperties  | 同表属性一样以key/value字符类型值对的形式出现,目前stream只支持了一个属性read_mode,可选值有两个:  | 
stream_comment  | 可选。Stream注释内容且为长度不超过1024字节的有效字符串,否则报错。  | 
系统列  | 对于  | 
示例说明
创建Delta Table源表,再创建一个Stream关联Delta Table表。
CREATE TABLE delta_table_src (
  pk bigint NOT NULL PRIMARY KEY, 
  val bigint
) tblproperties ("transactional"="true");
CREATE STREAM delta_table_stream 
ON TABLE delta_table_src version as of 1 
strmproperties('read_mode'='append') 
comment 'stream demo';查看Stream信息
语法
DESC STREAM <stream_name>;示例
创建Delta Table源表,再创建一个Stream关联Delta Table表,查询Stream对象delta_table_stream。
CREATE TABLE delta_table_src (pk BIGINT NOT NULL PRIMARY key, 
val BIGINT) TBLPROPERTIES ("transactional"="true");
CREATE STREAM delta_table_stream ON TABLE delta_table_src 
version AS OF 1 strmproperties('read_mode'='append') 
comment 'stream demo';
DESC STREAM delta_table_stream;输出结果
Name                                    delta_table_stream                      
Project                                 sql_optimizer                           
Create Time                             2024-09-06 17:03:32                     Last Modified Time                      2024-09-06 17:03:32                     
Offset Version                          1                                       
Reference Table Project                 sql_optimizer                           
Reference Table Name                    delta_table_src                         
Reference Table Id                      5e19a67eb97b4477b7fbce0c7bbcebca        
Reference Table Version                 1                                       
Parameters                              {
    "comment": "stream demo",
    "read_mode": "append"}名称  | 说明  | 
Name  | 当前Stream的名称。  | 
Project  | 当前Stream所在的项目名称。  | 
Create Time  | 当前Stream的创建时间。  | 
Offset Version  | 当前Stream的初始化数据版本。  | 
Reference Table Project  | 关联的源表的项目名称。  | 
Reference Table Name  | 关联的源表名称。  | 
Reference Table Id  | 关联的源表的唯一标识ID。  | 
Reference Table Version  | 关联的源表的数据版本。  | 
Parameters  | 当前Stream对象的属性信息。  | 
特别要注意Offset Version和Reference Table Version的信息,Offset Version的值为1表示当前Stream已经消费的关联Delta Table数据的版本,Reference Table Version的值为1表示当前关联的Delta Table的最新的数据版本。由于关联的Delta Table是空表,所以两者的值都是1。创建Stream对象后如果其关联的Delta Table执行了DML操作,Reference Table Version的值会随之更新改变。读取Stream时会转换成对关联表的增量查询,读取的数据版本范围区间为左开右闭区间,为(Offset Version, Reference Table Version],从而确保Offset Version和Reference Table Version之间的增量数据被读到。如果(Offset Version, Reference Table Version]版本的增量数据被DML操作消费,消费后Offset Version会等于 Reference Table Version,即都为关联Delta Table的最新数据版本,表示没有新的增量数据。
修改Stream
修改Stream属性
ALTER STREAM <stream_name> SET strmproperties ("key"="value");tream_name:必填。待描述的Stream名。
strmproperties:Stream的属性,同表属性一样以key/value字符类型值对的形式出现,目前Stream只支持属性
read_mode,并且当前不支持修改。
修改Stream的初始化数据版本
ALTER STREAM <stream_name> ON TABLE <delta_table_name> 
<timestamp as of t  |  version as of v > ;stream_name:必填。待修改的Stream名。
ON TABLE <delta_table_name>: 代表Stream对象关联的Delta Table源表,源表为修改前的源表,目前还不支持修改源表。
timestamp as of t: 代表修改Stream对象VersionOffset初始化数据版本为t,查询范围
(t, 最新增量数据版本]。version as of v: 代表修改Stream对象VersionOffset初始化数据版本为v,查询范围
(v, 最新增量数据版本]。
示例
-- 1. 创建Delta Table源表。
CREATE TABLE delta_table_src (pk bigint not null primary key, 
val bigint) tblproperties ("transactional"="true");
-- 2. 创建一个Stream关联Delta Table表。
CREATE STREAM delta_table_stream on table delta_table_src 
version as of 1 strmproperties('read_mode'='append') 
comment 'stream demo';
-- 3. 查看新建的stream信息,当前Offset Version和Reference Table Version都为1。
DESC STREAM delta_table_stream;
-- 输出结果。
Name                                    delta_table_stream                      
Project                                 sql_optimizer                           
Create Time                             2024-09-07 10:26:56                     Last Modified Time                      2024-09-07 10:26:56                     
Offset Version                          1                                       
Reference Table Project                 sql_optimizer                           
Reference Table Name                    delta_table_src                         
Reference Table Id                      8605276ce0034b20af761bf4761ba62e        
Reference Table Version                 1                                       
Parameters                              {
    "comment": "stream demo",
    "read_mode": "append"}
-- 4. stream关联的Delet Table插入一条数据,目的是为了使得Delta Table的lsn递增,
-- 之后我们将引用Delta Table的version修改为递增后的version.
INSERT INTO delta_table_src VALUES ('1', '1');
-- 5. 查看当前Delta Table的数据版本信息
SHOW history FOR TABLE delta_table_src;
ObjectType	ObjectId                        	ObjectName          	VERSION(LSN)    	Time               	Operation       
TABLE     	8605276ce0034b20af761bf4761ba62e	delta_table_src     	0000000000000001	2024-09-07 10:25:59	CREATE          
TABLE     	8605276ce0034b20af761bf4761ba62e	delta_table_src     	0000000000000002	2024-09-07 10:28:19	APPEND      
-- 6. 修改stream关联的Delta Table的version为2
ALTER STREAM delta_table_stream ON TABLE delta_table_src version as of 2;
-- 7. 查看修改后的stream信息,stream以及关联的Delta Table的version都变成了2。
DESC STREAM delta_table_stream;
-- 输出结果。
Name                                    delta_table_stream                      
Project                                 sql_optimizer                           
Create Time                             2024-09-07 10:26:56                     Last Modified Time                      2024-09-07 10:29:12                     
Offset Version                          2                                       
Reference Table Project                 sql_optimizer                           
Reference Table Name                    delta_table_src                         
Reference Table Id                      8605276ce0034b20af761bf4761ba62e        
Reference Table Version                 2                                       
Parameters                              {
    "comment": "stream demo",
    "read_mode": "append"}列出项目下的所有Stream
语法
SHOW STREAMS;示例
-- 列出当前项目下的所有stream对象
SHOW STREAMS;
-- 输出结果。
delta_table_stream删除Stream
语法
DROP STREAM [IF EXISTS] <stream_name>;示例
-- 1. 查看当前项目中存在的所有stream对象
SHOW STREAMS;
-- 输出显示。
delta_table_stream
-- 2. 删除delta_table_stream stream对象
DROP STREAM IF EXISTS delta_table_stream;
-- 3. 再次查看当前项目中存在的所有stream对象;结果为空
SHOW STREAMS;查询Stream
语法
SELECT * FROM <stream_name>;查询Stream时,单纯执行DQL,并不会改变Stream的状态,即Stream的增量起始的Offset Version不会改变,但其关联的Delta Table的Reference Table Version会随着Delta Table状态的改变而改变,保持为关联Delta Table最新的Version数据版本。单纯执行DQL,表示这份增量数据没有被真正消费,只是进行了数据的探查。
查询Stream并且执行了DML操作,表示真正消费了Stream所表示的增量数据,会修改Stream的状态,将关联的数据版本迁移到这次DML操作查询的最新增量数据版本,即Stream的Offset Version会等于关联Delta Table的Reference Table Version,此时表示没有新的增量数据,在当前状态下如果Stream再被读取,读取的数据为空。
CDC模式查询输出示例
Delta Table CDC模式的使用,详情请参见CDC(邀测)。
创建Delta Table源表。
CREATE TABLE delta_table_src ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ( "transactional"="true", 'acid.cdc.mode.enable'='true', 'cdc.insert.into.passthrough.enable'='true' );创建目标表。
CREATE TABLE delta_table_dest ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ("transactional"="true");创建CDC模式的Stream。
CREATE STREAM delta_table_stream ON TABLE delta_table_src version as of 1 strmproperties('read_mode'='cdc') comment 'stream cdc mode';插入两条数据至Delta Table源表。
INSERT INTO delta_table_src VALUES (1, 1), (2, 2);查询
delta_table_stream输出CDC格式数据。但执行单纯的DQL,并不会改变delta_table_stream状态,如下语句执行多次,返回结果一样。SELECT * FROM delta_table_stream; -- 输出结果 +------------+------------+------------------+----------------+------------------+ | pk | val | __meta_timestamp | __meta_op_type | __meta_is_update | +------------+------------+------------------+----------------+------------------+ | 2 | 2 | 2024-09-07 11:03:53 | 1 | 0 | | 1 | 1 | 2024-09-07 11:03:53 | 1 | 0 | +------------+------------+------------------+----------------+------------------+读取
delta_table_stream表的增量数据,并插入到目标表delta_table_dest。同时将delta_table_stream的Offset Version修改为关联表delta_table_src最新的数据版本。此操作真正消费了delta_table_stream表的增量数据。INSERT INTO delta_table_dest SELECT pk, val FROM delta_table_stream;查询目标表。表中保存的是步骤6操作消费的Stream表的增量数据。
SELECT * FROM delta_table_dest; -- 输出结果 +------------+------------+ | pk | val | +------------+------------+ | 1 | 1 | | 2 | 2 | +------------+------------+再次查询
delta_table_stream表,输出为空。由于delta_table_stream表示的增量数据已经被消费,没有新的增量数据。SELECT * FROM delta_table_stream; -- 输出结果 +------------+------------+ | pk | val | +------------+------------+ +------------+------------+执行Update操作将源表pk为1记录的val设为10。
UPDATE delta_table_src SET val = 10 WHERE pk = 1;由于源表由Update操作产生的新的增量数据,查询
delta_table_stream,可输出Update操作的CDC数据。SELECT * FROM delta_table_stream; -- 输出结果 +------------+------------+------------------+----------------+------------------+ | pk | val | __meta_timestamp | __meta_op_type | __meta_is_update | +------------+------------+------------------+----------------+------------------+ | 1 | 1 | 2024-09-07 11:10:21 | 0 | 1 | | 1 | 10 | 2024-09-07 11:10:21 | 1 | 1 | +------------+------------+------------------+----------------+------------------+
由上述示例可见,CDC输出模式会跟踪Delta Table源表的记录变化,输出所有变化状态的记录,可有效用于增量计算的逻辑。
Append模式查询输出示例
创建Delta Table源表。
CREATE TABLE delta_table_src ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ("transactional"="true");创建目标表。
CREATE TABLE delta_table_dest ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ("transactional"="true");创建Append模式的Stream,并关联Delta Table。
CREATE STREAM delta_table_stream ON TABLE delta_table_src version as of 1 strmproperties ('read_mode'='append') comment 'stream append mode';插入两条数据到Delta Table源表。
INSERT INTO delta_table_src VALUES (1, 1), (2, 2);查询
delta_table_stream。SELECT * FROM delta_table_stream; -- 输出结果,不包含系统字段。 +------------+------------+ | pk | val | +------------+------------+ | 1 | 1 | | 2 | 2 | +------------+------------+读取
delta_table_stream表的增量数据,并插入到目标表,同时将delta_table_stream的Offset Version修改为关联表delta_table_src最新的数据版本,此操作真正消费delta_table_stream表的增量数据。INSERT INTO delta_table_dest SELECT pk, val FROM delta_table_stream;查询目标表。表中保存的是步骤6 操作消费的
delta_table_stream表的增量数据。SELECT * FROM delta_table_dest; -- 输出结果 +------------+------------+ | pk | val | +------------+------------+ | 1 | 1 | | 2 | 2 | +------------+------------+再次查询
delta_table_stream表,输出为空。由于delta_table_stream表示的增量数据已经被消费,没有新的增量数据。SELECT * FROM delta_table_stream; -- 输出结果 +------------+------------+ | pk | val | +------------+------------+ +------------+------------+执行Update操作将源表pk为1的记录val设为10。
UPDATE delta_table_src SET val = 10 WHERE pk = 1;执行删除操作,将源表中pk为2的记录删除。
DELETE FROM delta_table_src WHERE pk = 2;查询
delta_table_stream,只输出了Update操作的结果记录 (1, 10),Delete操作的记录不可见,也不会输出。SELECT * FROM delta_table_stream; -- 输出结果 +------------+------------+ | pk | val | +------------+------------+ | 1 | 10 | +------------+------------+
由上述示例可见,Append输出模式并不会显示数据的操作状态,只会输出一条记录的最终状态,Delete记录也不会输出。因此使用的场景有限,通常可用于一些典型的ETL场景,不断对增量插入的数据进行清洗。