文档

MaxCompute近实时增全量一体化架构介绍

更新时间:

面对当前日益复杂且对数据时效性要求极高的近实时业务场景,MaxCompute基于Transaction Table2.0推出了集大规模存储、高效批量处理和近实时能力于一体的近实时增量一体化架构。本文为您介绍该架构的工作原理及其优势。

背景和现状

在当前典型的数据处理业务场景中,对时效性要求较低的大规模数据全量批处理的单一场景,直接采用MaxCompute已经足以满足业务需求。然而,随着MaxCompute承载的业务规模和使用场景的不断丰富,除了处理好大规模离线批处理链路之外,对于近实时和增全量处理链路也面临着众多需求。如下图所示。

image

比如近实时数据导入链路,依赖平台引擎具备事务隔离、小文件自动合并等能力,又比如增全量数据合并链路,还依赖增量数据存储和读写,主键等能力。在MaxCompute推出新架构之前,为了支持这些复杂的综合业务场景,只能依赖于下图所示的三种解决方案,这三种解决方案在成本、易用性、低延时、高吞吐等方面相互制约,很难同时具备较好的效果。如下图所示。

image

在大数据开源生态领域,针对这些问题已经出现了一些典型的解决方案,其中最典型的是Spark、Flink、Trino等开源数据处理引擎,它们深度集成了Hudi、Delta Lake、Iceberg、Paimon等开源数据湖,并以开放统一的计算引擎和数据存储理念为基础,提供了解决Lambda架构带来的一系列问题的方案。

近实时增全量一体化架构

基于上述背景,MaxCompute推出近实时增全量一体化架构,支持丰富的数据源,并通过定制开发的接入工具实现增量和离线批量数据导入到统一的存储中,由后台数据管理服务自动优化编排数据存储结构,使用统一的计算引擎支持近实时增量处理链路和大规模离线批量处理链路,而且有统一的元数据服务支持事务和文件元数据管理。如下图所示。

image

当前架构已支持了部分核心能力,包括主键表,Upsert实时写入、Time travel查询、增量查询、SQL DML操作、表数据自动治理优化等。架构原理和相关操作详情,请参见架构原理基本操作

架构优势

近实时增全量一体化架构尽可能涵盖了开源数据湖(HUDI、Iceberg)的一些通用功能,以便于相关业务链路之间的迁移。此外,作为完全自研设计的新架构,在功能、性能、稳定性和集成等方面也具备许多独特亮点:

  • 统一的存储、元数据和计算引擎,实现了深度和高效的集成。该系统具备低成本的存储、高效的数据文件管理和查询效率,同时还支持Time travel增量查询。

  • 开发了一套通用且完备的SQL语法系统,支持所有核心功能。

  • 深度定制优化的数据导入工具,支持很多复杂的业务场景。

  • 无缝衔接MaxCompute现有的业务场景,减少数据迁移的繁琐与风险,同时还降低了数据存储和计算成本。

  • 实现完全自动化的数据文件管理,以确保更佳的读写稳定性和性能,自动优化存储效率并降低成本。

  • 基于MaxCompute平台完全托管,可以开箱即用,无需额外接入成本,功能生效只需创建一张Transaction Table2.0表即可。

  • 作为完全自主研发的架构,需求开发节奏完全自主可控。

业务场景

表格式和数据治理

建表

image

为了支持近实时增全量一体化架构,MaxCompute引入Transaction Table2.0并统一了表数据组织格式,可以同时支持既有的批处理链路,以及近实时增量等新链路的所有功能。建表语法示例如下。

CREATE TABLE tt2 (pk bigint notnullprimarykey, val string) tblproperties ("transactional"="true");
CREATE TABLE par_tt2 (pk bigint notnullprimarykey, val string)  partitioned BY (pt string) tblproperties ("transactional"="true");

执行建表命令CREATE TABLE创建Transaction Table2.0表,只需要设置主键Primary Key(PK)以及"transactional"="true"。其中,PK用来保障数据行的Unique属性,transactional属性用来配置ACID事务机制,满足读写快照隔离。建表详情请参见表操作

