周期调度任务(邀测)

MaxCompute周期调度任务(Periodic Task)功能,支持通过SQL语句自定义调度和计算逻辑,从而高效灵活地管理定时调度任务,实现更短的周期、高效的运行及低成本的调度管理。

功能介绍

MaxCompute周期调度任务(Periodic Task)可以灵活自定义任务调度策略,执行对应的SQL计算逻辑,自动构建数据管道,简化ETL链路。此外,您还可以将任务与表数据变更和流读场景相结合,从而实现连续的ETL工作流程,以处理最近更改的数据。该功能具有以下特性:

  • 支持SQL语言:所有针对定时任务的管理操作均可通过简洁的SQL语言实现。

  • 支持全生命周期管理:包括创建、修改、删除、查看以及查看任务列表和任务执行实例列表。

  • 秒级别周期调度任务:支持最短10秒的定时调度。

  • MaxCompute内置调度服务,更高效成本更低。

  • 支持配置实例启动条件的When表达式,可实现较为复杂的条件判断。

  • 支持与平台内对象的无缝结合。例如:支持Delta Table CDC、Object Table等需要定期刷新的对象,提供高效的内置调度服务。

  • 默认支持增量CDC数据后台自动生成。

该功能目前处于邀测中,邀测功能使用方法请参见使用说明

注意事项

创建或删除周期调度任务前,您需要在Session级别设置SET odps.sql.periodic.task.enabled=true;参数,打开Task自动执行增量更新开关。否则无法创建或删除周期调度任务。

创建周期调度任务(Periodic Task)

语法

