文档

重要参数说明

更新时间:

本文介绍SQL开发中涉及的重要参数说明和使用示例。

table.exec.sink.keyed-shuffle

为解决向带有主键的表中写入数据时出现的分布式乱序问题,您可以通过table.exec.sink.keyed-shuffle参数来进行Hash Shuffle操作,这将确保相同主键的数据被发送到算子的同一个并发,减少分布式乱序问题。

注意事项

  • 仅在上游算子能够确保更新记录在主键字段上的顺序性时,Hash Shuffle操作才起作用;否则,Hash Shuffle操作不能解决问题。

  • 在作业专家模式时,修改算子并发度,不适用下面的并发度判定规则。

取值说明

  • AUTO(默认值):表示在Sink的并发度不为1,且Sink的并发度与上游算子不同时,当数据流向Sink时,Flink会自动对主键字段进行Hash Shuffle操作。

  • FORCE:表示在Sink并发度不为1时,当数据流向Sink时,Flink会强制对主键字段进行Hash Shuffle操作。

  • NONE:表示Flink不会根据Sink和上游算子的并发度信息进行Hash Shuffle操作。

使用示例

  • 参数值为AUTO

    1. 新建SQL流作业,复制如下测试SQL(显式指定Sink并发度为2),部署作业。

      CREATE TEMPORARY TABLE s1 (
        a INT,
        b INT,
        ts TIMESTAMP(3)
      ) WITH (
        'connector'='datagen',
        'rows-per-second'='1',
        'fields.ts.kind'='random','fields.ts.max-past'='5s',
        'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10'
      );
      
      CREATE TEMPORARY TABLE sink (
        a INT,
        b INT,
        ts TIMESTAMP(3),
        PRIMARY KEY (a) NOT ENFORCED
      ) WITH (
        'connector'='print',
         --您可以通过sink.parallelism参数直接指定Sink并发度。
        'sink.parallelism'='2'
      );
      
      INSERT INTO sink SELECT * FROM s1;
      --您也可以通过动态表选项的方式指定Sink并发度。
      --INSERT INTO sink /*+ OPTIONS('sink.parallelism' = '2') */ SELECT * FROM s1;
    2. 作业运维页面的部署详情页签资源配置区域,将并发度设置为1,在运行参数配置区域其他配置中,不设置table.exec.sink.keyed-shuffle参数或显式添加table.exec.sink.keyed-shuffle: AUTO(两者效果一致)。

      image

    3. 启动作业。在状态总览页签下,您可以看到Sink节点和上游的数据连接方式为HASH。

      image

  • 参数值为FORCE

    1. 新建SQL流作业,复制如下测试SQL(不再显式指定Sink并发度),部署作业。

      CREATE TEMPORARY TABLE s1 (
        a INT,
        b INT,
        ts TIMESTAMP(3)
      ) WITH (
        'connector'='datagen',
        'rows-per-second'='1',
        'fields.ts.kind'='random','fields.ts.max-past'='5s',
        'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10'
      );
      
      CREATE TEMPORARY TABLE sink (
        a INT,
        b INT,
        ts TIMESTAMP(3),
        PRIMARY KEY (a) NOT ENFORCED
      ) WITH (
        'connector'='print'
      );
      
      INSERT INTO sink
      SELECT * FROM s1;
    2. 作业运维页面的部署详情页签资源配置区域,将并发度设置为2。在运行参数配置区域其他配置中添加table.exec.sink.keyed-shuffle: FORCE

      image

    3. 启动作业后,在状态总览页签下,您可以看到Sink节点和上游节点的并发度都为2,并且数据连接方式变成了HASH。

      image

table.exec.mini-batch.size

该参数控制了相关的计算节点进行微批操作所缓存的最大数据条数,达到该值后触发最终的计算和数据输出。该参数只有与table.exec.mini-batch.enabled、table.exec.mini-batch.allow-latency同时使用时才会生效。有关MiniBatch相关的优化请参见MiniBatch AggregationMiniBatch 双流Join

注意事项

在作业启动前,如果未在运行参数配置区域显式设置该参数,在MiniBatch处理模式下,将使用Managed Memory缓存数据,在以下几种条件下都会触发最终计算和数据输出:

  • 收到MiniBatchAssigner节点发送的watermark消息

  • Managed Memory已满

  • 进行Checkpoint前

  • 作业停止时

取值说明

  • -1(默认值):表示使用Managed Memory缓存数据。

  • 其他Long类型的负值:同默认设置。

  • 其他Long类型的正值:表示使用Heap Memory来缓存数据。当缓存的数据量达到N条时,会自动触发输出操作。

使用示例

  1. 新建SQL流作业,复制如下测试SQL,部署作业。

    CREATE TEMPORARY TABLE s1 (
      a INT,
      b INT,
      ts TIMESTAMP(3),
      PRIMARY KEY (a) NOT ENFORCED,
      WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
    ) WITH (
      'connector'='datagen',
      'rows-per-second'='1',
      'fields.ts.kind'='random',
      'fields.ts.max-past'='5s',
      'fields.b.kind'='random',
      'fields.b.min'='0',
      'fields.b.max'='10'
    );
    
    CREATE TEMPORARY TABLE sink (
      a INT,
      b BIGINT,
      PRIMARY KEY (a) NOT ENFORCED
    ) WITH (
      'connector'='print'
    );
    
    INSERT INTO sink SELECT a, sum(b) FROM s1 GROUP BY a;
  2. 作业运维页面的部署详情运行参数配置区域其他配置中,设置table.exec.mini-batch.enabled: truetable.exec.mini-batch.allow-latency: 2s参数,不设置table.exec.mini-batch.size(取默认值-1)。

  3. 启动作业。在状态总览页签下,您可以看到作业包含了MiniBatchAssigner节点、LocalGroupAggregate节点和GlobalGroupAggregate节点。

    image

相关文档

为什么数据在LocalGroupAggregate节点中长时间卡住,无输出?