Compaction可以把所有的数据文件按照一定策略进行Merge操作,可提升查询效率。

功能介绍

Delta Table支持近实时增量写入和time travel查询特性,在数据频繁写入的场景中,必然会引入大量的小文件,需要设计合理高效的合并策略来对小文件进行合并以及数据去重,解决大量小文件读写IO低效以及缓解存储系统的压力,但也要避免频繁Compact引发严重的写放大和冲突失败。

目前主要支持两种数据合并方式:

  • Clustering:只是把Commit的DeltaFile合并成一个大文件,不改变数据内容。系统内部会根据新增的文件大小、文件数量等因素周期性地执行,不需要用户手动操作。主要解决小文件IO读写效率和稳定性问题。

  • Compaction:会把所有的数据文件按照一定策略进行Merge操作,生成一批新的BaseFile,相同PK的数据行只存储最新的状态,不包含任何历史状态,也不会包含任何系统列信息,主要用于提升查询效率。

命令格式

alter table <table_name> [partition 
                          (<partition_key> = '<partition_value>' [, ...])
                         ] 
                         compact major;

注意事项

  • 手动执行Compaction需要依赖设置一个flag。

     set odps.merge.task.mode=service;
  • Compaction会产生新的文件,虽然可以提升查询效率,但也要承担对应的数据文件存储成本,您需要根据业务场景进行权衡,选择合适的触发频率。

  • Compaction会使用计算资源对数据进行读取重写,若您是按量付费模式,将产生Compaction费用,其计费方式为扫描量*1*单价,详情请参见计算费用(按量付费)。若您是包年包月付费模式,那么Compaction会使用包年包月的计算资源额度。

使用示例

--创建表
create table mf_dt (pk bigint not null primary key, val bigint not null) 
             partitioned by (dd string, hh string) 
             tblproperties ("transactional"="true");
--插入数据            
insert into table mf_dt partition(dd='01', hh='01') values (1, 1), (2, 2);
insert into table mf_dt partition(dd='01', hh='01') values (2, 20), (3, 3);
insert into table mf_dt partition(dd='01', hh='01') values (3, 30), (4, 4);

--执行compaction,以及操作完成后,可继续查询历史数据。
set odps.merge.task.mode=service;
alter table mf_dt partition(dd='01', hh='01') compact major;
select * from mf_dt timestamp as of get_latest_timestamp('mf_dt') where dd='01' and hh='01';
select * from mf_dt timestamp as of get_latest_timestamp('mf_dt', 2) where dd='01' and hh='01';