您可以通过手动的调整实时计算的作业参数、资源参数和上下游参数,提升实时计算作业的性能。

说明 建议您完成作业反压检测后,再判断是否需要进行配套调优。实时计算3.0以上版本作业反压检测方法,请参见作业反压检测步骤

手动配置调优概述

手动配置调优的内容主要有3种类型:

  • 上下游参数调优:主要对作业中的上下游存储参数进行调优。
  • 作业参数调优:主要对作业中的miniBatch等参数进行调优。
  • 资源参数调优:主要对作业中的Operator的并发数(Parallelism)、CPU(Core)、堆内存(Heap Memory)等参数进行调优。

下面通过3个章节对以上3类调优进行介绍。参数调优后将生成新的配置,Job需停止 > 启动或者暂停 > 恢复后才能使用新的配置,启动新配置的方法请参见重新启用新的配置

上下游参数调优

由于实时计算的特性,每条数据均会触发上下游存储的读写,会对上下游存储形成性能压力。通过设置batchsize批量的读写上下游存储数据可以降低对上下游存储的压力。支持batchsize参数的上下游存储如下:

名称 参数 详情 设置参数值
DataHub源表 batchReadSize 单次读取的条数 可选,默认为10。
DataHub源表 batchSize 单次写入的条数 可选,默认为300。
日志服务 LOG源表 batchGetSize 单次读取logGroup的条数 可选,默认为10。
分析型数据库MySQL版 2.0结果表 batchSize 一次批量写入的条数 可选,默认为1000。
云数据库RDS版结果表 batchSize 一次批量写入的条数 可选,默认为50。
云数据库 HybridDB for MySQL结果表 batchSize 一次批量写入的条数 可选,默认值为1000,建议可设置的最大值为4096。
bufferSize 去重的buffer大小,需要指定主键才生效。 可选,设置batchSize必须设置bufferSize,建议可设置的最大值为4096。
说明 DDL的WITH参数列中增加batchSize相关参数,即可完成数据的批量读写设置。例如,batchReadSize='<number>'

作业参数调优

miniBatch设置仅适用于优化GROUP BY。Flink SQL流模式下,每来一条数据都会执行State操作,IO消耗较大。设置miniBatch后,同一个Key的一批数据只访问一次State,且只输出最新的一条数据,即减少了State访问也减少了向下游的数据更新。miniBatch设置说明如下:
说明
  • 新增加的作业参数建议用户停止 > 启动
  • 更新作业参数值暂停 > 恢复即可。
#3.2及以上版本开启window miniBatch方法(3.2及以上版本默认不开启window miniBatch)。
sql.exec.mini-batch.window.enabled=true
# excatly-once语义。
blink.checkpoint.mode=EXACTLY_ONCE
# checkpoint间隔时间,单位毫秒。
blink.checkpoint.interval.ms=180000
blink.checkpoint.timeout.ms=600000
# 实时计算2.0及以上版本使用niagara作为statebackend,以及设定state数据生命周期,单位毫秒。
state.backend.type=niagara
state.backend.niagara.ttl.ms=129600000
# 实时计算2.0及以上版本开启5秒的microbatch(窗口函数不要设置这个参数。)
blink.microBatch.allowLatencyMs=5000
# 表示整个Job允许的延迟。
blink.miniBatch.allowLatencyMs=5000
#双流join节点优化参数
blink.miniBatch.join.enabled=true
# 单个Batch的size。
blink.miniBatch.size=20000
# local优化,实时计算2.0及以上版本默认已经开启,1.6.4需手动开启。
blink.localAgg.enabled=true
# 实时计算2.0及以上版本开启partial优化,解决count distinct效率低问题。
blink.partialAgg.enabled=true
# union all优化。
blink.forbid.unionall.as.breakpoint.in.subsection.optimization=true
# GC优化(源表为SLS时,不能设置改参数)。
blink.job.option=-yD heartbeat.timeout=180000 -yD env.java.opts='-verbose:gc -XX:NewRatio=3 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:ParallelGCThreads=4'
# 时区设置。
blink.job.timeZone=Asia/Shanghai

资源调优