CREATE TASK [IF NOT EXISTS] <task_name>
    SCHEDULE = "<num> [minute|minutes] | [second|seconds]"
    [TASKPROPERTIES("schedule_strategy"="3",--指定定时调度任务重试次数
               "odps.namespace.schema"="true" ....];
    [COMMENT task_comment]
    [WHEN <bool_expr>]
    AS <taskBody>;

参数说明

配置项

是否必填

描述

IF NOT EXISTS

创建任务时:

  • 若指定该选项:无论是否存在同名任务,即使原表结构与待创建的目标任务结构不一致,均会返回成功,且已存在的同名任务不会受到影响。

  • 未指定该选项:当存在同名任务,将返回报错。

task_name

周期调度任务名称。

SCHEDULE = "<num> [minute|minutes] | [second|seconds]"

执行周期时长。支持的单位为分钟、秒,取值范围为[10秒, 59*60秒]。

说明

单位为秒的数值限制为10~59,大于60秒时,需要将单位设置为分钟。

TASKPROPERTIES("schedule_strategy"="3","odps.namespace.schema"="true" ....]

定时任务调度执行SQL时需要指定的参数,均可通过此处配置。常见取值为schedule_strategy,即指定定时调度任务重试次数。定时任务调度执行SQL示例如下:

  • taskproperties('odps.stage.reducer.num'='100'):自动启动的任务Reduce TaskInstance数量为100个。

  • taskproperties('odps.sql.reducer.memory'='2048'):自动启动的任务Reduce Task的每个InstanceMemory大小为2048 MB。

COMMENT task_comment

周期调度任务的注释内容。注释内容为长度不超过1024字节的有效字符串,否则报错。

WHEN

支持配置一个BOOLEAN表达式。仅当该表达式返回TRUE时,后续SQL的执行才会被触发。如果未进行配置,则默认为TRUE。目前对支持的条件语句进行了相应的约束,只有部分函数支持使用。

说明
  • 支持BOOLEAN表达式内常用的SCALAR函数,不能使用窗口函数和聚合函数,不能使用子查询嵌套。

  • SCALAR函数可以使用CURRENT_TIMESTAMP、GETDATE、WEEKDAY等获取日期、TO_DATE、CONCAT、LENGTH、REGEXP_REPLACE、UPPER、SPLIT等进行转换、计算和替换,也可以使用MAX_PT、TABLE_EXISTS、PARTITION_EXISTS、STREAM_HAS_DATA、TABLE_NEED_CDC_BUILD获取表和分区的信息等,支持的函数列表详情,请参见函数白名单

AS <taskBody>

要执行的SQL任务。SQL语句不支持脚本模式执行的语句,目前支持常用的DMLDQL语法,不支持DDL。

示例

创建调度任务periodic_task1,每5分钟(minutes)调度一次,调度失败会重试3次。仅当判断条件stream_has_data('delta_table_stream')返回结果为TRUE时,执行SQL任务INSERT INTO acid2_table_dest SELECT pk, val FROM acid_stream

  1. 创建一个Delta Table增量表。

    CREATE TABLE acid2_table_dest (
      pk BIGINT NOT NULL PRIMARY KEY, 
      val BIGINT
    ) tblproperties ("transactional"="true");
  2. 创建Delta Table源表,再创建一个Stream关联Delta Table表。

    -- 创建Delta Table源表
    CREATE TABLE delta_table_src (
      pk bigint NOT NULL PRIMARY KEY,
      val bigint
    ) tblproperties ("transactional"="true");
    
    -- 创建一个Stream关联Delta Table表
    CREATE STREAM delta_table_stream 
    ON TABLE delta_table_src version as of 1 
    strmproperties('read_mode'='append') 
    comment 'stream demo';
  3. 创建一个定时调度任务periodic_task1。

    --开启Task自动执行增量更新
    SET odps.sql.periodic.task.enabled=true; 
    CREATE TASK IF NOT EXISTS periodic_task1  
      schedule = '5 minutes' 
      TASKPROPERTIES('schedule_strategy'='3')
      comment 'task_comment'
      WHEN stream_has_data('delta_table_stream') 
      AS INSERT INTO acid2_table_dest SELECT pk, val FROM delta_table_stream;
    说明

    其中stream_has_data('delta_table_stream') 的条件判断,等同于在MaxCompute中执行SELECT stream_has_data('delta_table_stream');命令。

  4. 插入数据至Delta Table源表,并等待调度结束,查看目标表数据。

    -- 插入数据至Delta Table源表
    INSERT INTO delta_table_src VALUES (1, 1), (2, 2);
    
    -- 等待调度结束, 查看目标表数据
    SELECT * FROM acid2_table_dest;

    返回结果如下:数据已成功写入acid2_table_dest表。

    --查到两条结果
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    | 1          | 1          |
    | 2          | 2          |
    +------------+------------+

修改定时调度任务(Periodic Task)

语法

  • 修改定时调度任务的调度时间(执行周期时长)、SQL参数(例如任务重试次数等)及注释内容

    ALTER TASK [IF EXISTS] <task_name>
        SET SCHEDULE = "<num> [minute|minutes] | [second|seconds]"
        [TASKPROPERTIES("schedule_strategy"="3",--指定定时调度任务重试次数
                   "odps.namespace.schema"="true" ....];
        [COMMENT task_comment];
  • 重启(resume) 或暂停(suspend)周期调度任务

    说明

    创建定时调度任务后会自动启动,若定时调度任务出现问题或手动执行了暂停(suspend)操作,导致任务暂停,您可以通过重启(resume)操作,重新启动定时调度任务。重启后通过DESC查看定时调度任务的状态是ACTIVE。

    ALTER TASK [IF EXISTS] task_name resume | suspend;
  • 修改周期调度任务的触发条件判断WHEN表达式,创建Task后可通过该语法修改WHEN表达式

    ALTER TASK [IF EXISTS] task_name MODIFY WHEN <boolean_expr>;

参数说明

配置项

是否必选

描述

IF EXISTS

修改定时调度任务时:

  • 指定该选项:若指定的定时调度任务不存在,将会跳过执行并返回成功;若指定的定时调度任务存在,将对指定的任务进行修改。

  • 不指定该选项:若指定的定时调度任务不存在,则创建任务报错。

boolean_expr

周期调度任务的触发条件判断WHEN表达式。

其他更多相关参数说明,请参见参数说明

示例

  • 示例1:重启周期调度任务

    ALTER TASK periodic_task1 resume; 
  • 示例2:暂停周期调度任务

    ALTER TASK periodic_task1 suspend; 
  • 示例3:修改调度时间周期为6分钟

    ALTER TASK IF EXISTS periodic_task1 SET schedule = '6 minute';
  • 示例4:添加注释内容

    ALTER TASK IF EXISTS periodic_task1 SET comment 'comment2';
  • 示例5:指定周期调度任务重试次数为5

    ALTER TASK IF EXISTS periodic_task1 SET taskproperties('schedule_strategy'='5');
  • 示例6:修改任务触发条件判断WHEN表达式

    ALTER TASK IF EXISTS periodic_task1 MODIFY WHEN 1=1;

查看定时调度任务(Periodic Task)信息

语法

  • 查看定时调度任务信息

    DESC task <task_name>;
  • 查看定时调度任务信息以及Extended信息

    DESC extended task <task_name>;

参数说明

task_name:周期调度任务名称。

示例

查看定时调度任务periodic_task1信息。

DESC task periodic_task1;

返回结果示例如下:

+------------------------------------------------------------------------------------+
| Owner:                    ALIYUN$odps****@aliyun.com                              |
| Project:                  sql_odps2                                          |
| Schema:                                                                            |
| Task:                     periodic_task1                                           |
+------------------------------------------------------------------------------------+
| CreateTime:               2024-08-23 11:05:46                                      |
| LastModifiedTime:         2024-08-23 11:05:46                                      |
+------------------------------------------------------------------------------------+
| ScheduleText:             '5 minute'                                               |
| TaskType:                 SQL                                                      |
| Query:                    insert into acid2_table_dest select pk, val from acid_stream;                                                |
| Condition:                stream_has_data('acid_stream')                           |
| Status:                   ACTIVE                                                   |
| Comment:                                                                           |
+------------------------------------------------------------------------------------+

删除定时调度任务(Periodic Task)

语法

DROP TASK [IF EXISTS] <task_name>; 

参数说明

配置项

是否必填

描述

IF EXISTS

删除定时调度任务时:

  • 指定该选项:若指定的定时调度任务不存在,将会跳过执行并返回成功;若指定的定时调度任务存在,将删除指定任务。

  • 不指定该选项:若指定的定时调度任务不存在,则删除任务报错。

task_name

需要删除的定时调度任务(Periodic Task)名称。

示例

删除periodic_task1任务,命令如下:

DROP TASK periodic_task1;

查看所有定时调度任务(Periodic Task)列表

您可以通过以下SQL,查看当前MaxCompute项目下的所有定时调度(Periodic Task)任务。

语法

SHOW TASKS; 

示例

查看已创建的定时调度任务。

SHOW TASKS; 

返回结果示例如下:

+--------------------------------------------------------------------------------------------------------------------------------+
| Project:                  openmronlot_daily_arm_src3_xr                                                                        |
| Schema:                                                                                                                        |
+--------------------------------------------------------------------------------------------------------------------------------+
| Tasks:                                                                                                                         |
+--------------------------------------------------------------------------------------------------------------------------------+
| Name                      | Owner                             | CreateTime             | LastModifiedTime       | Status       |
+--------------------------------------------------------------------------------------------------------------------------------+
| periodic_task1            | ALIYUN$odpst****@aliyun.com       | 2024-07-26 16:19:06    | 2024-07-26 16:19:06    | ACTIVE       |
| periodic_task2            | ALIYUN$odpst****@aliyun.com       | 2024-07-26 16:19:28    | 2024-07-26 16:19:28    | ACTIVE       |
| periodic_task3            | ALIYUN$odpst****@aliyun.com       | 2024-07-26 16:20:45    | 2024-07-26 16:20:45    | ACTIVE       |
| periodic_task6            | ALIYUN$odpst****@aliyun.com       | 2024-08-15 11:09:55    | 2024-08-15 11:09:55    | ACTIVE       |
| periodic_task_alter1      | ALIYUN$odpst****@aliyun.com       | 2024-07-26 16:17:40    | 2024-07-26 16:17:40    | ACTIVE       |
+--------------------------------------------------------------------------------------------------------------------------------+

查看定时调度任务(Periodic Task)自动调度实例的历史列表

您可查看定时调度任务(Periodic Task)自动调度实例的InstanceId、CreateTime、EndTime以及Status信息。

说明

若需要查看某个自动调度实例的详细日志,可以执行WAIT <InstanceId>;命令。例如:执行WAIT 2024082309000177gq71ut9****;命令可获取到MaxCompute Logview地址。您可以将Logview地址复制到浏览器中,查看其详细日志。

语法

SHOW HISTORY FOR TASK <task_name>; 

参数说明

task_name:周期调度任务名称。

示例

查看定时调度任务periodic_task1的自动调度历史列表信息。命令如下:

SHOW HISTORY FOR TASK periodic_task1;

返回结果示例如下。

+---------------------------------------------------------------------------------------------------+
| Project:                  xr_project                                                                     |
| Schema:                                                                                                                     |
| Task:                     periodic_task1                                                                     |
+---------------------------------------------------------------------------------------------------+
| History:                                                                                          |
+---------------------------------------------------------------------------------------------------+
| InstanceId                       | CreateTime             | EndTime                | Status       |
+---------------------------------------------------------------------------------------------------+
| 20240823093000475gpp5sfm****     | 2024-08-23 17:30:00    | 2024-08-23 17:30:03    | Terminated   |
| 2024082309000177gq71ut9****      | 2024-08-23 17:00:00    | 2024-08-23 17:00:02    | Terminated   |
| 20240823083001129gm2k2uq****     | 2024-08-23 16:30:00    | 2024-08-23 16:30:02    | Terminated   |
| 2024082308000127g830w1nu****     | 2024-08-23 16:00:00    | 2024-08-23 16:00:02    | Terminated   |
| 20240823073000936gd6nlla****     | 2024-08-23 15:30:00    | 2024-08-23 15:30:02    | Terminated   |
| 20240823070000813g1c1ksw7****    | 2024-08-23 15:00:00    | 2024-08-23 15:00:02    | Terminated   |
| 20240823063001181glltwkz****     | 2024-08-23 14:30:00    | 2024-08-23 14:30:02    | Terminated   |
| 20240823060000776goyxv1nu****    | 2024-08-23 14:00:00    | 2024-08-23 14:00:02    | Terminated   |
| 2024082305300180g5in7zb****      | 2024-08-23 13:30:00    | 2024-08-23 13:30:02    | Terminated   |
+---------------------------------------------------------------------------------------------------+

函数白名单

创建周期调度任务(Periodic Task)时,支持配置一个BOOLEAN表达式。目前,对支持的条件语句进行了相应的约束,仅支持使用以下。函数详情,请参见内建函数(字母排序)

  • DATEADD

  • DATEDIFF

  • DATEPART

  • DATETRUNC

  • DATE_FORMAT

  • FROM_UNIXTIME

  • GETDATE

  • ISDATE

  • LASTDAY

  • LAST_DAY

  • UNIX_TIMESTAMP

  • WEEKDAY

  • WEEKDAY

  • WEEKOFYEAR

  • TO_DATE

  • TO_CHAR

  • YEAR

  • QUARTER

  • MONTH

  • DAY

  • DAYOFMONTH

  • HOUR

  • MINUTE

  • SECOND

  • CURRENT_TIMESTAMP

  • FROM_UTC_TIMESTAMP

  • ADD_MONTHS

  • NEXT_DAY

  • MONTHS_BETWEEN

  • TO_MILLIS

  • ABS

  • ROUND

  • CONCAT

  • CONCAT_WS

  • GET_JSON_OBJECT

  • INSTR

  • LENGTH

  • LENGTHB

  • REGEXP_EXTRACT

  • REGEXP_REPLACE

  • REGEXP_INSTR

  • REGEXP_SUBSTR

  • REGEXP_COUNT

  • REVERSE

  • SUBSTR

  • TOLOWER

  • TOUPPER

  • TRIM

  • LTRIM

  • RTRIM

  • REPLACE

  • SIZE

  • FIELD

  • COALESCE

  • IF

  • SPLIT

  • SPLIT_PART

  • FROM_JSON

  • MAX_PT

  • TABLE_EXISTS

  • PARTITION_EXISTS

  • GET_LATEST_VERSION

  • GET_LATEST_TIMESTAMP