MaxCompute周期调度任务(Periodic Task)功能,支持通过SQL语句自定义调度和计算逻辑,从而高效灵活地管理定时调度任务,实现更短的周期、高效的运行及低成本的调度管理。
功能介绍
MaxCompute周期调度任务(Periodic Task)可以灵活自定义任务调度策略,执行对应的SQL计算逻辑,自动构建数据管道,简化ETL链路。此外,您还可以将任务与表数据变更和流读场景相结合,从而实现连续的ETL工作流程,以处理最近更改的数据。该功能具有以下特性:
支持SQL语言:所有针对定时任务的管理操作均可通过简洁的SQL语言实现。
支持全生命周期管理:包括创建、修改、删除、查看以及查看任务列表和任务执行实例列表。
秒级别周期调度任务:支持最短10秒的定时调度。
MaxCompute内置调度服务,更高效成本更低。
支持配置实例启动条件的When表达式,可实现较为复杂的条件判断。
支持与平台内对象的无缝结合。例如:支持Delta Table CDC、Object Table等需要定期刷新的对象,提供高效的内置调度服务。
默认支持增量CDC数据后台自动生成。
该功能目前处于邀测中,邀测功能使用方法请参见使用说明。
注意事项
创建或删除周期调度任务前,您需要在Session级别设置SET odps.sql.periodic.task.enabled=true;
参数,打开Task自动执行增量更新开关。否则无法创建或删除周期调度任务。
创建周期调度任务(Periodic Task)
语法
CREATE TASK [IF NOT EXISTS] <task_name>
SCHEDULE = "<num> [minute|minutes] | [second|seconds]"
[TASKPROPERTIES("schedule_strategy"="3",--指定定时调度任务重试次数
"odps.namespace.schema"="true" ....];
[COMMENT task_comment]
[WHEN <bool_expr>]
AS <taskBody>;
参数说明
配置项 | 是否必填 | 描述 |
IF NOT EXISTS | 否 | 创建任务时:
|
task_name | 是 | 周期调度任务名称。 |
SCHEDULE = "<num> [minute|minutes] | [second|seconds]" | 是 | 执行周期时长。支持的单位为分钟、秒,取值范围为[10秒, 59*60秒]。 说明 单位为秒的数值限制为10~59,大于60秒时,需要将单位设置为分钟。 |
| 否 | 定时任务调度执行SQL时需要指定的参数,均可通过此处配置。常见取值为
|
| 否 | 周期调度任务的注释内容。注释内容为长度不超过1024字节的有效字符串,否则报错。 |
WHEN | 否 | 支持配置一个BOOLEAN表达式。仅当该表达式返回TRUE时,后续SQL的执行才会被触发。如果未进行配置,则默认为TRUE。目前对支持的条件语句进行了相应的约束,只有部分函数支持使用。 说明
|
| 是 | 要执行的SQL任务。SQL语句不支持脚本模式执行的语句,目前支持常用的DML和DQL语法,不支持DDL。 |
示例
创建调度任务periodic_task1
,每5分钟(minutes)调度一次,调度失败会重试3次。仅当判断条件stream_has_data('delta_table_stream')
返回结果为TRUE时,执行SQL任务INSERT INTO acid2_table_dest SELECT pk, val FROM acid_stream
。
创建一个Delta Table增量表。
CREATE TABLE acid2_table_dest ( pk BIGINT NOT NULL PRIMARY KEY, val BIGINT ) tblproperties ("transactional"="true");
创建Delta Table源表,再创建一个Stream关联Delta Table表。
-- 创建Delta Table源表 CREATE TABLE delta_table_src ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ("transactional"="true"); -- 创建一个Stream关联Delta Table表 CREATE STREAM delta_table_stream ON TABLE delta_table_src version as of 1 strmproperties('read_mode'='append') comment 'stream demo';
创建一个定时调度任务periodic_task1。
--开启Task自动执行增量更新 SET odps.sql.periodic.task.enabled=true; CREATE TASK IF NOT EXISTS periodic_task1 schedule = '5 minutes' TASKPROPERTIES('schedule_strategy'='3') comment 'task_comment' WHEN stream_has_data('delta_table_stream') AS INSERT INTO acid2_table_dest SELECT pk, val FROM delta_table_stream;
说明其中
stream_has_data('delta_table_stream')
的条件判断,等同于在MaxCompute中执行SELECT stream_has_data('delta_table_stream');
命令。插入数据至Delta Table源表,并等待调度结束,查看目标表数据。
-- 插入数据至Delta Table源表 INSERT INTO delta_table_src VALUES (1, 1), (2, 2); -- 等待调度结束, 查看目标表数据 SELECT * FROM acid2_table_dest;
返回结果如下:数据已成功写入acid2_table_dest表。
--查到两条结果 +------------+------------+ | pk | val | +------------+------------+ | 1 | 1 | | 2 | 2 | +------------+------------+
修改定时调度任务(Periodic Task)
语法
修改定时调度任务的调度时间(执行周期时长)、SQL参数(例如任务重试次数等)及注释内容
ALTER TASK [IF EXISTS] <task_name> SET SCHEDULE = "<num> [minute|minutes] | [second|seconds]" [TASKPROPERTIES("schedule_strategy"="3",--指定定时调度任务重试次数 "odps.namespace.schema"="true" ....]; [COMMENT task_comment];
重启(resume) 或暂停(suspend)周期调度任务
说明创建定时调度任务后会自动启动,若定时调度任务出现问题或手动执行了暂停(suspend)操作,导致任务暂停,您可以通过重启(resume)操作,重新启动定时调度任务。重启后通过
DESC
查看定时调度任务的状态是ACTIVE。ALTER TASK [IF EXISTS] task_name resume | suspend;
修改周期调度任务的触发条件判断
WHEN
表达式,创建Task后可通过该语法修改WHEN表达式ALTER TASK [IF EXISTS] task_name MODIFY WHEN <boolean_expr>;
参数说明
配置项 | 是否必选 | 描述 |
IF EXISTS | 否 | 修改定时调度任务时:
|
boolean_expr | 是 | 周期调度任务的触发条件判断 |
其他更多相关参数说明,请参见参数说明。
示例
示例1:重启周期调度任务
ALTER TASK periodic_task1 resume;
示例2:暂停周期调度任务
ALTER TASK periodic_task1 suspend;
示例3:修改调度时间周期为6分钟
ALTER TASK IF EXISTS periodic_task1 SET schedule = '6 minute';
示例4:添加注释内容
ALTER TASK IF EXISTS periodic_task1 SET comment 'comment2';
示例5:指定周期调度任务重试次数为5次
ALTER TASK IF EXISTS periodic_task1 SET taskproperties('schedule_strategy'='5');
示例6:修改任务触发条件判断
WHEN
表达式ALTER TASK IF EXISTS periodic_task1 MODIFY WHEN 1=1;
查看定时调度任务(Periodic Task)信息
语法
查看定时调度任务信息
DESC task <task_name>;
查看定时调度任务信息以及Extended信息
DESC extended task <task_name>;
参数说明
task_name:周期调度任务名称。
示例
查看定时调度任务periodic_task1信息。
DESC task periodic_task1;
返回结果示例如下:
+------------------------------------------------------------------------------------+
| Owner: ALIYUN$odps****@aliyun.com |
| Project: sql_odps2 |
| Schema: |
| Task: periodic_task1 |
+------------------------------------------------------------------------------------+
| CreateTime: 2024-08-23 11:05:46 |
| LastModifiedTime: 2024-08-23 11:05:46 |
+------------------------------------------------------------------------------------+
| ScheduleText: '5 minute' |
| TaskType: SQL |
| Query: insert into acid2_table_dest select pk, val from acid_stream; |
| Condition: stream_has_data('acid_stream') |
| Status: ACTIVE |
| Comment: |
+------------------------------------------------------------------------------------+
删除定时调度任务(Periodic Task)
语法
DROP TASK [IF EXISTS] <task_name>;
参数说明
配置项 | 是否必填 | 描述 |
IF EXISTS | 否 | 删除定时调度任务时:
|
task_name | 是 | 需要删除的定时调度任务(Periodic Task)名称。 |
示例
删除periodic_task1任务,命令如下:
DROP TASK periodic_task1;
查看所有定时调度任务(Periodic Task)列表
您可以通过以下SQL,查看当前MaxCompute项目下的所有定时调度(Periodic Task)任务。
语法
SHOW TASKS;
示例
查看已创建的定时调度任务。
SHOW TASKS;
返回结果示例如下:
+--------------------------------------------------------------------------------------------------------------------------------+
| Project: openmronlot_daily_arm_src3_xr |
| Schema: |
+--------------------------------------------------------------------------------------------------------------------------------+
| Tasks: |
+--------------------------------------------------------------------------------------------------------------------------------+
| Name | Owner | CreateTime | LastModifiedTime | Status |
+--------------------------------------------------------------------------------------------------------------------------------+
| periodic_task1 | ALIYUN$odpst****@aliyun.com | 2024-07-26 16:19:06 | 2024-07-26 16:19:06 | ACTIVE |
| periodic_task2 | ALIYUN$odpst****@aliyun.com | 2024-07-26 16:19:28 | 2024-07-26 16:19:28 | ACTIVE |
| periodic_task3 | ALIYUN$odpst****@aliyun.com | 2024-07-26 16:20:45 | 2024-07-26 16:20:45 | ACTIVE |
| periodic_task6 | ALIYUN$odpst****@aliyun.com | 2024-08-15 11:09:55 | 2024-08-15 11:09:55 | ACTIVE |
| periodic_task_alter1 | ALIYUN$odpst****@aliyun.com | 2024-07-26 16:17:40 | 2024-07-26 16:17:40 | ACTIVE |
+--------------------------------------------------------------------------------------------------------------------------------+
查看定时调度任务(Periodic Task)自动调度实例的历史列表
您可查看定时调度任务(Periodic Task)自动调度实例的InstanceId、CreateTime、EndTime以及Status信息。
若需要查看某个自动调度实例的详细日志,可以执行WAIT <InstanceId>;
命令。例如:执行WAIT 2024082309000177gq71ut9****;
命令可获取到MaxCompute Logview地址。您可以将Logview地址复制到浏览器中,查看其详细日志。
语法
SHOW HISTORY FOR TASK <task_name>;
参数说明
task_name:周期调度任务名称。
示例
查看定时调度任务periodic_task1的自动调度历史列表信息。命令如下:
SHOW HISTORY FOR TASK periodic_task1;
返回结果示例如下。
+---------------------------------------------------------------------------------------------------+
| Project: xr_project |
| Schema: |
| Task: periodic_task1 |
+---------------------------------------------------------------------------------------------------+
| History: |
+---------------------------------------------------------------------------------------------------+
| InstanceId | CreateTime | EndTime | Status |
+---------------------------------------------------------------------------------------------------+
| 20240823093000475gpp5sfm**** | 2024-08-23 17:30:00 | 2024-08-23 17:30:03 | Terminated |
| 2024082309000177gq71ut9**** | 2024-08-23 17:00:00 | 2024-08-23 17:00:02 | Terminated |
| 20240823083001129gm2k2uq**** | 2024-08-23 16:30:00 | 2024-08-23 16:30:02 | Terminated |
| 2024082308000127g830w1nu**** | 2024-08-23 16:00:00 | 2024-08-23 16:00:02 | Terminated |
| 20240823073000936gd6nlla**** | 2024-08-23 15:30:00 | 2024-08-23 15:30:02 | Terminated |
| 20240823070000813g1c1ksw7**** | 2024-08-23 15:00:00 | 2024-08-23 15:00:02 | Terminated |
| 20240823063001181glltwkz**** | 2024-08-23 14:30:00 | 2024-08-23 14:30:02 | Terminated |
| 20240823060000776goyxv1nu**** | 2024-08-23 14:00:00 | 2024-08-23 14:00:02 | Terminated |
| 2024082305300180g5in7zb**** | 2024-08-23 13:30:00 | 2024-08-23 13:30:02 | Terminated |
+---------------------------------------------------------------------------------------------------+
函数白名单
创建周期调度任务(Periodic Task)时,支持配置一个BOOLEAN表达式。目前,对支持的条件语句进行了相应的约束,仅支持使用以下。函数详情,请参见内建函数(字母排序)。
DATEADD
DATEDIFF
DATEPART
DATETRUNC
DATE_FORMAT
FROM_UNIXTIME
GETDATE
ISDATE
LASTDAY
LAST_DAY
UNIX_TIMESTAMP
WEEKDAY
WEEKDAY
WEEKOFYEAR
TO_DATE
TO_CHAR
YEAR
QUARTER
MONTH
DAY
DAYOFMONTH
HOUR
MINUTE
SECOND
CURRENT_TIMESTAMP
FROM_UTC_TIMESTAMP
ADD_MONTHS
NEXT_DAY
MONTHS_BETWEEN
TO_MILLIS
ABS
ROUND
CONCAT
CONCAT_WS
GET_JSON_OBJECT
INSTR
LENGTH
LENGTHB
REGEXP_EXTRACT
REGEXP_REPLACE
REGEXP_INSTR
REGEXP_SUBSTR
REGEXP_COUNT
REVERSE
SUBSTR
TOLOWER
TOUPPER
TRIM
LTRIM
RTRIM
REPLACE
SIZE
FIELD
COALESCE
IF
SPLIT
SPLIT_PART
FROM_JSON
MAX_PT
TABLE_EXISTS
PARTITION_EXISTS
GET_LATEST_VERSION
GET_LATEST_TIMESTAMP