变更Sink
本文为您介绍Sink变更的可兼容性和不可兼容性详情。
可兼容的变更
删除多路Sink的某路Sink,该变更属于完全兼容变更。
-- 原始SQL。 CREATE TABLE MyTable ( a int, b bigint, c varchar ); CREATE TABLE MySink1 ( a int, b bigint, c varchar ); CREATE TABLE MySink2 ( a int, b bigint, c varchar ); INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a; INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10; -- 删除MySink1对应的Query,该变更属于完全兼容变更。 -- 该Query中的group aggregate对应的状态会被丢弃。 INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10; -- 删除MySink2对应的Query,该变更属于完全兼容变更。 INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
新增Sink且不带有状态的Query,该变更属于完全兼容变更。
-- 原始SQL。 CREATE TABLE MyTable ( a int, b bigint, c varchar ); CREATE TABLE MySink1 ( a int, b bigint, c varchar ); INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a; -- 新增无状态的Query,该变更属于完全兼容变更。 CREATE TABLE MySink2 ( a int, b bigint, c varchar ); INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a; INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;
默认情况下,Sink被认为是无状态的算子(大部分的Sink Connector没有状态)。因此变更Sink表名、Connector类型、WITH属性都是兼容变更。
-- 原始SQL。 CREATE TABLE MyTable ( a int, b bigint, c varchar ); CREATE TABLE MySink ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a; -- 修改表名、Connector属性等,该变更属于完全兼容变更。 CREATE TABLE MySink2 ( a int, b bigint, c varchar ) WITH ( 'connector' = 'kafka', ... ); INSERT INTO MySink2 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
不兼容的变更
新增Sink且带有状态的Query,该变更属于不兼容变更。
-- 原始SQL。 CREATE TABLE MyTable ( a int, b bigint, c varchar ); CREATE TABLE MySink1 ( a int, b bigint, c varchar ); INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a; -- 新增有状态的Query,该变更属于不兼容变更。 CREATE TABLE MySink2 ( b bigint, a int, c varchar ); INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a; INSERT INTO MySink2 SELECT b, sum(a), max(c) FROM MyTable GROUP BY b;
如果需要将Sink作为有状态算子时,例如设置table.optimizer.state-compatibility.ignore-sink=false,修改Sink表名、Connector类型都是不兼容变更。
-- 原始SQL。 CREATE TABLE MyTable ( a int, b bigint, c varchar ); CREATE TABLE MySink ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a; -- 修改表名,设置table.optimizer.state-compatibility.ignore-sink=false,该变更属于不兼容变更。 CREATE TABLE MySink2 ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink2 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a; -- 修改Connector属性,设置table.optimizer.state-compatibility.ignore-sink=false,该变更属于不兼容变更。 create table MySink ( a int, b bigint, c varchar ) WITH ( 'connector' = 'kafka', ... ); INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;