文档

SQL作业大状态导致反压的调优原理与方法

更新时间:

状态管理不仅影响应用的性能,还关系到系统的稳定性和资源的有效利用。如果状态管理不当,可能会导致性能下降、资源耗尽,甚至系统崩溃。本文为您介绍SQL作业大状态导致反压的调优原理与方法。

运行原理:状态算子的产生

作为一种特定领域语言,SQL的设计初衷是隐藏底层数据处理的复杂性,可以通过声明式语言来进行数据操作。而Flink SQL由于其架构的特殊性,在实现层面通常需要引入状态后端配合系统检查点(Checkpoint)来保证计算结果的最终一致性。目前Flink SQL由优化器根据配置项以及SQL语句来推导生成状态算子,想要高效处理有状态的大规模数据和性能调优,需要对SQL状态算子生成机制和管理策略有一定了解。

基于优化器推导产生的状态算子

主要有如下三种状态算子:

状态算子

状态清理机制

ChangelogNormalize

生命周期TTL

SinkUpsertMaterlizer

LookupJoin(*)

ChangelogNormalize

ChangelogNormalize旨在对涉及主键语义的数据变更日志进行标准化处理。通过该算子,可以有效地整合和优化数据变更记录,确保数据的一致性和准确性。该状态算子会在以下两种场景出现:

  • 使用了带有主键的upsert源表

    upsert源表特指在保持主键顺序一致性的前提下,仅产生基于主键的UPDATE(包括INSERT和 UPDATE_AFTER)及DELETE操作的变更数据表。例如,Upsert Kafka便是支持这类操作的典型连接器之一。此外,您也可以通过重写自定义源表连接器中的getChangelogMode方法,实现upsert功能。

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.upsert();
    }
  • 显式设置'table.exec.source.cdc-events-duplicate' = 'true'

    在使用at-least-once语义进行CDC事件处理时,可能会产生重复的变更日志。在需要exactly-once语义时,您需要开启此配置项来对变更日志进行去重。例如

    image

    当出现该算子时,上游数据将按照Flink SQL源表DDL中定义的主键做一次hash shuffle操作后使用ValueState来存储当前主键下最新的整行记录。更新状态并向下游发送变更的过程如下图所示。处理第二条-U(2, 'Jerry', 77)时State已经empty,说明截止目前+I/+UA和-D/-UB已经两两抵销,当前这条retract消息是重复的,可以丢弃。

    image

SinkUpsertMaterializer

专门用于处理具有主键定义的结果表,并确保数据的物化操作符合upsert语义。在数据流更新过程中,如果无法保证upsert的特定要求,即按照主键进行更新时保持数据的唯一性和有序性,优化器会自动引入此算子。它通过维护基于结果表主键的状态信息,来确保这些约束得到满足。更多信息及常见场景请参见Flink SQL中Changelog事件乱序处理原理

LookupJoin

在处理LookupJoin操作时,若主动配置了系统优化选项'table.optimizer.non-deterministic-update.strategy'='TRY_RESOLVE',且优化器识别到潜在的非确定性更新问题(如何消除流查询的不确定性影响),则系统会尝试采取特殊措施以解决这一问题。具体而言,若通过引入一个状态算子能够消除非确定性,优化器便会自动创建一个带状态的LookupJoin算子。

带状态的LookupJoin算子主要适用于以下情况:结果表被定义了主键,而这些主键完全或部分来自于维表,同时维表中的数据可能会发生变化(例如通过变更数据捕获,即CDC Lookup Source机制)。此外,用于Join操作的字段在维表中并非主键。在这种情况下,带状态的LookupJoin算子能够有效地处理数据的动态变化,确保查询结果的准确性和一致性。

基于SQL操作产生的状态算子

基于SQL操作产生的状态算子,按状态清理机制可以分为TTL过期和依赖watermark推进两类。具体说来,Flink SQL里有部分状态算子的生命周期不是由TTL来控制,例如Window相关的状态计算(WindowAggregate、WindowDeduplicate、WindowJoin、WindowTopN等)。它们的状态清理主要依赖于watermark的推进,当watermark超过窗口结束时间时,内置的定时器就会触发状态清理。

状态算子

如何产生

状态清理机制

Deduplicate

使用row_number语句,order by的字段必须为时间属性(time attribute)字段(事件时间event time或处理时间processing time),且只取第一条。

TTL

RegularJoin

使用join语句,等值条件里不包含时间属性字段。

GroupAggregate

使用group by语句进行分组聚合,如sum、count、min、max、first_value、last_value,或使用distinct关键字。

GlobalGroupAggregate

分组聚合开启local-global优化。

IncrementalGroupAggregate

当存在两层分组聚合操作并开启两阶段优化时,内层聚合对应的状态算子GlobalGroupAggregate和外层聚合对应的状态算子LocalGroupAggregate被合并成一个IncrementalGroupAggregate。

