Stream是MaxCompute自动管理Delta Table增量查询数据版本的流对象,记录对增量表所做的数据操作语言(DML)更改,包括插入、更新和删除,以及有关每次更改的元数据,以便您可以使用更改的数据采取操作。本文为您详细介绍Stream操作相关命令。
创建Stream
语法
CREATE STREAM [IF NOT EXISTS] <stream_name>
ON TABLE <delta_table_name> <timestamp as of t | VERSION as of v>
strmproperties ("read_mode"="append" | "cdc")
[comment <stream_comment>];
名称 | 功能说明 |
IF NOT EXISTS | 可选。如果不指定 |
stream_name | 必填。待创建的Stream名。 |
ON TABLE <delta_table_name> | 代表Stream对象关联的Delta Table源表,一个Stream对象只能关联一张源表。 |
timestamp as of t | 代表Stream对象创建时VersionOffset初始化数据时间戳为t ,查询范围为 |
version as of v | 代表Stream对象创建时VersionOffset初始化数据版本为v,查询范围为 |
strmproperties | 同表属性一样以key/value字符类型值对的形式出现,目前stream只支持了一个属性read_mode,可选值有两个: |
stream_comment | 可选。Stream注释内容且为长度不超过1024字节的有效字符串,否则报错。 |
系统列 | 对于
|
示例说明
创建Delta Table源表,再创建一个Stream关联Delta Table表。
CREATE TABLE delta_table_src (
pk bigint NOT NULL PRIMARY KEY,
val bigint
) tblproperties ("transactional"="true");
CREATE STREAM delta_table_stream
ON TABLE delta_table_src version as of 1
strmproperties('read_mode'='append')
comment 'stream demo';
查看Stream信息
语法
DESC STREAM <stream_name>;
示例
创建Delta Table源表,再创建一个Stream关联Delta Table表,查询Stream对象delta_table_stream
。
CREATE TABLE delta_table_src (pk BIGINT NOT NULL PRIMARY key,
val BIGINT) TBLPROPERTIES ("transactional"="true");
CREATE STREAM delta_table_stream ON TABLE delta_table_src
version AS OF 1 strmproperties('read_mode'='append')
comment 'stream demo';
DESC STREAM delta_table_stream;
输出结果
Name delta_table_stream
Project sql_optimizer
Create Time 2024-09-06 17:03:32 Last Modified Time 2024-09-06 17:03:32
Offset Version 1
Reference Table Project sql_optimizer
Reference Table Name delta_table_src
Reference Table Id 5e19a67eb97b4477b7fbce0c7bbcebca
Reference Table Version 1
Parameters {
"comment": "stream demo",
"read_mode": "append"}
名称 | 说明 |
Name | 当前Stream的名称。 |
Project | 当前Stream所在的项目名称。 |
Create Time | 当前Stream的创建时间。 |
Offset Version | 当前Stream的初始化数据版本。 |
Reference Table Project | 关联的源表的项目名称。 |
Reference Table Name | 关联的源表名称。 |
Reference Table Id | 关联的源表的唯一标识ID。 |
Reference Table Version | 关联的源表的数据版本。 |
Parameters | 当前Stream对象的属性信息。 |
特别要注意Offset Version
和Reference Table Version
的信息,Offset Version
的值为1表示当前Stream已经消费的关联Delta Table数据的版本,Reference Table Version
的值为1表示当前关联的Delta Table的最新的数据版本。由于关联的Delta Table是空表,所以两者的值都是1。创建Stream对象后如果其关联的Delta Table执行了DML操作,Reference Table Version的值会随之更新改变。读取Stream时会转换成对关联表的增量查询,读取的数据版本范围区间为左开右闭区间,为(Offset Version, Reference Table Version]
,从而确保Offset Version和Reference Table Version之间的增量数据被读到。如果(Offset Version, Reference Table Version]
版本的增量数据被DML操作消费,消费后Offset Version会等于 Reference Table Version,即都为关联Delta Table的最新数据版本,表示没有新的增量数据。
修改Stream
修改Stream属性
ALTER STREAM <stream_name> SET stmproperties ("key"="value");
tream_name:必填。待描述的Stream名。
strmproperties:Stream的属性,同表属性一样以key/value字符类型值对的形式出现,目前Stream只支持属性
read_mode
,并且当前不支持修改。
修改Stream的初始化数据版本
ALTER STREAM <stream_name> ON TABLE <delta_table_name>
<timestamp as of t | version as of v > ;
stream_name:必填。待修改的Stream名。
ON TABLE <delta_table_name>: 代表Stream对象关联的Delta Table源表,源表为修改前的源表,目前还不支持修改源表。
timestamp as of t: 代表修改Stream对象VersionOffset初始化数据版本为t,查询范围
(t, 最新增量数据版本]
。version as of v: 代表修改Stream对象VersionOffset初始化数据版本为v,查询范围
(v, 最新增量数据版本]
。
示例
-- 1. 创建Delta Table源表。
CREATE TABLE delta_table_src (pk bigint not null primary key,
val bigint) tblproperties ("transactional"="true");
-- 2. 创建一个Stream关联Delta Table表。
CREATE STREAM delta_table_stream on table delta_table_src
version as of 1 strmproperties('read_mode'='append')
comment 'stream demo';
-- 3. 查看新建的stream信息,当前Offset Version和Reference Table Version都为1。
DESC STREAM delta_table_stream;
-- 输出结果。
Name delta_table_stream
Project sql_optimizer
Create Time 2024-09-07 10:26:56 Last Modified Time 2024-09-07 10:26:56
Offset Version 1
Reference Table Project sql_optimizer
Reference Table Name delta_table_src
Reference Table Id 8605276ce0034b20af761bf4761ba62e
Reference Table Version 1
Parameters {
"comment": "stream demo",
"read_mode": "append"}
-- 4. stream关联的Delet Table插入一条数据,目的是为了使得Delta Table的lsn递增,
-- 之后我们将引用Delta Table的version修改为递增后的version.
INSERT INTO delta_table_src VALUES ('1', '1');
-- 5. 查看当前Delta Table的数据版本信息
SHOW history FOR TABLE delta_table_src;
ObjectType ObjectId ObjectName VERSION(LSN) Time Operation
TABLE 8605276ce0034b20af761bf4761ba62e delta_table_src 0000000000000001 2024-09-07 10:25:59 CREATE
TABLE 8605276ce0034b20af761bf4761ba62e delta_table_src 0000000000000002 2024-09-07 10:28:19 APPEND
-- 6. 修改stream关联的Delta Table的version为2
ALTER STREAM delta_table_stream ON TABLE delta_table_src version as of 2;
-- 7. 查看修改后的stream信息,stream以及关联的Delta Table的version都变成了2。
DESC STREAM delta_table_stream;
-- 输出结果。
Name delta_table_stream
Project sql_optimizer
Create Time 2024-09-07 10:26:56 Last Modified Time 2024-09-07 10:29:12
Offset Version 2
Reference Table Project sql_optimizer
Reference Table Name delta_table_src
Reference Table Id 8605276ce0034b20af761bf4761ba62e
Reference Table Version 2
Parameters {
"comment": "stream demo",
"read_mode": "append"}
列出项目下的所有Stream
语法
SHOW STREAMS;
示例
-- 列出当前项目下的所有stream对象
SHOW STREAMS;
-- 输出结果。
delta_table_stream
删除Stream
语法
DROP STREAM [IF EXISTS] <stream_name>;
示例
-- 1. 查看当前项目中存在的所有stream对象
SHOW STREAMS;
-- 输出显示。
delta_table_stream
-- 2. 删除delta_table_stream stream对象
DROP STREAM IF EXISTS delta_table_stream;
-- 3. 再次查看当前项目中存在的所有stream对象;结果为空
SHOW STREAMS;
查询Stream
语法
SELECT * FROM <stream_name>;
查询Stream时,单纯执行DQL,并不会改变Stream的状态,即Stream的增量起始的Offset Version不会改变,但其关联的Delta Table的Reference Table Version会随着Delta Table状态的改变而改变,保持为关联Delta Table最新的Version数据版本。单纯执行DQL,表示这份增量数据没有被真正消费,只是进行了数据的探查。
查询Stream并且执行了DML操作,表示真正消费了Stream所表示的增量数据,会修改Stream的状态,将关联的数据版本迁移到这次DML操作查询的最新增量数据版本,即Stream的Offset Version会等于关联Delta Table的Reference Table Version,此时表示没有新的增量数据,在当前状态下如果Stream再被读取,读取的数据为空。
CDC模式查询输出示例
Delta Table CDC模式的使用,详情请参见CDC。
创建Delta Table源表。
CREATE TABLE delta_table_src ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ( "transactional"="true", 'acid.cdc.mode.enable'='true', 'cdc.insert.into.passthrough.enable'='true' );
创建目标表。
CREATE TABLE delta_table_dest ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ("transactional"="true");
创建CDC模式的Stream。
CREATE STREAM delta_table_stream ON TABLE delta_table_src version as of 1 strmproperties('read_mode'='cdc') comment 'stream cdc mode';
插入两条数据至Delta Table源表。
INSERT INTO delta_table_src VALUES (1, 1), (2, 2);
查询
delta_table_stream
输出CDC格式数据。但执行单纯的DQL,并不会改变delta_table_stream
状态,如下语句执行多次,返回结果一样。SELECT * FROM delta_table_stream; -- 输出结果 +------------+------------+------------------+----------------+------------------+ | pk | val | __meta_timestamp | __meta_op_type | __meta_is_update | +------------+------------+------------------+----------------+------------------+ | 2 | 2 | 2024-09-07 11:03:53 | 1 | 0 | | 1 | 1 | 2024-09-07 11:03:53 | 1 | 0 | +------------+------------+------------------+----------------+------------------+
读取
delta_table_stream
表的增量数据,并插入到目标表delta_table_dest
。同时将delta_table_stream
的Offset Version修改为关联表delta_table_src
最新的数据版本。此操作真正消费了delta_table_stream
表的增量数据。INSERT INTO delta_table_dest SELECT pk, val FROM delta_table_stream;
查询目标表。表中保存的是步骤6操作消费的Stream表的增量数据。
SELECT * FROM delta_table_dest; -- 输出结果 +------------+------------+ | pk | val | +------------+------------+ | 1 | 1 | | 2 | 2 | +------------+------------+
再次查询
delta_table_stream
表,输出为空。由于delta_table_stream
表示的增量数据已经被消费,没有新的增量数据。SELECT * FROM delta_table_stream; -- 输出结果 +------------+------------+ | pk | val | +------------+------------+ +------------+------------+
执行Update操作将源表pk为1记录的val设为10。
UPDATE delta_table_src SET val = 10 WHERE pk = 1;
由于源表由Update操作产生的新的增量数据,查询
delta_table_stream
,可输出Update操作的CDC数据。SELECT * FROM delta_table_stream; -- 输出结果 +------------+------------+------------------+----------------+------------------+ | pk | val | __meta_timestamp | __meta_op_type | __meta_is_update | +------------+------------+------------------+----------------+------------------+ | 1 | 1 | 2024-09-07 11:10:21 | 0 | 1 | | 1 | 10 | 2024-09-07 11:10:21 | 1 | 1 | +------------+------------+------------------+----------------+------------------+
由上述示例可见,CDC输出模式会跟踪Delta Table源表的记录变化,输出所有变化状态的记录,可有效用于增量计算的逻辑。
Append模式查询输出示例
创建Delta Table源表。
CREATE TABLE delta_table_src ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ("transactional"="true");
创建目标表。
CREATE TABLE delta_table_dest ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ("transactional"="true");
创建Append模式的Stream,并关联Delta Table。
CREATE STREAM delta_table_stream ON TABLE delta_table_src version as of 1 strmproperties ('read_mode'='append') comment 'stream append mode';
插入两条数据到Delta Table源表。
INSERT INTO delta_table_src VALUES (1, 1), (2, 2);
查询
delta_table_stream
。SELECT * FROM delta_table_stream; -- 输出结果,不包含系统字段。 +------------+------------+ | pk | val | +------------+------------+ | 1 | 1 | | 2 | 2 | +------------+------------+
读取
delta_table_stream
表的增量数据,并插入到目标表,同时将delta_table_stream
的Offset Version修改为关联表delta_table_src
最新的数据版本,此操作真正消费delta_table_stream
表的增量数据。INSERT INTO delta_table_dest SELECT pk, val FROM delta_table_stream;
查询目标表。表中保存的是步骤6 操作消费的
delta_table_stream
表的增量数据。SELECT * FROM delta_table_dest; -- 输出结果 +------------+------------+ | pk | val | +------------+------------+ | 1 | 1 | | 2 | 2 | +------------+------------+
再次查询
delta_table_stream
表,输出为空。由于delta_table_stream
表示的增量数据已经被消费,没有新的增量数据。SELECT * FROM delta_table_stream; -- 输出结果 +------------+------------+ | pk | val | +------------+------------+ +------------+------------+
执行Update操作将源表pk为1的记录val设为10。
UPDATE delta_table_src SET val = 10 WHERE pk = 1;
执行删除操作,将源表中pk为2的记录删除。
DELETE FROM delta_table_src WHERE pk = 2;
查询
delta_table_stream
,只输出了Update操作的结果记录 (1, 10),Delete操作的记录不可见,也不会输出。SELECT * FROM delta_table_stream; -- 输出结果 +------------+------------+ | pk | val | +------------+------------+ | 1 | 10 | +------------+------------+
由上述示例可见,Append输出模式并不会显示数据的操作状态,只会输出一条记录的最终状态,Delete记录也不会输出。因此使用的场景有限,通常可用于一些典型的ETL场景,不断对增量插入的数据进行清洗。