下文通过示例为您介绍资源调优方法。

  1. 分析问题
    1. 通过Job的拓扑图查看到2号的Task节点的输入队列已达到100%,造成上游1号节点的数据堆积,输出队列造成数据堆积。

    2. 单击目标Task节点(本示例为2号Task节点)。
    3. SubTask List,查看In Queue100%的列。
    4. 单击目标列操作下的查看日志
    5. 单击跳转到TaskExecutor
    6. TaskExecutor > Metrics Graph,查看CPU和MEM的使用情况。
  2. 性能调优
    1. 单击作业编辑页面右侧的资源配置,进入调优窗口。
    2. 对目标Operator或者Group进行参数修改。
      • 单个Operator参数修改:
        1. GROUP框内,单击右上角的加号(+)。
        2. 将鼠标悬停至目标Operator框内。
        3. 单击目标Operator右侧的铅笔图标。
        4. 修改Operator数据 修改参数。
      • GROUP批量参数修改:
        1. 将鼠标悬停至GROUP框内。
        2. 单击GROUP右侧的铅笔图标。
        3. 批量修改Operator数据 修改参数。
    3. 完成配置参数修改后,单击资源配置右上角的配置信息操作 > 应用当前配置
    说明
    • 如果增加了Group的资源配置后,作业的运行效率提升不明显,需要按照以下顺序分析问题:
      1. 该节点是否存在数据倾斜现象。
      2. 复杂运算的Operator节点(如:GROUP BY、WINDOW、JOIN等)的子节点是否存在异常。
    • Operator子节点拆解方法:
      1. 单击需要修改的Operator。
      2. chainingStrategy参数值修改为HEAD。若已为HEAD,需要将后一个Operator的chainingStrategy参数值修改为HEADchainingStrategy参数说明如下。
        参数 说明
        ALWAYS 算子会尽可能的链接在一起。为了优化性能,通常最佳实践是让算子尽可能链接在一起,同时增加并发度。
        NEVER 算子将不会和上下游的算子链接在一起。
        HEAD 算子将不会和上游的算子链接在一起,但是会和下游的算子链接在一起。
  3. 资源参数的配置原则和建议
    • 作业的资源配置建议为:core:heap_memory=1:4,即1个核对应4G内存。
      说明
      • Operator的总core=parallelism*core
      • Operator的总heap_memory=parallelism*heap_memory
      • Group中的core取各个Operator中最大值,memory取各个Operator中memory之和。
      例如,core参数值为1CU,heap memory参数值为3G,则最终资源分配结果的是1CU+4G;core参数值为1CU,heap memory参数值为5G,则最终资源分配结果的是1.25CU+5G。
    • parallelism
      • Source节点
        说明 Source的并发数不能大于Source的Shard数。
        • Source数量和上游Partition数量相关。
        • 例如,Source的个数是16,Source的并发数可以配置为16、8或4等,不得超过16。
      • 中间节点
        • 根据预估的QPS计算。
        • QPS低的任务,中间处理节点数和Source并发数一样。
        • QPS高的任务,中间处理节点数配置为比Source并发数大,例如,64、128或者256。
      • Sink节点
        • 并发度和下游存储的Partition数相关,一般是下游Partition个数的2~3倍。
        • 如果配置过大会导致输入超时或失败。例如,下游Sink节点个数是16,建议Sink的并发数最大配置为48。
    • core

      默认值为0.1,根据实际CPU使用配置,建议参数值为0.25。

    • heap_memory

      堆内存,默认为值为256(单位为MB),请根据实际的内存状况使用进行配置。

    • state_size

      存在GROUP BY的Task节点中可配置参数state_sizestate_size默认值为0,存在GROUP BY的Task节点需要把state_size参数值设置为1,表示该Operator会使用state功能,作业会为该Operator申请额外的内存。如果state_size参数值不设置为1,作业可能运行失败。state_size需要配成1的Operator包括GROUP BY 、undefinedJOIN、OVER和WINDOW。

重新启用新的配置

完成配置后,需要暂停 > 恢复或者停止 > 启动后才能使新配置生效。完成作业重启或恢复后,可通过运维 > 运行信息 > Vertex拓扑查看新的配置是否生效。

说明 不建议采用停止 > 启动的方式触发新配置。停止作业后,作业状态会被清除,从而可能导致计算结果不一致。
  • 暂停 > 恢复:仅更改资源配置、调整WITH参数大小或调整作业参数大小。
  • 停止 > 启动:需要更改SQL逻辑、更改作业版本、增加WITH参数或增加作业参数。
  • 暂停 > 启动步骤如下:
    1. 上线作业。上线步骤参见上线配置方式选择使用上次资源配置(手动资源配置)
    2. 运维页面,单击目标作业操作列下的暂停
    3. 运维页面,单击目标作业操作列下的恢复
    4. 恢复作业运行,单击按最新配置恢复
      resume
  • 停止 > 启动步骤如下:
    1. 停止作业。停止作业步骤参见停止
    2. 启动作业。启动作业步骤参见启动

相关参数说明

  • Global

    isChainingEnabled :是否启用Chain策略,默认为true。不需要修改

  • Nodes
    参数 说明 是否需要修改
    id 节点ID号,自动生成,ID号唯一
    uid 节点UID号,用于计算Operator ID,如果不设置,会使用ID。
    pact 节点类型,例如,Data Source、Operator或Data Sink等。
    name 节点名称,可自定义。 可修改
    slotSharingGroup 请保持默认配置。
    chainingStrategy ChainingStrategy 是用来定义算子链接的策略。当一个算子和上游算子链接在一起,这意味着它们会运行在同一个线程。它们会合并为一个有多个运行步骤的算子。ChainingStrategy支持以下三种方式:
    • ALWAYS:算子会尽可能的链接在一起。为了优化性能,通常最佳实践是让算子尽可能链接在一起,同时增加并发度。
    • NEVER:算子将不会和上下游的算子链接在一起。
    • HEAD:算子将不会和上游的算子链接在一起,但是会和下游的算子链接在一起。
    可修改
    parallelism 并发数,默认值为1,可以根据实际数据量增大并发数。 可修改
    core CPU,默认为0.1,可根据实际CPU使用情况进行配置。建议设置为0.25。 可修改
    heap_memory 堆内存,默认为256(单位为MB),可根据实际内存使情况进行配置。 可修改
    direct_memory JVM堆外内存,默认0(单位为MB),建议不修改。 可修改
    native_memory JVM堆外内存,JNI使用,默认为0,建议配置为10(单位为MB)。 可修改
  • Chain
    Flink SQL任务是一个DAG图,由多个节点(Operator)组成,部分上下游的节点在运行时可以合成为一个节点,称为Chain。Chain后的节点,总CPU为所有节点CPU的最大值,总内存为所有节点内存的总和。多节点合成一个节点可以有效的减少网络传输,降低成本。
    说明
    • 并发数相同的节点才能进行Chain操作。
    • Group By节点不能进行Chain操作。