MaxCompute新增支持将MapReduce作业指定为SQL运行时(Runtime)执行模式,基于SQL运行时,MapReduce作业可以应用SQL引擎的各种新特性,实现之前不支持的功能。本文为您介绍如何将MapReduce作业指定为SQL运行时执行模式。
背景信息
MaxCompute提供了MapReduce编程接口,您可以使用MapReduce提供的接口(Java API)编写MapReduce程序处理MaxCompute的中的数据。
新版本MaxCompute支持将MapReduce作业指定为SQL运行时执行模式,基于SQL运行时,MapReduce可以使用MaxCompute SQL引擎编译器、基于代价的优化器和向量化执行的执行引擎,同时可以复用SQL引擎开发的各种新特性,包括功能、性能、稳定性等方面不断的优化改进。
MapReduce指定为SQL运行时执行后可以使用MaxCompute SQL的新特性,可以满足原来MapReduce不支持的功能。相比原来MapReduce,新增如下功能:
支持输入源为视图。
支持输入源为外部表。
支持分布式文件系统读写。
支持Hash或Range聚簇表读写。
此外还新增支持如下能力:
可以持续获得SQL在CBO优化器和向量化执行引擎等方面的持续性能优化收益。
可以使用新的存储格式压缩机制。
支持动态调整并发度,提高输入表为HashCluster等超大表Join场景的性能。
可以获得经过大量作业和压力验证的SQL引擎稳定性优势,Failover和PVC等机制更有保障。
MaxCompute Studio和Logview针对SQL作业提供了更详细的执行信息、执行计划、编译信息、作业配置信息等,可以对任务运行各阶段输入输出及整体流程一目了然,能方便的定位问题和优化,提高开发和运维效率。
注意事项
该功能不需要您对原接口和作业逻辑做任何改动,只需要指定执行模式。
该功能仅支持MapReduce接口编写的MapReduce作业,MapReduce接口请参见原生SDK概述。
基于SQL运行时执行模式运行MapReduce作业,MapReduce作业仍然按照MapReduce计费规则进行计费,详情请参见MapReduce按量付费。
使用说明
设置执行模式。
执行模式可以通过
odps.mr.run.mode
开关控制,取值如下:lot
(默认):使用MapReduce引擎执行MapReduce任务。sql
:使用SQL引擎执行MapReduce任务,即SQL运行时执行模式。如果执行不通过,则会报错。hybrid
:优先尝试使用SQL引擎执行,不通过时会回退到MapReduce引擎执行。
您可通过两种方式设置执行模式:
Project级别控制
全局性统一打开,针对所有作业有效,需要Project管理员执行如下命令:
setproject odps.mr.run.mode=<lot/sql/hybrid>;
Session级控制
只针对当前作业有效,有如下两种方式:
运行JAR语句前添加
set odps.mr.run.mode=<lot/sql/hybrid>
语句。在作业代码中通过
job
设置,如下所示:JobConf job = new JobConf(); job.set("odps.mr.run.mode","hybrid")
说明对于特殊功能场景StreamJob和SecondarySort的功能,需要设置以下Flag:
StreamJob:
set odps.mr.sql.stream.enable=true;
。SecondarySort:
set odps.mr.sql.group.enable=true;
。
查看运行详情。
您可通过Logview、MaxCompute Studio等查看MapReduce基于SQL运行时执行在客户端生成的SQL表达式,以及作业的运行详情,Logview操作说明请参见使用Logview 2.0查看作业运行信息。
Logview XML
打开Logview,在Source XML页签中查看客户端提交的XML信息,显示了MapReduce作业同义的SQL表达。示例如下所示:
create temporary function mr2sql_mapper_152955927079392291755 as 'com.aliyun.odps.mapred.bridge.LotMapperUDTF' using ; create temporary function mr2sql_reducer_152955927079392291755 as 'com.aliyun.odps.mapred.bridge.LotReducerUDTF' using ; @sub_query_mapper := SELECT k_id,v_gmt_create,v_gmt_modified,v_product_id,v_admin_seq,v_sku_attr,v_sku_price,v_sku_stock,v_sku_code,v_sku_image,v_delivery_time,v_sku_bulk_order,v_sku_bulk_discount,v_sku_image_version,v_currency_code FROM( SELECT mr2sql_mapper_152955927079392291755(id,gmt_create,gmt_modified,product_id,admin_seq,sku_attr,sku_price,sku_stock,sku_code,sku_image,delivery_time,sku_bulk_order,sku_bulk_discount,sku_image_version,currency_code ) as (k_id,v_gmt_create,v_gmt_modified,v_product_id,v_admin_seq,v_sku_attr,v_sku_price,v_sku_stock,v_sku_code,v_sku_image,v_delivery_time,v_sku_bulk_order,v_sku_bulk_discount,v_sku_image_version,v_currency_code) FROM ae_antispam.product_sku_tt_inc WHERE ds = "20180615" AND hh = "21" UNION ALL SELECT mr2sql_mapper_152955927079392291755(id,gmt_create,gmt_modified,product_id,admin_seq,sku_attr,sku_price,sku_stock,sku_code,sku_image,delivery_time,sku_bulk_order,sku_bulk_discount,sku_image_version,currency_code ) as (k_id,v_gmt_create,v_gmt_modified,v_product_id,v_admin_seq,v_sku_attr,v_sku_price,v_sku_stock,v_sku_code,v_sku_image,v_delivery_time,v_sku_bulk_order,v_sku_bulk_discount,v_sku_image_version,v_currency_code) FROM ae_antispam.product_sku ) open_mr_alias1 DISTRIBUTE BY k_id SORT BY k_id ASC; @sub_query_reducer := SELECT mr2sql_reducer_152955927079392291755(k_id,v_gmt_create,v_gmt_modified,v_product_id,v_admin_seq,v_sku_attr,v_sku_price,v_sku_stock,v_sku_code,v_sku_image,v_delivery_time,v_sku_bulk_order,v_sku_bulk_discount,v_sku_image_version,v_currency_code) as (id,gmt_create,gmt_modified,product_id,admin_seq,sku_attr,sku_price,sku_stock,sku_code,sku_image,delivery_time,sku_bulk_order,sku_bulk_discount,sku_image_version,currency_code) FROM @sub_query_mapper; FROM @sub_query_reducer INSERT OVERWRITE TABLE ae_antispam.product_sku SELECT id,gmt_create,gmt_modified,product_id,admin_seq,sku_attr,sku_price,sku_stock,sku_code,sku_image,delivery_time,sku_bulk_order,sku_bulk_discount,sku_image_version,currency_code ;
Logview Summary
在Logview的Summary页签您可以看到作业的执行采用了SQL运行时的执行引擎
execution engine
,示例如下:说明未开启SQL运行时执行模式的MapReduce作业无执行引擎信息,未开启SQL运行时执行模式的MaxCompute扩展MapReduce(MR2)作业的执行引擎为
cganjiang
。Job run mode: fuxi job Job run engine: execution engine
Logview JSONSummary
MapReduce的JSONSummary信息仅包含了简单的Map和Reduce输入输出信息。 SQL的JSONSummary信息可以查看SQL执行各阶段的详细信息,包含所有执行参数、逻辑计划、物理计划和执行详情等。示例如下所示:
"midlots" : [ "LogicalTableSink(table=[[odps_flighting.flt_20180621104445_step1_ad_quality_tech_qp_algo_antifake_wordbag_filter_bag_change_result_lv2_20, auctionid,word,match_word(3) {0, 1, 2}]]) OdpsLogicalProject(auctionid=[$0], word=[$1], match_word=[$2]) OdpsLogicalProject(auctionid=[$0], word=[$1], match_word=[$2]) OdpsLogicalProject(auctionid=[$0], word=[$1], match_word=[$2]) OdpsLogicalProject(auctionid=[$2], word=[$3], match_word=[$4]) OdpsLogicalTableFunctionScan(invocation=[[MR2SQL_MAPPER_152955294118813063732($0, $1)]()], rowType=[RecordType(VARCHAR(2147483647) item_id, VARCHAR(2147483647) text, VARCHAR(2147483647) __tf_0_0, VARCHAR(2147483647) __tf_0_1, VARCHAR(2147483647) __tf_0_2)]) OdpsLogicalTableScan(table=[[ad_quality_tech.qp_algo_antifake_wordbag_filter_bag_change_lv2_20, item_id,text(2) {0, 1}]]) ]