SELECT TRANSFORM语法允许您启动一个指定的子进程,将输入数据按照一定的格式通过标准输入至子进程,并且通过解析子进程的标准输出获取输出数据。SELECT TRANSFORM让您无需编写UDF,便可以实现MaxCompute SQL对其它脚本语言的支持。

语法介绍

命令格式如下。
SELECT TRANSFORM(arg1, arg2 ...) 
(ROW FORMAT DELIMITED (FIELDS TERMINATED BY field_delimiter (ESCAPED BY character_escape)?)? 
(LINES SEPARATED BY line_separator)? 
(NULL DEFINED AS null_value)?)?
USING 'unix_command_line' 
(RESOURCES 'res_name' (',' 'res_name')*)? 
( AS col1, col2 ...)?
(ROW FORMAT DELIMITED (FIELDS TERMINATED BY field_delimiter (ESCAPED BY character_escape)?)? 
(LINES SEPARATED BY line_separator)? (NULL DEFINED AS null_value)?)?
语法说明:
  • SELECT TRANSFORM关键字可以用MAPREDUCE关键字替换,语义是完全一样的。为使语法更清晰,推荐您使用SELECT TRANSFORM
  • arg1,arg2...:指定输入数据。其格式和SELECT子句类似。默认格式下,参数的各个表达式结果在隐式转换成STRING类型后,用\t拼接,输入到子进程中。
  • USING子句:指定要启动的子进程的命令。
    • 大多数的MaxCompute SQL命令中Using子句指定的是资源(Resources),但此处使用Using子句指定的子进程的命令。使用Using子句是为了和Hive的语法兼容。
    • Using中的格式和Shell的语法类似,但并非真的启动Shell来执行,而是直接根据命令的内容创建子进程。因此,很多Shell的功能不能使用,例如输入输出重定向、管道、循环等。如果有需要,Shell本身也可以作为子进程命令来使用。
  • RESOURCES子句:允许指定子进程能够访问的资源,支持以下两种方式指定资源:
    • 使用RESOURCES子句指定资源。例如,using 'sh foo.sh bar.txt' resources 'foo.sh','bar.txt'
    • 使用Flag指定资源。在SQL语句前使用set odps.sql.session.resources=foo.sh,bar.txt;来指定资源。

      此配置是全局配置,即整个SQL中所有的select transform都可以访问此资源。多个资源文件之间使用英文逗号(,)分隔。

  • ROW FORMAT子句:允许自定义输入输出的格式。
    语法中有两个ROW FORMAT子句,第一个子句指定输入数据的格式,第二个子句指定输出数据的格式。 默认情况下使用\t作为列分隔符,\n作为行分隔符,使用\N表示NULL。
    说明
    • field_delimitercharacter_escapeline_separator只接受一个字符。如果指定的是字符串,则以第一个字符为准。
    • MaxCompute支持Hive指定格式的语法,例如inputRecordReaderoutputRecordReaderSerDe等,但您需要打开Hive兼容模式才能使用。打开方式为在SQL语句前加set语句set odps.sql.hive.compatible=true;。Hive支持的语法详情请参见Hive文档
    • 如果使用Hive的inputRecordReaderoutputRecordReader等自定义类,可能会降低执行性能。
  • AS子句:指定输出列。例如as(col1:bigint, col2:boolean)
    • 输出列可以不指定数据类型,默认为STRING类型。例如as(col1, col2)
    • 由于输出数据实际是解析子进程标准输出获取的,如果指定的数据不是STRING类型,系统会隐式调用CAST函数进行转换,转换过程有可能出现运行异常。
    • 输出列的数据类型不支持部分指定,例如as(col1, col2:bigint)
    • 关键字as可以省略,此时默认标准输出数据中第一个\t之前的字段为Key,后面的部分全部为Value,相当于as(key, value)

调用Shell脚本示例

假设通过Shell脚本生成50行数据,值是从1到50,对应data字段输出如下。
SELECT TRANSFORM(script) USING 'sh' AS (data) 
FROM (
        SELECT  'for i in `seq 1 50`; do echo $i; done' AS script
      ) t
;

直接将Shell命令作为transform数据输入。

SELECT TRANSFORM不仅仅是语言支持的扩展。一些简单的功能,例如AWK、Python、Perl、Shell都支持直接在命令中写脚本,而不需要专门编写脚本文件、上传资源等,开发过程更简单。对于复杂的功能,您可以上传脚本文件来执行,请参见调用Python脚本示例

调用Python脚本示例

  1. 准备Python文件,脚本文件名为myplus.py,如下所示。
    #!/usr/bin/env python
    import sys
    line = sys.stdin.readline()
    while line:
        token = line.split('\t')
        if (token[0] == '\\N') or (token[1] == '\\N'):
            print '\\N'
        else:
            print str(token[0]) +'\t' + str(token[1])
        line = sys.stdin.readline()
  2. 将该Python脚本文件添加为MaxCompute资源(Resource)。
    add py ./myplus.py -f;
    说明 您也可通过DataWorks控制台进行新增资源操作。
  3. 使用select transform语法调用资源。
    --创建测试表。
    CREATE TABLE testdata(c1 bigint,c2 bigint);
    --测试表中插入测试数据。 
    INSERT INTO TABLE testdata VALUES (1,4),(2,5),(3,6); 
    --执行select transform语句。 
    SELECT 
    TRANSFORM (testdata.c1, testdata.c2) 
    USING 'python myplus.py'resources 'myplus.py' 
    AS (result1,result2) 
    FROM testdata;
    --等同于下面语句。
    set odps.sql.session.resources=myplus.py;
    SELECT TRANSFORM (testdata.c1, testdata.c2) 
    USING 'python myplus.py' 
    AS (result1,result2) 
    FROM testdata;
    说明 MaxCompute也支持直接将Python命令作为transform数据输入。例如调用Shell脚本示例也可以用Python命令实现。
    SELECT TRANSFORM('for i in xrange(1, 50):  print i;') USING 'python' AS (data);
    执行结果如下所示。
    +------------+------------+
    | result1    | result2    |
    +------------+------------+
    | 1          | 4          |
    |            | NULL       |
    | 2          | 5          |
    |            | NULL       |
    | 3          | 6          |
    |            | NULL       |
    +------------+------------+