Transaction Table2.0表关键属性

Transaction Table2.0表属性详情请参见Transaction Table2.0表参数。其中关键属性介绍如下:

  • write.bucket.num:默认值为16,取值范围为(0, 4096]。表示每个分区或非分区表的分桶数量,也表示数据写入的并发节点数量。分区表支持修改,新分区默认生效,非分区表不支持修改。

    说明

    数据写入和查询的并发度可以通过增加桶数量来实现水平扩展,但是,并非桶数量并非越多越好。因为每个数据文件会归属于一个桶,桶数量的增加会导致更多的小文件生成,进一步增加存储成本和压力,同时也会影响读取效率。对于桶数量的建议如下:

    • 在非分区表的情况下,如果数据量小于1 G,建议将桶的数量设置为4~16 MB如果数据量大于1 G,建议每个桶承载的数据大小设在128 MB~256 MB之间。当数据量大于1 T时,建议每个桶的数据范围调整为500 MB~1 GB。

    • 在分区表的情况下,每个分区的桶数量设置原则可以参考上面非分区表的配置建议。

      对于存在海量分区的表,并且每个分区的数据量较小,比如在几十兆以内,建议每个分区的桶数量尽可能少,配置在1~2个即可,避免产生过多的小文件。

  • acid.data.retain.hours:默认取值为72,取值范围为[0, 168]。表示Time travel可查询数据历史状态的时间范围(单位:小时)。如果您不需要Time travel查询历史数据,建议将此属性值设置为0,代表关闭Time travel查询功能,这样可以有效节省数据历史状态的存储成本。

    建议您按照实际的业务场景设置合理的时间周期,设置的时间越长,保存的历史数据越多,产生的存储费用就越多,而且也会在一定程度上影响查询效率。

    说明
    • 超过已配置的时间后,系统开始自动回收清理,一旦清理完成,Time travel就查询不到对应的历史状态了。回收的数据主要包含操作日志和数据文件两部分。

    • 特殊情况下您可执行purge命令,手动触发强制清除历史数据。

Schema Evolution

Transaction Table2.0支持完整的Schema Evolution操作,包括增加和删除列。在Time travel查询历史数据时,会根据历史数据的Schema来读取数据。另外,PK列不支持修改。简单示例如下。详细DDL语法详情,请参见表操作

ALTER TABLE tt2 ADD columns (val2 string);

表数据组织格式

文件格式

如上图所示,展示了分区表的数据结构,先按照分区对数据文件进行物理隔离,不同分区的数据在不同的目录之下。每个分区内的数据按照桶数量来切分数据,每个桶的数据文件单独存放;。数据文件类型主要分为Delta Data File和Compacted Data File两种:

  • Delta Data File:每次事务写入或者小文件合并后生成的增量数据文件,会保存每行记录的中间历史状态,用于满足近实时增量读写需求。

  • Compacted Data File:Delta Data File经过Compact执行生成的数据文件,会消除数据记录的中间历史状态,PK值相同的记录只会保留一行,按照列式压缩存储,用来支撑高效的数据查询需求。

