MapReduce支持SQL运行时执行模式

MaxCompute新增支持将MapReduce作业指定为SQL运行时(Runtime)执行模式,基于SQL运行时,MapReduce作业可以应用SQL引擎的各种新特性,实现之前不支持的功能。本文为您介绍如何将MapReduce作业指定为SQL运行时执行模式。

背景信息

MaxCompute提供了MapReduce编程接口,您可以使用MapReduce提供的接口(Java API)编写MapReduce程序处理MaxCompute的中的数据。

新版本MaxCompute支持将MapReduce作业指定为SQL运行时执行模式,基于SQL运行时,MapReduce可以使用MaxCompute SQL引擎编译器、基于代价的优化器和向量化执行的执行引擎,同时可以复用SQL引擎开发的各种新特性,包括功能、性能、稳定性等方面不断的优化改进。

MapReduce指定为SQL运行时执行后可以使用MaxCompute SQL的新特性,可以满足原来MapReduce不支持的功能。相比原来MapReduce,新增如下功能:

  • 支持输入源为视图。

  • 支持输入源为外部表。

  • 支持分布式文件系统读写。

  • 支持Hash或Range聚簇表读写。

此外还新增支持如下能力:

  • 可以持续获得SQL在CBO优化器和向量化执行引擎等方面的持续性能优化收益。

  • 可以使用新的存储格式压缩机制。

  • 支持动态调整并发度,提高输入表为HashCluster等超大表Join场景的性能。

  • 可以获得经过大量作业和压力验证的SQL引擎稳定性优势,Failover和PVC等机制更有保障。

  • MaxCompute Studio和Logview针对SQL作业提供了更详细的执行信息、执行计划、编译信息、作业配置信息等,可以对任务运行各阶段输入输出及整体流程一目了然,能方便的定位问题和优化,提高开发和运维效率。

注意事项

  • 该功能不需要您对原接口和作业逻辑做任何改动,只需要指定执行模式。

  • 该功能仅支持MapReduce接口编写的MapReduce作业,MapReduce接口请参见原生SDK概述

  • 基于SQL运行时执行模式运行MapReduce作业,MapReduce作业仍然按照MapReduce计费规则进行计费,详情请参见MapReduce按量付费

