您可以手动调整实时计算Flink版的作业参数、资源参数和上下游参数,提升实时计算Flink版作业的性能。
说明 建议您完成作业反压检测后,再判断是否需要进行配套调优。实时计算Flink版3.0以上版本作业反压检测方法,详情请参见如何对实时计算Flink版3.0以上版本的作业进行反压检测?。
手动配置调优概述
手动配置调优的内容主要有3种类型:
- 上下游参数调优:对作业中上下游存储的参数进行调优。
- 作业参数调优:对作业中miniBatch等参数进行调优。
- 资源参数调优:对作业中Operator的并发数(Parallelism)、CPU(Core)和堆内存(Heap Memory)等参数进行调优。
下面对以上3类调优进行介绍。参数调优后将生成新的配置,作业需要先停止再启动或者 后才能使用新的配置,启动新配置的方法请参见重新启用新的配置。
上下游参数调优
实时计算Flink版每条数据均会触发上下游存储的读写,因此会对上下游存储形成存储压力。设置batchsize,批量读写上下游存储数据可以降低上下游存储的压力。支持batchsize参数的上下游存储如下表所示。
上下游 | 参数 | 说明 | 设置参数值 |
---|---|---|---|
DataHub源表 | batchReadSize | 单次读取的条数 | 可选,默认值为10。 |
DataHub结果表 | batchSize | 单次写入的条数 | 可选,默认值为300。 |
日志服务LOG源表 | batchGetSize | 单次读取logGroup的条数 | 可选,默认值为10。 |
分析型数据库MySQL版2.0结果表 | batchSize | 一次批量写入的条数 | 可选,默认值为1000。 |
云数据库RDS版结果表 | batchSize | 一次批量写入的条数 | 可选,默认值为4096。 |
云数据库HybridDB for MySQL结果表 | batchSize | 一次批量写入的条数 | 可选,默认值为1000,建议batchSize最大值为4096。 |
bufferSize | 去重的buffer大小,需要指定主键才能生效。 | 可选,设置batchSize必须设置bufferSize,建议bufferSize的最大值为4096。 |
说明 DDL的WITH参数列中增加batchSize相关参数,即可完成数据的批量读写设置。例如,
batchReadSize='<number>'
。
作业参数调优
miniBatch设置仅适用于优化GROUP BY。Flink SQL流模式下,每来一条数据都会执行State操作,I/O消耗较大。设置miniBatch后,同一个Key的一批数据只访问一次State,且只输出最新的一条数据,既减少了State的访问,也减少了下游的数据更新。miniBatch设置说明如下:
- 新增加作业参数后,建议您停止作业,再启动作业。
- 更新作业参数值后,建议您暂停作业,再恢复作业。
# 3.2及以上版本开启window miniBatch方法(3.2及以上版本默认不开启window miniBatch)。
sql.exec.mini-batch.window.enabled=true
# excatly-once语义。
blink.checkpoint.mode=EXACTLY_ONCE
# checkpoint间隔时间,单位毫秒。
blink.checkpoint.interval.ms=180000
blink.checkpoint.timeout.ms=600000
# 实时计算Flink版2.0及以上版本使用niagara作为statebackend,以及设定state数据生命周期,单位毫秒。
state.backend.type=niagara
state.backend.niagara.ttl.ms=129600000
# 实时计算Flink版2.0及以上版本开启5秒的microbatch(窗口函数不需要设置该参数)。
blink.microBatch.allowLatencyMs=5000
# 表示整个Job允许的延迟。
blink.miniBatch.allowLatencyMs=5000
# 双流join节点优化参数。
blink.miniBatch.join.enabled=true
# 单个Batch的size。
blink.miniBatch.size=20000
# local优化,实时计算Flink版2.0及以上版本默认已经开启,1.6.4版本需要手动开启。
blink.localAgg.enabled=true
# 实时计算Flink版2.0及以上版本开启partial优化,解决count distinct效率低问题。
blink.partialAgg.enabled=true
# union all优化。
blink.forbid.unionall.as.breakpoint.in.subsection.optimization=true
# GC优化(源表为SLS时,不能设置该参数)。
blink.job.option=-yD heartbeat.timeout=180000 -yD env.java.opts='-verbose:gc -XX:NewRatio=3 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:ParallelGCThreads=4'
# 时区设置。
blink.job.timeZone=Asia/Shanghai
资源调优
下文通过示例为您介绍资源调优方法:
- 分析过程
- 通过Job的拓扑图查看到2号的Task节点的输入队列已达到100%,造成上游1号节点的数据堆积,输出队列造成数据堆积。
- 单击目标Task节点(本示例为2号Task节点)。
- 在SubTask List,查看In Queue为
100%
的列。 - 单击目标列操作下的查看日志。
- 单击跳转到TaskExecutor。
- 在 ,查看CPU和MEM的使用情况。
- 通过Job的拓扑图查看到2号的Task节点的输入队列已达到100%,造成上游1号节点的数据堆积,输出队列造成数据堆积。
- 性能调优
- 单击作业编辑页面右侧的资源配置,进入调优窗口。
- 对目标Operator或者Group进行参数修改。
- 单个Operator参数修改
- GROUP框内,单击右上角的加号(+)。
- 鼠标悬停至目标Operator框内。
- 单击目标Operator右侧的
图标。
- 在修改Operator数据页面,修改参数。
- GROUP批量参数修改
- 鼠标悬停至GROUP框内。
- 单击GROUP右侧的
图标。
- 在批量修改Operator数据页面,修改参数。
- 单个Operator参数修改
- 完成配置参数修改后,单击资源配置右上角的 。
- 如果增加了Group的资源配置后,作业的运行效率提升不明显,需要按照以下顺序分析问题:
- 该节点是否存在数据倾斜现象。
- 复杂运算的Operator节点(例如,GROUP BY、WINDOW和JOIN等)的子节点是否存在异常。
- Operator子节点拆解方法:
- 单击目标Operator。
- 修改chainingStrategy参数值为
HEAD
。如果chainingStrategy已设置为HEAD
,需要修改后一个Operator的chainingStrategy参数值为HEAD
。chainingStrategy参数说明如下。参数 说明 ALWAYS 算子会尽可能的链接在一起。为了优化性能,通常需要让算子尽可能链接在一起,同时增加并发度。 NEVER 算子不会和上下游的算子链接在一起。 HEAD 算子不会和上游的算子链接在一起,但是会和下游的算子链接在一起。
- 如果增加了Group的资源配置后,作业的运行效率提升不明显,需要按照以下顺序分析问题:
- 资源参数的配置原则和建议
- 作业的资源配置建议为:
core:heap_memory=1:4
,即1个核对应4G内存。例如:- core参数值为1CU,heap memory参数值为3G,则最终资源分配结果的是1CU+4G。
- core参数值为1CU,heap memory参数值为5G,则最终资源分配结果的是1.25CU+5G。
说明- Operator的总core=parallelism*core。
- Operator的总heap_memory=parallelism*heap_memory。
- Group中的core取各个Operator中最大值,memory取各个Operator中memory之和。
- parallelism
- Source节点
Source的个数和上游Partition数量有关。例如,Source的个数是16,Source的并发数可以为16、8或4等,不得超过16。说明 Source的并发数不能大于Source的Shard数。
- 中间节点
根据预估的QPS计算:
- QPS低的任务,中间处理节点数和Source并发数一样。
- QPS高的任务,中间处理节点数配置为比Source并发数大,例如,64、128或256。
- Sink节点
并发度和下游存储的Partition数有关,一般是下游Partition个数的2~3倍。说明 如果配置过大会导致输入超时或失败。例如,下游Sink节点个数是16,建议Sink的并发数最大为48。
- Source节点
- core
默认值为0.1,根据实际CPU使用配置,建议配置为0.25。
- heap_memory
堆内存,默认为值为256(单位为MB),请根据实际的内存使用状况进行配置。
- state_size存在GROUP BY、undefinedJOIN、OVER和WINDOW的Task节点中需要配置参数state_size为
1
,表示该Operator会使用state功能,作业会为该Operator申请额外的内存。state_size默认值为0。说明 如果state_size参数值不设置为1
,则作业可能运行失败。
- 作业的资源配置建议为:
重新启用新的配置
完成配置后,建议您采用暂停再恢复作业的方式使新配置生效,而不是停止再启动作业的方式使新配置生效。因为停止作业后,作业状态会被清除,从而可能导致计算结果不一致。
说明
- 暂停再恢复:仅更改资源配置、调整WITH参数大小或调整作业参数大小。
- 停止再启动:需要更改SQL逻辑、更改作业版本、增加WITH参数或增加作业参数。
完成作业重启或恢复后,可以通过
查看新的配置是否生效。相关参数说明
- Global
isChainingEnabled:是否启用Chain策略,默认为true。不需要修改。
- Nodes
参数 说明 能否修改 id 节点ID号,自动生成,ID号唯一。 不能 uid 节点UID号,用于计算Operator ID,如果不设置,则使用ID。 不能 pact 节点类型,例如,Data Source、Operator或Data Sink等。 不能 name 节点名称,可自定义。 能 slotSharingGroup 请保持默认配置。 不能 chainingStrategy ChainingStrategy用来定义算子链接的策略。当一个算子和上游算子链接在一起,表示它们会运行在同一个线程,合并为一个有多个运行步骤的算子。ChainingStrategy支持以下3种方式: - ALWAYS:算子会尽可能的链接在一起。为了优化性能,通常最佳实践是让算子尽可能链接在一起,同时增加并发度。
- NEVER:算子不会和上下游的算子链接在一起。
- HEAD:算子不会和上游的算子链接在一起,但是会和下游的算子链接在一起。
能 parallelism 并发数,默认值为1,可以根据实际数据量增大并发数。 能 core CPU,默认为0.1,可以根据实际CPU使用情况进行配置。建议设置为0.25。 能 heap_memory 堆内存,默认为256(单位为MB),可以根据实际内存使情况进行配置。 能 direct_memory JVM(Java虚拟机)堆外内存,默认0(单位为MB)。 能,但不建议您修改该参数。 native_memory JVM(Java虚拟机)堆外内存,JNI(Java Native Interface)中使用,默认为0,建议配置为10(单位为MB)。 能,但不建议您修改该参数。 - Chain
Flink SQL任务是一个DAG图,由多个节点(Operator)组成,部分上下游的节点在运行时可以合成为一个节点,称为Chain操作。Chain操作后的节点总CPU为所有节点CPU的最大值,总内存为所有节点内存的总和。多节点合成一个节点可以有效的减少网络传输,降低成本。说明
- 并发数相同的节点才能进行Chain操作。
- Group By节点不能进行Chain操作。
在文档使用中是否遇到以下问题
更多建议
匿名提交