您可以手动调整实时计算Flink版的作业参数、资源参数和上下游参数,提升实时计算Flink版作业的性能。

说明 建议您完成作业反压检测后,再判断是否需要进行配套调优。实时计算Flink版3.0以上版本作业反压检测方法,详情请参见如何对实时计算3.0以上版本的作业进行反压检测?

手动配置调优概述

手动配置调优的内容主要有3种类型:
  • 上下游参数调优:对作业中上下游存储的参数进行调优。
  • 作业参数调优:对作业中miniBatch等参数进行调优。
  • 资源参数调优:对作业中Operator的并发数(Parallelism)、CPU(Core)和堆内存(Heap Memory)等参数进行调优。

下面对以上3类调优进行介绍。参数调优后将生成新的配置,作业需要先停止启动或者暂停 > 恢复后才能使用新的配置,启动新配置的方法请参见重新启用新的配置

上下游参数调优

实时计算Flink版每条数据均会触发上下游存储的读写,因此会对上下游存储形成存储压力。设置batchsize,批量读写上下游存储数据可以降低上下游存储的压力。支持batchsize参数的上下游存储如下表所示。
上下游 参数 说明 设置参数值
DataHub源表 batchReadSize 单次读取的条数 可选,默认为10。
DataHub结果表 batchSize 单次写入的条数 可选,默认为300。
日志服务LOG源表 batchGetSize 单次读取logGroup的条数 可选,默认为10。
分析型数据库MySQL版2.0结果表 batchSize 一次批量写入的条数 可选,默认为1000。
云数据库RDS版结果表 batchSize 一次批量写入的条数 可选,默认为50。
云数据库HybridDB for MySQL结果表 batchSize 一次批量写入的条数 可选,默认值为1000,建议batchSize最大值为4096。
bufferSize 去重的buffer大小,需要指定主键才能生效。 可选,设置batchSize必须设置bufferSize,建议bufferSize的最大值为4096。
说明 DDL的WITH参数列中增加batchSize相关参数,即可完成数据的批量读写设置。例如,batchReadSize='<number>'

作业参数调优

miniBatch设置仅适用于优化GROUP BY。Flink SQL流模式下,每来一条数据都会执行State操作,I/O消耗较大。设置miniBatch后,同一个Key的一批数据只访问一次State,且只输出最新的一条数据,既减少了State的访问,也减少了下游的数据更新。miniBatch设置说明如下:
  • 新增加作业参数后,建议您停止作业,再启动作业。
  • 更新作业参数值后,建议您暂停作业,再恢复作业。
# 3.2及以上版本开启window miniBatch方法(3.2及以上版本默认不开启window miniBatch)。
sql.exec.mini-batch.window.enabled=true
# excatly-once语义。
blink.checkpoint.mode=EXACTLY_ONCE
# checkpoint间隔时间,单位毫秒。
blink.checkpoint.interval.ms=180000
blink.checkpoint.timeout.ms=600000
# 实时计算Flink版2.0及以上版本使用niagara作为statebackend,以及设定state数据生命周期,单位毫秒。
state.backend.type=niagara
state.backend.niagara.ttl.ms=129600000
# 实时计算Flink版2.0及以上版本开启5秒的microbatch(窗口函数不需要设置该参数)。
blink.microBatch.allowLatencyMs=5000
# 表示整个Job允许的延迟。
blink.miniBatch.allowLatencyMs=5000
# 双流join节点优化参数。
blink.miniBatch.join.enabled=true
# 单个Batch的size。
blink.miniBatch.size=20000
# local优化,实时计算Flink版2.0及以上版本默认已经开启,1.6.4版本需要手动开启。
blink.localAgg.enabled=true
# 实时计算Flink版2.0及以上版本开启partial优化,解决count distinct效率低问题。
blink.partialAgg.enabled=true
# union all优化。
blink.forbid.unionall.as.breakpoint.in.subsection.optimization=true
# GC优化(源表为SLS时,不能设置该参数)。
blink.job.option=-yD heartbeat.timeout=180000 -yD env.java.opts='-verbose:gc -XX:NewRatio=3 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:ParallelGCThreads=4'
# 时区设置。
blink.job.timeZone=Asia/Shanghai

资源调优