数据自动治理优化

  • 存在的问题

    Transactional Table 2.0支持分钟级近实时增量数据导入,高流量场景下可能会导致增量小文件数量膨胀,尤其是桶数量较大的情况,从而引发存储访问压力大、成本高,数据读写I/O效率低下等问题。如果Update和Delete格式的数据较多,也会造成数据中间状态的冗余记录较多,进一步增加存储和计算的成本,查询效率降低等问题。

  • 解决方案

    MaxCompute存储引擎配套支持了合理高效的表数据服务,对存储数据进行自动治理和优化,降低存储和计算成本,提升分析处理性能。

    image

    如上图所示,Transactional Table 2.0的表数据服务主要分成Auto SortAuto MergeAuto Partial CompactAuto Clean,您无需手动配置,存储引擎服务会智能识别并自动收集各个维度的数据信息,配置合理的策略自动执行。

    • Auto Sort:支持将实时写入的行存Avro文件转换为AliORC列式存储格式,从而显著降低存储开支成本并大幅提升数据读取性能。

    • Auto Merge:自动合并小文件,解决小文件数量膨胀引发的各种问题。

      主要策略是周期性地根据数据文件大小、文件数量、写入时序等多个维度进行综合分析,进行分层次的合并。该过程确保不会清除任何记录的中间历史状态,从而保证Time travel查询的完整性和可追溯性。

    • Auto Partial Compact:自动合并文件并消除记录的历史状态,以降低Update和Delete记录过多所带来的额外存储成本,并提升读取效率。

      主要策略是周期性地根据增量数据大小、写入时序以及Time travel查询时间等多个维度进行综合分析来执行Compact操作。

      说明

      该操作只针对超过Time travel可查询时间范围的历史记录进行Compact操作。

    • Auto Clean:将自动清理无效文件,节省存储成本。Auto SortAuto MergeAuto Partial Compact操作执行完成并生成新的数据文件后,原有的数据文件即失去实际效用,系统会实时启动自动删除流程,迅速释放存储空间,确保存储成本的即时节约。

    对于对查询性能有极高要求的场景,您可考虑手动触发全量数据的major compact操作。详情请参见COMPACTION

    SET odps.merge.task.mode=service;
    ALTER TABLE tt2 compact major;
    说明

    该操作对每个数据桶内的全部信息进行深度整合,彻底消除所有历史状态,同时生成全新的Aliorc列式存储文件,因此,此类操作不仅会产生额外的执行开销,还会增加新生成文件的存储成本,建议仅在必要时实施。

数据写入

分钟级近实时Upsert写入

MaxCompute离线架构一般以小时或天级别,批量导入增量数据至新表或新分区中。然后配置对应的离线ETL处理链路,将增量数据和存量表数据进行Join Merge操作,生成新的全量数据。此离线链路的延时较长,资源和存储也会有一定的消耗。

近实时增全量一体化架构的Upsert实时写入链路,可以保持数据从写入到查询的延时在5~10分钟,满足分钟级近实时业务需求,并且不需要复杂的ETL链路来进行增全量数据的合并操作,节省了相应的计算和存储成本。且在实际业务数据处理场景中,涉及的数据源丰富多样,可能存在数据库、日志系统或者其他消息队列等系统,为了方便您将数据写入Transactional Table2.0表, MaxCompute深度定制开发了开源的Flink Connector工具,联合DataWorks数据集成以及其他数据导入工具,针对高并发、容错、事务提交等场景做了定制化的设计及开发优化,以满足延时低、正确性高等要求。

image

如上图所示:

  • 大部分兼容Flink生态的计算引擎与工具都能够通过配置Flink作业,有效地利用MaxCompute Flink Connector实现对Transactional Table2.0表的实时数据写入。

  • 通过调整Transactional Table2.0表属性write bucket num,可以灵活配置写入并发度,从而实现写入速度的水平扩展。

    由于MaxCompute进行了高效优化,若将Transactional Table 2.0表属性write bucket num配置为Flink sink并发数的整数倍,其写入性能最佳。

  • 结合Flink内置的Checkpoint机制处理容错场景,确保数据处理过程遵循exactly_once语义。

  • 支持上千分区同时写入,满足海量分区并发写入场景需求。

  • 满足数据分钟级可见,支持读写快照隔离。

  • 不同的环境和配置都可能对流量吞吐情况产生影响,流量吞吐上限可参考单个桶1 MB/s的处理能力进行评估。MaxCompute数据传输渠道默认使用共享的公共数据传输服务资源组,可能会存在吞吐量不稳定的问题,尤其是在资源竞争激烈时,并且存在资源使用上限。为了确保更稳定地写入吞吐量,您可以选择购买与使用独享数据传输服务资源组

    说明

    独享数据传输服务资源需额外收费,详情请参见数据传输独享资源费用(包年包月)