Rank

使用row_number语句,order by的字段必须为非时间属性字段。

GlobalRank

使用row_number语句,order by的字段必须为非时间属性字段,并开启local-global优化。

IntervalJoin

使用join语句,等值条件里包含时间属性字段(事件时间或处理时间)。例如:

L.time between R.time + X and R.time + Y 
  -- 或 
R.time between L.time - Y and L.time - X

watermark

TemporalJoin

使用基于事件时间的inner或left join语句。

WindowDeduplicate

基于Window TVF的去重操作。

WindowAggregate

基于Window TVF聚合。

GlobalWindowAggregate

基于Window TVF聚合,并开启两阶段优化。

WindowJoin

基于Window TVF的Join。

WindowRank

基于Window TVF的排序。

GroupWindowAggregate

基于legacy语法的Window聚合。

问题诊断方法

在Flink作业遭遇性能瓶颈时,系统往往表现出明显的反压现象。这种反压可能由多种因素引起,但主要的原因之一是作业状态规模的持续膨胀,直至超出内存限制。此时,状态存储引擎会将部分不频繁使用的状态数据移至磁盘,而磁盘与内存在数据存取速度上的巨大差异,使得磁盘IO操作成为数据处理效率的瓶颈。尤其在Flink的计算过程中,如果算子频繁地从磁盘读取状态数据,将显著增加作业的延迟,降低整体处理速度,成为性能问题的根源。

为了准确识别是否由状态访问引发反压,需要对作业的运行状态和算子行为进行深入分析。利用监控工具追踪和诊断性能瓶颈,可以有效地发现并解决由状态访问引起的性能问题,从而提升Flink作业的性能,具体方法请参见问题诊断方法

调优方法

主动避免生成不必要的状态算子

基于SQL操作产生的状态算子一般很难避免,因此主要针对优化器自动推导的算子进行讨论。

  • ChangelogNormalize

    在使用upsert source进行数据处理时,需注意其ChangelogNormalize状态节点的生成。通常情况下,除了事件时间的时态关联(event time temporal join)外,其他upsert source应用场景都会产生该状态节点。因此,在选择Upsert Kafka或类似的Upsert连接器时,应首先评估具体的使用场景,对于非事件时间关联场景,应特别关注状态算子的状态指标(state metrics)。由于状态节点是基于KeyedState的,当源表的主键数量庞大时,状态节点的规模也会相应增加。如果物理表的主键更新频繁,状态节点也将频繁地被访问和修改。从实践角度而言,像数据同步类的场景,建议避免使用Upsert Kafka作为源表连接器,同时也最好选择能够保证exactly-once语义的数据同步工具。

  • SinkUpsertMaterializer

    auto作为table.exec.sink.upsert-materialize配置项的默认值,表明系统会自动判断数据的一致性,尤其是在变更日志(changelog)出现无序的情况下。该机制确保了通过引入SinkUpsertMaterializer来维持数据处理的准确性。但并不意味着每当该算子被激活,数据就一定存在无序问题。例如,将多个分组键(group by key)合并的操作,这种情况下优化器无法准确推导出upsert键,因此出于安全考虑,会默认添加SinkUpsertMaterializer。如果对数据的分布有充分的了解,不使用该算子也能够确保输出结果的正确性,可以将参数设置为none,从而在数据正确性和性能上都得到保证。

    您可以通过检查作业的最后一个节点来确认SinkUpsertMaterializer是否被激活使用。在作业的运行拓扑图中(如下所示),该算子通常会与sink算子一起显示,形成一个算子链。通过这种方式,可以直观地监控和评估SinkUpsertMaterializer在数据处理过程中的实际应用情况,从而做出更加合理的优化决策。

    image.png

    image.png

    在检测到生成了特定算子且数据计算无误的情况下,可以调整配置项为 'table.exec.sink.upsert-materialize'='none'(配置步骤请参见如何配置作业运行参数?),以避免自动添加SinkUpsertMaterializer。实时计算引擎VVR 8.0及以上版本中引入了SQL执行计划智能分析功能,协助您更好地识别此类问题,如下图所示。

    image.png

减少状态访问频次:开启mini-batch

在对延时要求不高(比如分钟级别更新)的场景下,开启mini-batch攒批优化将会减少State的访问和更新频率(具体操作请参见开启MiniBatch),提升吞吐。

实时计算Flink版可以应用mini-batch的状态算子如下:

状态算子

说明

ChangelogNormalize

无。

Deduplicate

可配置table.exec.deduplicate.mini-batch.compact-changes-enable,在基于事件时间去重时是否压缩Changelog。

GroupAggregate

GlobalGroupAggregate

IncrementalGroupAggregate

无。

RegularJoin

需额外配置table.exec.stream.join.mini-batch-enabled开启mini-batch join优化。适用于更新流和outer join场景。