使用说明

  1. 设置执行模式。

    执行模式可以通过odps.mr.run.mode开关控制,取值如下:

    • lot(默认):使用MapReduce引擎执行MapReduce任务。

    • sql:使用SQL引擎执行MapReduce任务,即SQL运行时执行模式。如果执行不通过,则会报错。

    • hybrid:优先尝试使用SQL引擎执行,不通过时会回退到MapReduce引擎执行。

    您可通过两种方式设置执行模式:

    Project级别控制

    全局性统一打开,针对所有作业有效,需要Project管理员执行如下命令:

    setproject odps.mr.run.mode=<lot/sql/hybrid>;

    Session级控制

    只针对当前作业有效,有如下两种方式:

    • 运行JAR语句前添加set odps.mr.run.mode=<lot/sql/hybrid>语句。

    • 在作业代码中通过job设置,如下所示:

      JobConf job = new JobConf();
      job.set("odps.mr.run.mode","hybrid")
    说明

    对于特殊功能场景StreamJob和SecondarySort的功能,需要设置以下Flag:

    • StreamJob: set odps.mr.sql.stream.enable=true;

    • SecondarySort: set odps.mr.sql.group.enable=true;

  2. 查看运行详情。

    您可通过Logview、MaxCompute Studio等查看MapReduce基于SQL运行时执行在客户端生成的SQL表达式,以及作业的运行详情,Logview操作说明请参见使用Logview 2.0查看作业运行信息

    • Logview XML

      打开Logview,在Source XML页签中查看客户端提交的XML信息,显示了MapReduce作业同义的SQL表达。示例如下所示:

      create temporary function mr2sql_mapper_152955927079392291755 as   'com.aliyun.odps.mapred.bridge.LotMapperUDTF' using ; 
      create temporary function mr2sql_reducer_152955927079392291755 as 'com.aliyun.odps.mapred.bridge.LotReducerUDTF' using ; 
      
      @sub_query_mapper :=
      SELECT k_id,v_gmt_create,v_gmt_modified,v_product_id,v_admin_seq,v_sku_attr,v_sku_price,v_sku_stock,v_sku_code,v_sku_image,v_delivery_time,v_sku_bulk_order,v_sku_bulk_discount,v_sku_image_version,v_currency_code
      FROM(
        SELECT mr2sql_mapper_152955927079392291755(id,gmt_create,gmt_modified,product_id,admin_seq,sku_attr,sku_price,sku_stock,sku_code,sku_image,delivery_time,sku_bulk_order,sku_bulk_discount,sku_image_version,currency_code ) as (k_id,v_gmt_create,v_gmt_modified,v_product_id,v_admin_seq,v_sku_attr,v_sku_price,v_sku_stock,v_sku_code,v_sku_image,v_delivery_time,v_sku_bulk_order,v_sku_bulk_discount,v_sku_image_version,v_currency_code)
        FROM ae_antispam.product_sku_tt_inc
        WHERE ds = "20180615"  AND hh = "21"                     
        UNION ALL
        SELECT mr2sql_mapper_152955927079392291755(id,gmt_create,gmt_modified,product_id,admin_seq,sku_attr,sku_price,sku_stock,sku_code,sku_image,delivery_time,sku_bulk_order,sku_bulk_discount,sku_image_version,currency_code ) as (k_id,v_gmt_create,v_gmt_modified,v_product_id,v_admin_seq,v_sku_attr,v_sku_price,v_sku_stock,v_sku_code,v_sku_image,v_delivery_time,v_sku_bulk_order,v_sku_bulk_discount,v_sku_image_version,v_currency_code)
        FROM ae_antispam.product_sku
      ) open_mr_alias1
      DISTRIBUTE BY k_id SORT BY k_id ASC;
      
      @sub_query_reducer := 
      SELECT mr2sql_reducer_152955927079392291755(k_id,v_gmt_create,v_gmt_modified,v_product_id,v_admin_seq,v_sku_attr,v_sku_price,v_sku_stock,v_sku_code,v_sku_image,v_delivery_time,v_sku_bulk_order,v_sku_bulk_discount,v_sku_image_version,v_currency_code) as (id,gmt_create,gmt_modified,product_id,admin_seq,sku_attr,sku_price,sku_stock,sku_code,sku_image,delivery_time,sku_bulk_order,sku_bulk_discount,sku_image_version,currency_code)
      FROM @sub_query_mapper;
      FROM @sub_query_reducer	
      INSERT OVERWRITE TABLE ae_antispam.product_sku
      SELECT id,gmt_create,gmt_modified,product_id,admin_seq,sku_attr,sku_price,sku_stock,sku_code,sku_image,delivery_time,sku_bulk_order,sku_bulk_discount,sku_image_version,currency_code ;
    • Logview Summary

      在Logview的Summary页签您可以看到作业的执行采用了SQL运行时的执行引擎execution engine,示例如下:

      说明

      未开启SQL运行时执行模式的MapReduce作业无执行引擎信息,未开启SQL运行时执行模式的MaxCompute扩展MapReduce(MR2)作业的执行引擎为cganjiang

       Job run mode: fuxi job
       Job run engine: execution engine
    • Logview JSONSummary

      MapReduce的JSONSummary信息仅包含了简单的Map和Reduce输入输出信息。 SQL的JSONSummary信息可以查看SQL执行各阶段的详细信息,包含所有执行参数、逻辑计划、物理计划和执行详情等。示例如下所示:

       "midlots" : 
       [
       "LogicalTableSink(table=[[odps_flighting.flt_20180621104445_step1_ad_quality_tech_qp_algo_antifake_wordbag_filter_bag_change_result_lv2_20, auctionid,word,match_word(3) {0, 1, 2}]])
      OdpsLogicalProject(auctionid=[$0], word=[$1], match_word=[$2])
      OdpsLogicalProject(auctionid=[$0], word=[$1], match_word=[$2])
      OdpsLogicalProject(auctionid=[$0], word=[$1], match_word=[$2])
      OdpsLogicalProject(auctionid=[$2], word=[$3], match_word=[$4])
      OdpsLogicalTableFunctionScan(invocation=[[MR2SQL_MAPPER_152955294118813063732($0, $1)]()], rowType=[RecordType(VARCHAR(2147483647) item_id, VARCHAR(2147483647) text, VARCHAR(2147483647) __tf_0_0, VARCHAR(2147483647) __tf_0_1, VARCHAR(2147483647) __tf_0_2)])
      OdpsLogicalTableScan(table=[[ad_quality_tech.qp_algo_antifake_wordbag_filter_bag_change_lv2_20, item_id,text(2) {0, 1}]])
      ]