变更Sink

本文为您介绍Sink变更的可兼容性和不可兼容性详情。

可兼容的变更

  • 删除多路Sink的某路Sink,该变更属于完全兼容变更。

    -- 原始SQL。
    CREATE TABLE MyTable (
      a int,
      b bigint,
      c varchar
    );
    CREATE TABLE MySink1 (
      a int,
      b bigint,
      c varchar
    );
    CREATE TABLE MySink2 (
      a int,
      b bigint,
      c varchar
    );
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;
    
    -- 删除MySink1对应的Query,该变更属于完全兼容变更。
    -- 该Query中的group aggregate对应的状态会被丢弃。
    INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;
    
    -- 删除MySink2对应的Query,该变更属于完全兼容变更。
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
  • 新增Sink且不带有状态的Query,该变更属于完全兼容变更。

    -- 原始SQL。
    CREATE TABLE MyTable (
      a int,
      b bigint,
      c varchar
    );
    
    CREATE TABLE MySink1 (
      a int,
      b bigint,
      c varchar
    );
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    
    -- 新增无状态的Query,该变更属于完全兼容变更。
    CREATE TABLE MySink2 (
      a int,
      b bigint,
      c varchar
    );
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;
  • 默认情况下,Sink被认为是无状态算子(大部分的Sink连接器没有状态)。因此变更Sink表名、连接器类型、WITH属性都是兼容变更。

    -- 原始SQL。
    CREATE TABLE MyTable (
      a int,
      b bigint,
      c varchar
    );
    
    CREATE TABLE MySink (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'print'
    );
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    
    -- 修改表名、连接器类型等,该变更属于完全兼容变更。
    CREATE TABLE MySink2 (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'kafka',
      ...
    );
    INSERT INTO MySink2 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;

不兼容的变更

  • 新增Sink且带有状态的Query,该变更属于不兼容变更。

    -- 原始SQL。
    CREATE TABLE MyTable (
      a int,
      b bigint,
      c varchar
    );
    
    CREATE TABLE MySink1 (
      a int,
      b bigint,
      c varchar
    );
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    
    -- 新增有状态的Query,该变更属于不兼容变更。
    CREATE TABLE MySink2 (
      b bigint,
      a int,
      c varchar
    );
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    INSERT INTO MySink2 SELECT b, sum(a), max(c) FROM MyTable GROUP BY b;
  • 设置table.optimizer.state-compatibility.ignore-sink=false,将Sink视为有状态算子,修改表名、连接器类型,此变更属于不兼容变更。

    -- 原始SQL。
    CREATE TABLE MyTable (
      a int,
      b bigint,
      c varchar
    );
    
    CREATE TABLE MySink (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'print'
    );
    INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    
    -- 设置table.optimizer.state-compatibility.ignore-sink=false,
    -- 将Sink视为有状态算子,修改表名,该变更属于不兼容变更
    CREATE TABLE MySink2 (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'print'
    );
    INSERT INTO MySink2 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    
    -- 设置table.optimizer.state-compatibility.ignore-sink=false,
    -- 将Sink视为有状态算子,修改连接器类型(例如将print改为blackhole),该变更属于不兼容变更
    create table MySink (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'blackhole',
      ...
    );
    INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
  • 删除了一路sink的同时,将Sink或Source的TEMPORARY TABLE DDL语句也进行了修改或删除,属于不兼容变更。详情请参见其他限制

  • 一般情况下Sink是无状态的算子(默认table.optimizer.state-compatibility.ignore-sink为true来忽略Sink)。但在有数据乱序风险时,框架将基于Sink生成有状态的SinkMaterializer节点来消除乱序、保障数据正确性,该节点为有状态节点,详情请参见Flink SQL中Changelog事件乱序处理原理。因此变更后可能出现作业启动时兼容性检测显示完全兼容,但实际可能不兼容的情况。例如更换Source的Primary Key导致Sink上游Upsert Key发生变化。

    --原始query
    CREATE TEMPORARY TABLE MyTable (
      a int primary key not enforced,
      b bigint,
      c bigint,
      ts timestamp(3),
      proctime as proctime(),
      watermark for ts AS ts - interval '1' second
    ) WITH ('connector' = 'datagen');
    
    CREATE TEMPORARY TABLE MySink (a int, b bigint, c bigint  primary key not enforced) with ('connector'='print');
    
    INSERT INTO MySink SELECT a, b, c FROM MyTable;
    
    --修改主键所属的变量
    CREATE TEMPORARY TABLE MyTable (
      a int,
      b bigint,
      c bigint primary key not enforced,
      d bigint,
      ts timestamp(3),
      proctime as proctime(),
      watermark for ts AS ts - interval '1' second
    ) WITH ('connector' = 'datagen');
    
    CREATE TEMPORARY TABLE MySink (a int, b bigint primary key not enforced, c bigint) with ('connector'='print');
    
    INSERT INTO MySink SELECT a, b, c FROM MyTable;
  • 参数table.optimizer.state-compatibility.ignore-sink由true变更为false(将Sink视为有状态算子,纳入状态兼容检测范围),属于不兼容变更。