开发ODPS Script任务

DataWorks为您提供ODPS Script节点,其SQL开发模式是MaxCompute基于2.0的SQL引擎提供的脚本开发模式。本文为您介绍ODPS Script节点的使用。

前提条件

已创建ODPS Script节点,详情请参见创建并管理MaxCompute节点

背景信息

MaxCompute当前的SQL引擎支持脚本模式SQL(Script Mode SQL),在脚本模式下编译脚本时,一个多语句的SQL脚本文件将被作为一个整体进行编译,无需对单个语句进行编译,适合用于改写需要层层嵌套子查询的单个语句,或因为脚本复杂性而不得不拆成多个语句的脚本。在提交运行时,SQL脚本文件会被整体提交,并生成一个执行计划,保证只需排队一次、执行一次,让您能充分利用MaxCompute的资源,详情请参见SQL脚本模式。在DataWorks中,您可通过ODPS Script节点实现MaxCompute SQL脚本模式开发任务代码,并调度其他作业的集成操作。

适用场景

脚本模式的适用场景如下:

  • 脚本模式适合用来改写需要层层嵌套子查询的单个语句,或者因为脚本复杂性而不得不拆成多个语句的脚本。

  • 如果多个输入的数据源数据准备完成的时间间隔很长(例如一个01:00可以准备好,一个07:00可以准备好),则不适合通过table variable衔接拼装为一个大的脚本模式SQL。

  • 脚本模式下,您可以对一个变量赋常量值,然后执行SELECT * FROM 变量语句转化为标量与其它列进行计算。常量值也可以存放在一个单行的表中,命令示例如下。转化语法请参见子查询(SUBQUERY)

    @a := SELECT 10; --对@a赋值常量10,或者赋值存在一个单行表t1中,SELECT col1 FROM t1。
    @b := SELECT key,value+(SELECT * FROM @a) FROM t2 WHERE key >10000; --t2表中value值与@a中的值进行计算。
    SELECT * FROM @b;

语法结构

一个ODPS Script脚本的完整形式为SET语句>DDL语句>DML语句,每种类型语句都可以有0到多个语句,但不同类型的语句不能混合。多个语句以@开始,表示变量连接。语法如下:

--SET语句
set odps.sql.type.system.odps2=true;
[set odps.stage.reducer.num=***;]
[...]
--DDL语句
create table table1 xxx;
[create table table2 xxx;]
[...]
--DML语句
@var1 := SELECT [ALL | DISTINCT] select_expr, select_expr, ...
    FROM table3
    [WHERE where_condition];
@var2 := SELECT [ALL | DISTINCT] select_expr, select_expr, ...
    FROM table4
    [WHERE where_condition];
@var3 := SELECT [ALL | DISTINCT] var1.select_expr, var2.select_expr, ...
    FROM @var1 join @var2 on ...;
INSERT OVERWRITE|INTO TABLE [PARTITION (partcol1=val1, partcol2=val2 ...)]
    SELECT [ALL | DISTINCT] select_expr, select_expr, ...
    FROM @var3;
[@var4 := SELECT [ALL | DISTINCT] var1.select_expr, var.select_expr, ... FROM @var1
    UNION ALL | UNION
    SELECT [ALL | DISTINCT] var1.select_expr, var.select_expr, ... FROM @var2;
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
    AS
    SELECT [ALL | DISTINCT] select_expr, select_expr, ...
    FROM var4;]

使用限制

ODPS Script节点的使用限制如下:

  • 脚本模式支持SET语句、部分DDL语句(结果是屏显类型的语句除外,例如descshow)和DML语句。

  • 一个脚本,目前最多支持一个屏幕显示结果的语句(例如单独的Select语句),否则会报错。不建议您在脚本中执行屏幕显示的Select语句。

  • 一个脚本,目前最多支持一个Create table as语句,并且必须是最后一句。建议将建表语句和Insert语句分开写。

  • 脚本模式下,如果有一个语句失败,整个脚本的语句都不会执行成功。

  • 脚本模式下,只有所有输入的数据都准备完成,才会生成一个作业进行数据处理。

  • 脚本模式下,如果一个表被写入后又被读取,会报错。

    insert overwrite table src2 select * from src where key > 0;
    @a := select * from src2;
    select * from @a;

    为避免先写后读,可以进行如下修改。

    @a := select * from src where key > 0;
    insert overwrite table src2 select * from @a;
    select * from @a;