数据库整库实时同步写入

当前数据库系统与大数据处理引擎都有各自擅长的数据处理场景,面对一些复杂的业务需求,往往需要同时运用OLTP(联机事务处理)、OLAP(联机分析处理)及离线分析引擎来对数据进行全面且深入的分析与处理,因此数据就需要在不同引擎间流转。其中,将数据库中单表或至整库的实时更新记录无缝同步至MaxCompute进行分析处理是目前比较典型的业务链路。

image

如上图所示:

  • 左边流程是MaxCompute离线架构,一般以小时或天级别批量导入增量数据至新表或新分区中,然后配置对应的离线ETL处理链路,将增量数据和存量表数据进行Join Merge操作,生成新的全量数据。此离线链路的延时较长,资源和存储也会有一定的消耗。

  • 右边流程是近实时增全量一体化架构,与MaxCompute离线架构相比摒弃了繁琐的周期性提取与合并过程,实现实时(以分钟级别精确度)读取数据库中的变更记录,而且仅依赖单一的Transactional Table2.0表即可完成数据更新,从而最大限度地削减了计算与存储成本。

    目前MaxCompute支持通过DataWorks数据集成的同步能力实现整库或单表的数据同步。详情请参见MySQL整库实时同步至MaxCompute

SQL DML与Upsert批处理链路

为了让您更便捷地操作Transactional Table2.0,并轻松实现复杂数据查询和操作需求,MaxCompute已对SQL引擎的核心模块Compiler、Optimizer和Runtime等进行了适配和优化,并实现了特定语法解析、优化计划、PK列去重逻辑以及Runtime Upsert并发写入等功能,确保对SQL语法的全套支持。如下图所示。

image

其中:

  • 在数据处理完成之后,Meta Service会执行事务冲突检测,原子更新数据文件元信息等,以保障读写隔离和事务一致性。

  • 对于Upsert批式写入能力,由于Transactional Table2.0表在查询时会自动根据PK值来合并记录,因此对于Insert和Update场景,不需要使用复杂的Update、Merge Into语法,可统一使用Insert Into插入新数据即可,使用简单,并且能节省一些读取I/O和计算资源。SQL DML语法详情,请参见DML操作

数据查询

Time travel查询

MaxCompute Transactional Table2.0计算引擎可高效支持Time travel查询的典型业务场景,即查询历史版本的数据。该功能可用于回溯历史状态的业务数据,或在数据出错时用来恢复历史状态数据以进行数据校正。同时,也支持直接使用Restore操作恢复到指定的历史版本。简单示例如下。

//查询指定时间戳的历史数据
SELECT * FROM tt2 timestampasof'2024-04-01 01:00:00';
//查询5分钟之间的历史数据
SELECT * FROM tt2 timestampasofcurrent_timestamp() - 300;
//查询截止到最近第二次Commit写入的历史数据
SELECT * FROM tt2 timestampasof get_latest_timestamp('tt2', 2);

Time travel查询处理过程如下所示。

time travel

在输入Time travel查询语句后,会先从Meta服务中解析出要查询的历史数据版本,然后过滤出要读取的Compacted Data File和Delta Data File,进行合并然后输出。其中,Compacted Data File可以用来加速查询,提高读取效率。

上图以事务表(src)为例:

  • 图左侧展示了数据变化过程,t1~t5代表了5个事务的时间版本,分别执行了5次数据写入的事务,生成了5个Delta Data File,在t2和t4时刻分别执行了COMPACTION操作,生成了2个Compacted Data File(c1和c2),其中c1已经消除了中间状态历史记录(2,a),只保留最新状态的记录(2,b)

  • 如查询t1时刻的历史数据,只需读取Delta Data File数据类型的d1并输出;如查询t2时刻的历史数据,只需读取Compacted Data File数据类型的b1并输出其三条记录;如查询t3时刻的历史数据,就会输出Compacted Data File数据类型的b1以及Delta Data File数据类型的d3并合并输出,可以此类推其他时刻的查询。可见,Compacted Data File文件虽可用来加速查询,但需要触发较重的COMPACTION操作,您需要结合自己的业务场景选择合适的触发策略。

  • 在SQL语法中,除了可以直接指定一些常量和常用函数外,您可以通过timestamp as of exprversion as of expr函数进行精准查询。