下文通过示例为您介绍资源调优方法:
  1. 分析过程
    1. 通过Job的拓扑图查看到2号的Task节点的输入队列已达到100%,造成上游1号节点的数据堆积,输出队列造成数据堆积。
    2. 单击目标Task节点(本示例为2号Task节点)。
    3. SubTask List,查看In Queue100%的列。
    4. 单击目标列操作下的查看日志
    5. 单击跳转到TaskExecutor
    6. TaskExecutor > Metrics Graph,查看CPU和MEM的使用情况。
  2. 性能调优
    1. 单击作业编辑页面右侧的资源配置,进入调优窗口。
    2. 对目标Operator或者Group进行参数修改。
      • 单个Operator参数修改
        1. GROUP框内,单击右上角的加号(+)。
        2. 鼠标悬停至目标Operator框内。
        3. 单击目标Operator右侧的铅笔图标图标。
        4. 修改Operator数据页面,修改参数。
      • GROUP批量参数修改
        1. 鼠标悬停至GROUP框内。
        2. 单击GROUP右侧的铅笔图标图标。
        3. 批量修改Operator数据页面,修改参数。
    3. 完成配置参数修改后,单击资源配置右上角的配置信息操作 > 应用当前配置
      • 如果增加了Group的资源配置后,作业的运行效率提升不明显,需要按照以下顺序分析问题:
        1. 该节点是否存在数据倾斜现象。
        2. 复杂运算的Operator节点(例如,GROUP BY、WINDOW和JOIN等)的子节点是否存在异常。
      • Operator子节点拆解方法:
        1. 单击目标Operator。
        2. 修改chainingStrategy参数值为HEAD
          如果chainingStrategy已设置为HEAD,需要修改后一个Operator的chainingStrategy参数值为HEADchainingStrategy参数说明如下。
          参数 说明
          ALWAYS 算子会尽可能的链接在一起。为了优化性能,通常需要让算子尽可能链接在一起,同时增加并发度。
          NEVER 算子不会和上下游的算子链接在一起。
          HEAD 算子不会和上游的算子链接在一起,但是会和下游的算子链接在一起。
  3. 资源参数的配置原则和建议
    • 作业的资源配置建议为:core:heap_memory=1:4,即1个核对应4G内存。例如:
      • core参数值为1CU,heap memory参数值为3G,则最终资源分配结果的是1CU+4G
      • core参数值为1CU,heap memory参数值为5G,则最终资源分配结果的是1.25CU+5G
      说明
      • Operator的总core=parallelism*core
      • Operator的总heap_memory=parallelism*heap_memory
      • Group中的core取各个Operator中最大值,memory取各个Operator中memory之和。
    • parallelism
      • Source节点
        Source的个数和上游Partition数量有关。例如,Source的个数是16,Source的并发数可以为16、8或4等,不得超过16。
        说明 Source的并发数不能大于Source的Shard数。
      • 中间节点
        根据预估的QPS计算:
        • QPS低的任务,中间处理节点数和Source并发数一样。
        • QPS高的任务,中间处理节点数配置为比Source并发数大,例如,64、128或256。
      • Sink节点
        并发度和下游存储的Partition数有关,一般是下游Partition个数的2~3倍。
        说明 如果配置过大会导致输入超时或失败。例如,下游Sink节点个数是16,建议Sink的并发数最大为48。
    • core

      默认值为0.1,根据实际CPU使用配置,建议配置为0.25。

    • heap_memory

      堆内存,默认为值为256(单位为MB),请根据实际的内存使用状况进行配置。

    • state_size
      存在GROUP BY、undefinedJOIN、OVER和WINDOW的Task节点中需要配置参数state_size1,表示该Operator会使用state功能,作业会为该Operator申请额外的内存。state_size默认值为0。
      说明 如果state_size参数值不设置为1,则作业可能运行失败。

重新启用新的配置

完成配置后,建议您采用暂停恢复作业的方式使新配置生效,而不是停止启动作业的方式使新配置生效。因为停止作业后,作业状态会被清除,从而可能导致计算结果不一致。
说明
  • 暂停恢复:仅更改资源配置、调整WITH参数大小或调整作业参数大小。
  • 停止启动:需要更改SQL逻辑、更改作业版本、增加WITH参数或增加作业参数。

完成作业重启或恢复后,可以通过运维 > 运行信息 > Vertex拓扑查看新的配置是否生效。

  • 暂停恢复步骤如下:
    1. 上线作业。上线步骤参见上线,详情请参见上线配置方式选择使用上次资源配置(手动资源配置)
    2. 运维页面,单击目标作业操作列下的暂停
    3. 运维页面,单击目标作业操作列下的恢复
    4. 恢复作业运行,单击按最新配置恢复resume
  • 停止启动步骤如下:
    1. 停止作业。停止作业步骤请参见停止
    2. 启动作业。启动作业步骤参见启动

相关参数说明

  • Global

    isChainingEnabled :是否启用Chain策略,默认为true。不需要修改。

  • Nodes
    参数 说明 能否修改
    id 节点ID号,自动生成,ID号唯一。 不能
    uid 节点UID号,用于计算Operator ID,如果不设置,则使用ID。 不能
    pact 节点类型,例如,Data Source、Operator或Data Sink等。 不能
    name 节点名称,可自定义。 可以
    slotSharingGroup 请保持默认配置。 不能
    chainingStrategy ChainingStrategy用来定义算子链接的策略。当一个算子和上游算子链接在一起,表示它们会运行在同一个线程,合并为一个有多个运行步骤的算子。ChainingStrategy支持以下3种方式:
    • ALWAYS:算子会尽可能的链接在一起。为了优化性能,通常最佳实践是让算子尽可能链接在一起,同时增加并发度。
    • NEVER:算子不会和上下游的算子链接在一起。
    • HEAD:算子不会和上游的算子链接在一起,但是会和下游的算子链接在一起。
    可以
    parallelism 并发数,默认值为1,可以根据实际数据量增大并发数。 可以
    core CPU,默认为0.1,可以根据实际CPU使用情况进行配置。建议设置为0.25。 可以
    heap_memory 堆内存,默认为256(单位为MB),可以根据实际内存使情况进行配置。 可以
    direct_memory JVM(Java虚拟机)堆外内存,默认0(单位为MB)。 可以,但不建议您修改该参数。
    native_memory JVM(Java虚拟机)堆外内存,JNI(Java Native Interface)中使用,默认为0,建议配置为10(单位为MB)。 可以,但不建议您修改该参数。
  • Chain
    Flink SQL任务是一个DAG图,由多个节点(Operator)组成,部分上下游的节点在运行时可以合成为一个节点,称为Chain。Chain后的节点,总CPU为所有节点CPU的最大值,总内存为所有节点内存的总和。多节点合成一个节点可以有效的减少网络传输,降低成本。
    说明
    • 并发数相同的节点才能进行Chain操作。
    • Group By节点不能进行Chain操作。