编辑代码:简单示例

MaxCompute Script Mode的SQL编译较为简单,您只需要按照业务逻辑,用类似于普通编程语言的方式进行编译,无需考虑如何组织语句。以下以一个简单示例为您介绍ODPS Script节点的使用。

create table if not exists dest(key string , value bigint) ;
create table if not exists dest2(key string,value bigint ) ;
@a := select * from src where value >0;
@b := select * from src2 where key is not null;
@c := select * from src3 where value is not null;
@d := select a.key,b.value from @a left outer join @b on a.key=b.key and b.value>0;
@e := select a.key,c.value from @a inner join @c on a.key=c.key;
@f := select * from @d union select * from @e union select * from @a;
insert overwrite table dest  select * from @f;
@g := select e.key,c.value  from @e join @c on e.key=c.key;
insert overwrite table dest2 SELECT * from @g;
说明

本示例中需要用到的三张表src、src1、src2,建表语句如下:

--创建表
create table if not EXISTS  src(key string ,value BIGINT);
insert into src values ('1',11) ;
insert into src values ('1',11) ;
create table if not EXISTS  src2(key string ,value BIGINT);
insert into src2  values ('1',22);
insert into src2  values ('2',22);
create table if not EXISTS  src3(key string ,value BIGINT);
insert into src3 values ('1',33);
insert into src3 values ('3',33);

编辑代码:进阶示例

MaxCompute脚本模式支持IF语句,IF语句可以使程序根据条件,自动选择执行逻辑。MaxCompute的IF语法根据Condition类型分为BOOLEAN类型,以及BOOLEAN的Scalar SubQuery。

  • IF条件为BOOLEAN类型表达式

    这种类型的IF ELSE语句可以在编译阶段决定执行哪个分支,示例如下:

    --数据处理
    set odps.sql.allow.fullscan=true;
    set odps.optimizer.cbo.rule.filter.black=LM; 
    @date := '${var}';
    @row  TABLE(key int,value bigint); --声明变量row,其类型为Table,schema为string. 
    IF ( cast(@date  as bigint) % 2 == 0 ) BEGIN 
    @row  := SELECT key,value from src1; 
    END ELSE BEGIN
    @row  := SELECT key,value from src2; 
    END
    INSERT OVERWRITE TABLE dest1 partition(p='${var}')  SELECT key,value FROM @row; 
    说明

    代码中定义了名为var的变量,您需要在调度参数配置处为var变量进行赋值。

  • IF条件为BOOLEAN的Scalar SubQuery

    这种类型的IF ELSE语句在编译阶段无法决定执行哪个分支,在运行时才能决定。因此,需要提交多个作业,示例如下:

    @i bigint;
    @t table(id bigint, value bigint);
    IF ((SELECT count(*) FROM src WHERE a = '5') > 1) BEGIN
    @i := 1;
    @t := select @i, @i*2;
    END ELSE
    BEGIN
    @i := 2;
    @t := select @i, @i*2;
    END
    select id, value from @t; 
  • 嵌入式UDF开发

    此外,您也可以通过将Java或Python代码嵌入SQL脚本的方式,使用MaxCompute脚本模式实现代码嵌入式UDF(Embedded UDF)开发,详情请参见UDF(嵌入式)

后续步骤

当您完成当前节点的任务开发后,通常您可进行以下操作。

  • 调度配置:配置节点的周期性调度属性。任务需要周期性调度运行时,您需要设置节点后续实际运行过程中的重跑属性、调度依赖关系等,操作详情请参见任务调度属性配置概述

  • 任务调试:对当前节点的代码进行测试运行,确认代码逻辑符合预期,操作详情请参见任务调试流程

  • 任务发布:完成所有开发相关操作后,您需要将所有任务节点进行发布,发布后节点即会根据调度配置结果进行周期性运行,操作详情请参见发布任务