增量查询

为了实现Transactional Table2.0的增量查询和增量计算优化,MaxCompute专门设计并开发了新的SQL增量查询语法。关于增量查询语法,详情请参见Incremental查询参数与使用限制。增量查询处理过程如下图所示

增量查询

在输入SQL语句后,MaxCompute引擎将会解析要查询的历史增量数据版本,并过滤要读取的Compacted Data File文件,然后进行合并输出。上图以事务表(src)为例:

  • 图左侧展示了数据变化过程,t1~t5代表了5个事务的时间版本,分别执行了5次数据写入的事务,生成了5个Compacted Data File,在t2和t4时刻分别执行了COMPACTION操作,生成了2个Compacted Data File(c1和c2)。

  • 在具体的查询示例中,例如,begin是t1-1,end是t1,只需读取t1时间段对应的Delta Data File(d1)进行输出;如果end是t2,会读取两个Delta Data File(d1和d2);如果begin是t1,end是t2-1,即查询的时间范围为t1和t2,这个时间段没有任何增量数据插入的,将返回空行。

  • Compact、Merge服务生成的数据(c1和c2)不会作为新增数据重复输出。

PK点查DataSkipping优化

Transactional Table2.0的数据分布和索引基本是按照PK列值进行构建的,当查询Transactional Table2.0时,通过指定PK值进行过滤,可以显著降低数据读取量和查询时间,从而极大地减少资源消耗,效率可能提升数百到数千倍。DataSkipping优化实施过程中涵盖多个层级的精确筛选,以高效定位并读取指定PK值的数据。例如:Transactional Table2.0表的数据记录为一亿条,按照PK值进行过滤,真正从数据文件中读取的数据记录可能只有一万条,数据查询过程如下图所示。

image

具体过程如下:

  1. Bucket裁剪:首先锁定含有目标主键值的特定Bucket,避免对无关的Bucket进行扫描。

  2. 数据文件内部裁剪:在已确定的Bucket内,进一步筛选仅包含所需主键值的数据文件,以确保读取操作的针对性。

  3. Block级主键值域过滤:深入到文件内部的Block层面,依据Block中主键值的分布范围进行精确过滤,确保仅提取包含指定主键值的Block进行处理。

SQL查询分析Plan优化

image

如上图所示,Transactional Table2.0表数据已根据其PK值进行了分桶分布处理,且各桶内数据具有唯一性(Unique)和有序性(Sort),基于这些属性SQL Optimizer主要做了以下优化:

  • 利用PK列的Unique属性避免了去重(Distinct)操作。由于查询涉及的PK列本身就保证了唯一性,SQL Optimizer能够识别并省略Distinct运算步骤,从而避免了额外计算开销。

  • 应用Bucket Local Join可避免全局Shuffle。由于Join操作依据的关键字段与PK列相同,SQL Optimizer能够选择执行Bucket Local Join策略,避免全局Shuffle过程。这极大地降低了数据在节点间大规模交换的需要,减少了资源消耗,从而提升了处理速度和系统的整体性能。

  • 基于数据的有序性采用Merge Join算法替代Sort。由于每个分桶内的数据是有序的,SQL Optimizer可以选择高效的Merge Join算法进行连接操作,无需预先对数据进行排序,从而降低计算复杂度,节省了计算资源。

通过消除Distinct、Sort和全局Shuffle等高资源消耗的算子,实现了查询性能超过一倍的提升,充分展现了对Transactional Table2.0表特性的有效利用及其对查询效率的重大影响。

  • 本页导读 (1)