调用Java脚本示例

与调用Python脚本类似,在本例中,您需要编辑好Java文件,导出Jar包,再通过add file方式将Jar包添加为MaxCompute资源,最后用于select transform调用。
  1. 准备好Jar文件,脚本文件名为Sum.jar,Java代码如下所示。
    package com.aliyun.odps.test;
    import java.util.Scanner
    public class Sum {
        public static void main(String[] args) {
            Scanner sc = new Scanner(System.in);
            while (sc.hasNext()) {
                String s = sc.nextLine();
                String[] tokens = s.split("\t");
                if (tokens.length < 2) {
                    throw new RuntimeException("illegal input");
                }
                if (tokens[0].equals("\\N") || tokens[1].equals("\\N")) {
                    System.out.println("\\N");
                }
                System.out.println(Long.parseLong(tokens[0]) + Long.parseLong(tokens[1]));
            }
        }
    }
  2. 将Jar文件添加为MaxCompute的资源。
    add jar ./Sum.jar -f;
  3. 使用select transform语法调用资源。
    --创建测试表。
    CREATE TABLE testdata(c1 bigint,c2 bigint); 
    --测试表中插入测试数据。
    INSERT INTO TABLE testdata VALUES (1,4),(2,5),(3,6); 
    --执行Select Transform语句。
    SELECT TRANSFORM(testdata.c1, testdata.c2) USING 'java -cp Sum.jar com.aliyun.odps.test.Sum' resources 'Sum.jar' from testdata;
    --等同于如下语句。
    set odps.sql.session.resources=Sum.jar;
    SELECT TRANSFORM(testdata.c1, testdata.c2) USING 'java -cp Sum.jar com.aliyun.odps.test.Sum' FROM testdata;
    执行结果如下所示。
    +-----+
    | cnt |
    +-----+
    | 5   |
    | 7   |
    | 9   |
    +-----+
大多数Java的方法可以直接使用本方法运行。
说明 Java和Python虽然有现成的UDTF框架,但是用select transform编写更简单、不需要额外依赖以及没有格式要求,甚至可以实现直接使用离线脚本(Java和Python离线脚本的实际路径,可以从JAVA_HOMEPYTHON_HOME环境变量中得到)。

调用其他脚本语言

select transform不仅仅支持上述语言扩展,还支持其它的常用Unix命令或脚本解释器,例如AWK、Perl等。
说明 目前由于MaxCompute计算集群上未部署PHP和Ruby,所以不支持调用这两种脚本。
  • 调用AWK,将第二列原样输出,示例如下。
    SELECT TRANSFORM(*) USING "awk '//{print $2}'" as (data) from testdata;
  • 调用Perl,示例如下。
    SELECT TRANSFORM (testdata.c1, testdata.c2) USING "perl -e 'while($input = <STDIN>){print $input;}'" FROM testdata;

串联使用示例

select transform还可以串联使用。例如使用distribute bysort by对输入数据做预处理。
SELECT TRANSFORM(key, value) USING 'cmd2' from 
(
    SELECT TRANSFORM(*) USINg 'cmd1' from 
    (
        SELECt * FROM data distribute by col2 sort by col1
    ) t distribute by key sort by value
) t2;
或使用mapreduce关键字。
@a := select * from data distribute by col2 sort by col1;
@b := map * using 'cmd1' distribute by col1 sort by col2 from @a;
reduce * using 'cmd2' from @b;

SELECT TRANSFORM优势及使用场景

  • 使用场景

    select transform与UDTF在不同场景下性能不同。经过多种场景对比测试,数据量较小时,大多数场景下select transform有优势,而数据量大时UDTF有优势。因为select transform的开发更加简便,所以select transform更适合于Ad Hoc的数据分析。

  • UDTF优势
    • UDTF的输出结果和输入参数支持多种数据类型,而transform的子进程基于标准输入和标准输出传输数据,所有数据都当做STRING类型处理,因此select transform比UDTF多了一步类型转换。
    • select transform数据传输依赖于操作系统的管道,而目前管道的缓存仅有4KB且不能设置。select transform读空或管道写满会导致进程被挂起。
    • UDTF的常量参数可以不用传输,而select transform不支持此功能。
  • SELECT TRANSFORM优势
    • 子进程和父进程是两个进程,而UDTF是单线程的。如果计算占比较高,数据吞吐量较小,select transform可以利用服务器的多核特性。
    • 数据的传输通过更底层的系统调用来读写,效率比Java高。
    • select transform支持的部分工具,例如AWK是Native代码实现的。理论上,与Java相比,使用select transform会更有性能优势。