Distributed MapJoin是MapJoin的升级版,适用于大表Join中表的场景,二者的核心目的都是为了减少大表侧的Shuffle和排序。
注意事项
Join两侧的表数据量要求不同,大表侧数据在10 TB以上,中表侧数据在[1 GB, 100 GB]范围内。
小表侧的数据需要均匀分布,没有明显的长尾,否则单个分片会产生过多的数据,导致OOM(Out Of Memory)及RPC(Remote Procedure Call)超时问题。
SQL任务运行时间在20分钟以上,建议使用Distributed MapJoin进行优化。
由于在执行任务时,需要占用较多的资源,请避免在较小的Quota组运行。
说明您可以在配额(Quota)管理页面,修改配额组,详情请参见Quota管理(新版)。
使用方法
您需要在select
语句中使用Hint提示/*+distmapjoin(<table_name>(shard_count=<n>,replica_count=<m>))*/
才会执行distmapjoin
。shard_count和replica_count共同决定任务运行的并发度,即并发度=shard_count * replica_count
。
参数说明
table_name:目标表名。
shard_count=<n>:设置小表数据的分片数,小表数据分片会分布至各个计算节点处理。n即为分片数,一般按奇数设置。
说明shard_count值建议手动指定,shard_count值可以根据小表数据量来大致估算,预估一个分片节点处理的数据量范围是[200 MB, 500 MB]。
shard_count设置过大,性能和稳定性会受影响;shard_count设置过小,会因内存使用过多而报错。
replica_count=<m>:设置小表数据的副本数。m即为副本数,默认为1。
说明为了减少访问压力以及避免单个节点失效导致整个任务失败,同一个分片的数据,可以有多个副本。当并发过多,或者环境不稳定导致运行节点频繁重启,可以适当提高replica_count,一般建议为2或3。
语法示例
-- 推荐,指定shard_count(replica_count默认为1) /*+distmapjoin(a(shard_count=5))*/ -- 推荐,指定shard_count和replica_count /*+distmapjoin(a(shard_count=5,replica_count=2))*/ -- distmapjoin多个小表 /*+distmapjoin(a(shard_count=5,replica_count=2),b(shard_count=5,replica_count=2)) */ -- distmapjoin和mapjoin混用 /*+distmapjoin(a(shard_count=5,replica_count=2)),mapjoin(b)*/
使用示例
为了便于理解,本文以向分区表tmall_dump_lasttable插入数据为例,为您演示Distributed MapJoin的用法。
常规写法。
insert OVERWRITE table tmall_dump_lasttable partition(ds='20211130') select t1.* from ( select nid, doc,type from search_ods.dump_lasttable where ds='20211203' )t1 join ( select distinct item_id from tbcdm.dim_tb_itm where ds='20211130' and bc_type='B' and is_online='Y' )t2 on t1.nid=t2.item_id;
优化后写法。
insert OVERWRITE table tmall_dump_lasttable partition (ds='20211130') select /*+ distmapjoin(t2(shard_count=35)) */ t1.* from ( select nid, doc, type from search_ods.dump_lasttable where ds='20211203' )t1 join ( select distinct item_id from tbcdm.dim_tb_itm where ds='20211130' and bc_type='B' and is_online='Y' )t2 on t1.nid=t2.item_id;