调用CreateScheduledSQL接口创建定时SQL任务。
前提条件
您已完成以下操作:
参数说明
def create_scheduled_sql(self, project_name, scheduled_sql):
请求参数
|
名称 |
类型 |
是否必填 |
描述 |
示例值 |
|
project_name |
String |
是 |
Project名称。 |
ali-test-project |
|
scheduled_sql |
object |
是 |
定时 SQL 任务配置 |
- |
scheduled_sql参数说明:
|
名称 |
类型 |
是否必填 |
描述 |
示例值 |
|
name |
String |
是 |
定时SQL任务名称。其命名规则如下:
|
test-001 |
|
displayName |
String |
是 |
任务显示名。 |
test-001 |
|
description |
String |
否 |
任务描述。 |
创建一个定时SQL任务。 |
|
schedule |
object |
是 |
任务调度配置。 |
- |
|
configuration |
object |
是 |
定时SQL配置信息。 |
- |
schedule参数说明:
|
名称 |
类型 |
是否必填 |
描述 |
示例值 |
|
type |
String |
是 |
定时SQL类型。支持5种枚举值:
|
Cron |
|
cronExpression |
String |
否 |
0/5 * * * * |
|
|
runImmediately |
bool |
否 |
定时任务是否立即执行。 |
False |
|
timeZone |
String |
否 |
Cron表达式所在时区,默认为空,表示东八区。 |
+0800 |
|
delay |
int |
否 |
任务延迟执行(单位是秒:s),不能大于86400。 |
4 |
|
interval |
String |
否 |
当type为 |
1m |
configuration参数说明:
|
名称 |
类型 |
是否必填 |
描述 |
示例值 |
|
script |
String |
是 |
定时SQL分析语句。 |
* | select * |
|
sqlType |
String |
是 |
SQL类型。 |
searchQuery |
|
destEndpoint |
String |
是 |
目标Endpoint。 |
cn-hangzhou-intranet.log.aliyuncs.com |
|
destProject |
String |
是 |
目标Project。 |
project-demo |
|
sourceLogStore |
String |
是 |
源库。 |
source-logstore-demo |
|
destLogStore |
String |
是 |
目标库。 警告 请勿将目标库配置为当前源库(即同源配置),否则可能导致日志被循环写入,产生额外的存储与流量费用。由此造成的资源消耗及相关费用,将由用户自行承担。 |
dest-logstore-demo |
|
roleArn |
String |
是 |
执行SQL授权角色ARN。 |
acs:ram::123456789:role/aliyunlogetlrole |
|
destRoleArn |
String |
是 |
写目标授权角色ARN。 |
acs:ram::123456789:role/aliyunlogetlrole |
|
fromTimeExpr |
String |
是 |
SQL时间窗口开始。 |
@m-1m |
|
toTimeExpr |
String |
是 |
SQL时间窗口结束。 |
@m |
|
maxRunTimeInSeconds |
int |
是 |
SQL超时最长时间,单位:秒,取值范围 60~1800。 |
600 |
|
resourcePool |
String |
是 |
资源池类型(enhanced 表示增强型)。 |
enhanced |
|
maxRetries |
int |
是 |
SQL超时最大次数,取值范围 1~100。 |
20 |
|
fromTime |
int |
是 |
开始时间。更多信息,请参见创建定时SQL-日志库导入时序库。 |
1712592000 |
|
toTime |
int |
是 |
结束时间。更多信息,请参见创建定时SQL-日志库导入时序库。 |
0 |
|
dataFormat |
String |
是 |
支持三种配置: |
log2log |
|
parameters |
object |
是 |
SQL配置。更多信息请参见创建定时SQL-日志库导入时序库。 |
|
返回参数
返回参数说明,请参见创建定时SQL任务。
示例代码
import os
import time
from aliyun.log import LogClient
from aliyun.log.scheduled_sql import *
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '') # The AccessKeyId
accessKeySecret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '') # The AccessKeySecret
endpoint = "cn-hangzhou.log.aliyuncs.com" # The source endpoint of the project's region
roleArn = "acs:ram::141******5616316:role/aliyunserviceroleforslsaudit" # The roleArn
project = "demo-test-project" # The source project name
source_logstore = "test-logstore" # The source logstore name
source_metricstore = "" # The source metricstore name
dest_endpoint = "cn-hangzhou.log.aliyuncs.com" # The endpoint of the destination project's region
dest_role_arn = "acs:ram::141******5616316:role/aliyunserviceroleforslsaudit" # The destination roleArn
dest_project = "demo-test-project" # The destination project name
dest_logstore = "test-logstore2" # The destination logstore name
dest_metricstore = "" # The destination metricstore name
from_time = int(time.time()) - 360 # The start time of the scheduled SQL task
job_name = "test-001" # The job name
display_name = "test-001" # The display name
description = "创建一个定时SQL任务" # The description
script = "* | select *" # The SQL script
promql = "* | select promql_query_range('key{}') from metrics limit 1000" # The PromQL script
instance_id = "" # The job instanceId for schedule sql
delay_seconds = 0 # the delay seconds for schedule sql
# three possible values for the variable data_format : "log2log" , "log2metric" ,"metric2metric"
data_format = "log2log"
# Possible values for the variable schedule_type: "FixedRate", "Daily", "Weekly", "Hourly", "Cron"
# schedule_type = "FixedRate"
schedule_type = "FixedRate"
client = LogClient(endpoint, accessKeyId, accessKeySecret)
def generate_schedule_sql():
schedule_rule = generate_schedule_rule()
config = generate_config()
schedule_sql = ScheduledSQL()
schedule_sql.setName(job_name)
schedule_sql.setDisplayName(display_name)
schedule_sql.setDescription(description)
schedule_sql.setSchedule(schedule_rule)
schedule_sql.setConfiguration(config)
return schedule_sql
def generate_schedule_rule():
schedule_rule = JobSchedule()
schedule_rule.setType(schedule_type)
schedule_rule.setDelay(delay_seconds)
schedule_rule.setTimeZone('+0800')
if schedule_type == "FixedRate":
# The interval value must be a string in the format "Xs", "Xm", "Xh", or "Xd",
# where X is an integer representing the number of seconds, minutes, hours, or days.
schedule_rule.setInterval('60s')
elif schedule_type == "Cron":
# The cron expression must be a valid cron string.
schedule_rule.setCronExpression("*/2 * * * *")
elif schedule_type == "Daily":
# The hour value must be an integer between 0 and 23.
schedule_rule.setHour(1)
elif schedule_type == "Weekly":
# The day of the week value must be an integer between 1 and 7.
# The hour value must be an integer between 0 and 23.
schedule_rule.setDayOfWeek(1)
schedule_rule.setHour(2)
elif schedule_type == "Hourly":
pass
return schedule_rule
def generate_config():
query = script
if data_format == 'log2log':
source = source_logstore
dest = dest_logstore
elif data_format == 'log2metric':
source = source_logstore
dest = dest_metricstore
elif data_format == 'metric2metric':
source = source_metricstore
dest = dest_metricstore
query = promql
parameters = generate_log2log_params()
config = ScheduledSQLConfiguration()
config.setScript(query)
config.setSqlType('searchQuery')
config.setDestEndpoint(dest_endpoint)
config.setDestProject(dest_project)
config.setSourceLogstore(source)
config.setDestLogstore(dest)
config.setRoleArn(roleArn)
config.setDestRoleArn(dest_role_arn)
config.setFromTimeExpr("@m - 5m")
config.setToTimeExpr("@m")
config.setMaxRunTimeInSeconds(1800)
config.setResourcePool("enhanced")
config.setMaxRetries(10)
config.setFromTime(from_time)
config.setToTime(0)
config.setDataFormat(data_format)
config.setParameters(parameters)
return config
def generate_log2log_params():
parameters = ScheduledSQLBaseParameters()
return parameters
if __name__ == '__main__':
schedule_sql = generate_schedule_sql()
res = client.create_scheduled_sql(dest_project, schedule_sql)
res.log_print()
示例返回结果
CreateScheduleSqlResponse:
headers: {'Server': 'AliyunSLS', 'Content-Length': '0', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Fri, 01 Nov 2024 07:04:39 GMT', 'x-log-time': '1730444679', 'x-log-requestid': '67247D87DB04A454D92A6F6B'}
Process finished with exit code 0
