合并小文件

分布式文件系统按块(Block)存放数据,文件大小比块大小(64MB)小的文件称为小文件。分布式系统不可避免会产生小文件,比如SQL或其他分布式引擎的计算结果、Tunnel数据采集。合并小文件可以达到优化系统性能的目的。本文为您介绍如何在MaxCompute中合并小文件。

背景信息

小文件过多,会带来以下问题:

  • MaxCompute处理单个大文件比处理多个小文件更有效率,小文件过多会影响整体的执行性能。

  • 小文件过多会给Pangu文件系统带来一定的压力,影响存储空间的有效利用。

因此从存储和性能两方面考虑,都需要将计算过程中产生的小文件合并。MaxCompute在小文件处理方面的功能日趋完善,主要体现在以下方面:

  • 默认情况下,当作业完成之后,如果满足一定的条件,系统会自动分配一个Fuxi Task进行小文件合并,即使用过程中经常看到的MergeTask。

  • 默认情况下,一个Fuxi Instance不再只能处理一个小文件,而是最多可以处理100个小文件,同时可以通过odps.sql.mapper.merge.limit.size参数来控制读取文件总大小。

  • MaxCompute后台会定期扫描元数据库,对小文件较多的表或分区进行小文件合并,这个合并过程对您透明。

但是通过元数据发现仍然存在大量的小文件未被合并掉,例如有的表一直在写入,无法自动执行合并操作,需要您先将写入作业停止,然后再手工进行小文件合并操作。

注意事项

  • 使用合并小文件功能需要用到计算资源,如果您购买的实例是按量计费,会产生相关费用,具体计费规则与SQL按量计费保持一致,详情请参见SQL作业按量付费

  • 合并小文件命令不支持Transactional表,如果需要对Transactional表进行小文件合并,请参见COMPACTION

查看表的文件数

  • 命令语法

    您可以通过如下命令查看表的文件数:

    desc extended <table_name> [partition (<pt_spec>)];
  • 参数说明

    • table_name:必填。待查看表的名称。

    • pt_spec:可选。待查看分区表的指定分区。格式为(partition_col1 = partition_col_value1, partition_col2 = partition_col_value2, ...)

  • 结果示例

    如下图所示,odl_bpm_wfc_task_log表的示例分区有3607个文件, 分区大小274MB (287869658Byte),平均文件大小约为0.07MB,需要进行小文件合并。

    结果示例

解决方案

通过元数据检测,分区中含有100个以上的文件且平均文件大小小于64MB的都可以进行小文件合并,合并的方案有如下两种。

  • 即时合并

    使用如下命令进行小文件即时合并。

    ALTER TABLE <table_name> [partition (<pt_spec>)] MERGE SMALLFILES;

    odl_bpm_wfc_task_log表执行即时合并操作,完成后如下所示文件数减少为19个,存储大小也减少到37MB,在合并小文件的同时,存储效率也有了明显提升。即时合并

    一般情况下,使用默认参数可以达到合并小文件的效果。但MaxCompute同时提供一些参数完成定制需求,常用的一些参数如下:

    • set odps.merge.cross.paths=true|false

      设置是否跨路径合并,对于表下面有多个分区的情况,合并过程会将多个分区生成独立的MergeAction进行合并,所以对于odps.merge.cross.paths设置为true,并不会改变路径个数,只是分别去合并每个路径下的小文件。

    • set odps.merge.smallfile.filesize.threshold = 64

      设置合并文件的小文件大小阈值,文件大小超过该阈值,则不进行合并,单位为MB。此参数可以不进行设置,不设置时,则使用全局变量odps_g_merge_filesize_threshold,该参数值默认为32MB,设置时必须大于32MB。

    • set odps.merge.maxmerged.filesize.threshold = 500

      设置合并输出文件量的大小,输出文件大于该阈值,则创建新的输出文件,单位为MB。此参数可以不进行设置,不设置时,则使用全局变odps_g_max_merged_filesize_threshold,该参数值默认为500MB,设置时必须大于500MB。

  • PyODPS脚本合并

    通过PyODPS异步提交任务,合并前一天任务产出的小文件,脚本示例如下:

    import os
    from odps import ODPS
    
    # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
    # 不建议直接使用 Access Key ID / Access Key Secret 字符串
    o = ODPS(
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        project='your-default-project',
        endpoint='your-end-point',
    )
    
    #注意需要替换$table_name为所需的表名
    table_name = $table_name
    t = odps.get_table(table_name)
    
    #设置merge选项
    hints = {'odps.merge.maxmerged.filesize.threshold': 256}
    
    #examples for multi partition
    insts = []
    #iterate_partitions list列举ds=$datetime下所有分区,分区格式也可能是其他格式
    for partition in t.iterate_partitions(spec="ds=%s" % $datetime):
        instance=odps.run_merge_files(table_name, str(partition), hints=hints)
    		#从这个logview的Waiting Queue点进去,才可以找到真正执行的logview
        print(instance.get_logview_address())
        insts.append(instance)
    
    #等待分区结果
    for inst in insts:
        inst.wait_for_completion()

    运行上述脚本需要提前安装PyODPS,安装方法请参见PyODPS

使用案例

tbcdm.dwd_tb_log_pv_di是数据稳定性体系识别出来的需要合并小文件的物理表,通过元数据tbcdm.dws_rmd_merge_task_1d提供的信息,如下图所示,可以看出此表相关分区的文件个数大部分都在1000以上,多的甚至达到7000以上,但平均文件大小有些还不到1MB。使用示例使用如下命令采用即时合并方案对其进行小文件合并。

set odps.merge.cross.paths=true;
set odps.merge.smallfile.filesize.threshold=128;
set odps.merge.max.filenumber.per.instance = 2000;
alter table tbcdm.dwd_tb_log_pv_di partition (ds='20151116') merge smallfiles;

合并小文件后结果如下图所示:优化结果