减少状态大小设置合理生命周期

说明

开启或关闭TTL不能保证完全兼容。当尝试在已开启TTL的作业上关闭TTL配置时,或者反过来操作时,将会导致兼容性失败并引发StateMigrationException异常。

在优化计算系统时,关键在于精简状态数据以提高性能。您可以在作业运维页面配置State数据过期时间(参数详情请参见运行参数配置)来控制作业状态的生命周期,以满足不同的运维需求和策略。

image.png

过短的TTL可能导致数据未能及时处理,从而产生不符合预期的计算结果,例如,在聚合或连接操作时,部分数据晚到,而相关状态已过期,导致结果异常。相反,过长的TTL会消耗资源,降低作业的稳定性。因此,在对Flink SQL作业进行TTL配置时,建议根据数据特性和业务需求进行恰当的TTL设置。例如,如果计算周期以自然天为单位,并且数据跨天漂移不会超过1小时,那么将TTL设定为25小时即可满足需求。数据开发人员应深入了解业务场景和计算逻辑,以实现最佳的平衡。

此外,针对双流连接场景,Flink SQL自实时计算引擎VVR 8.0.1版本起,支持通过JOIN_STATE_TTL Hint为左流和右流分别设置不同的生命周期。这一改进允许为各自数据流定制生命周期,有效减少不必要的状态存储开销,从而优化作业性能。您可以根据左右流数据的实际生命周期需求,灵活配置,以达到节省资源和提高作业效率的目的,具体操作请参见查询提示

SELECT /*+ JOIN_STATE_TTL('left_table' = '..', 'right_table' = '..') */ *
FROM left_table [LEFT | RIGHT | INNER] JOIN right_table ON ...

下面是一个作业使用JOIN_STATE_TTL Hint前后的State大小对比示例。

对比

作业情况

状态大小

优化前

  • 双流join操作,左流数据量大,约为右流的20至50倍。右流需长期保存数据,原定为18天。为提升性能,实际将右流的保存周期缩短至10天,导致数据正确性受损。

  • join操作的状态大小约为5.8 TB。

  • 单作业所需资源高达700 CU。

22

优化后

  • 通过合理设置JOIN_STATE_TTL Hint,左流可缩短至12小时,右流保持18天的保存周期,无需牺牲数据完整性。

  • join操作的状态大幅减少至约590 GB,仅约为原来的十分之一。

  • 资源消耗显著降低,从700 CU降至200-300 CU,节省了50%-70%的资源。

23e

减少状态大小:命中更优的执行计划

在生成执行计划时,优化器会结合输入SQL和配置选择相应的State实现。

  • 利用主键优化双流连接

    • 当连接键(Join Key)包含主键时,系统采用ValueState<RowData>进行数据存储,这样可以为每个连接键仅保留一条最新记录,实现存储空间的最大化节省。

    • 如果连接操作使用了非主键字段,即使已定义主键,系统会使用MapState<RowData, RowData>进行存储,以便为每个连接键保存来自源表的、基于主键的最新记录。

    • 在未定义主键的情况下,系统将使用MapState<RowData, Integer>存储数据,记录每个连接键对应的整行数据及其出现次数。

    因此,建议在建表DDL中声明主键,并在双流连接时优先使用主键,以优化存储效率。

  • 优化append_only流去重操作

    使用ROW_NUMBER函数替代FIRST_VALUE或LAST_VALUE函数进行去重,可以更有效地保留首次(ROW_NUMBER函数生成的Deduplicate算子仅保留出现过的Key)或最新出现的记录(保留Key及其最后一次出现的记录)。

  • 提升聚合查询性能

    在进行多维度统计,例如计算全网UV、手机客户端UV、PC端UV等,推荐使用AGG WITH FILTER语法替代传统的CASE WHEN语法。SQL优化器能够识别Filter参数,使得在同一个字段上根据不同条件计算COUNT DISTINCT时能够共享状态信息,减少状态的读写次数。根据性能测试结果,采用AGG WITH FILTER语法相比CASE WHEN可以提升高达一倍的性能。

减少状态大小:调整多流Join顺序,缓解State放大

Flink在处理数据流时,采用了二进制哈希连接(Binary Hash Join)的方式。在下图示例中,A与B的连接结果会导致数据存储的冗余,这种冗余程度与连接操作的频率成正比。随着加入连接的流数量增加,State的冗余问题会变得更加严重。

image.png

您可以策略性地调整连接的顺序来优化该问题。具体来说,可以先将数据量较小的流进行连接,而将数据量大的流放在最后进行。这样的顺序调整有助于减轻状态冗余带来的放大效应,从而提高数据处理的效率和性能。

尽可能减少读盘

为了提升系统性能,可以通过减少磁盘读取次数并优化内存使用来实现,具体请参见尽可能减少读盘

相关文档