创建定时SQL任务

更新时间:2025-01-16 03:20:25

调用CreateScheduledSQL接口创建定时SQL任务。

前提条件

您已完成以下操作:

参数说明

def create_scheduled_sql(self, project_name, scheduled_sql):

请求参数

名称

类型

是否必填

描述

示例值

名称

类型

是否必填

描述

示例值

project_name

String

Project名称。

ali-test-project

scheduled_sql

object

定时 SQL 任务配置

-

scheduled_sql参数说明:

名称

类型

是否必填

描述

示例值

名称

类型

是否必填

描述

示例值

name

String

定时SQL任务名称。其命名规则如下:

  • 同一个Project下,不可重复。

  • 只能包含小写字母、数字、短划线(-)和下划线(_)。

  • 必须以小写字母或者数字开头和结尾。

  • 长度为4~63字符。

test-001

displayName

String

任务显示名。

test-001

description

String

任务描述。

创建一个定时SQL任务。

schedule

object

任务调度配置。

-

configuration

object

定时SQL配置信息。

-

schedule参数说明:

名称

类型

是否必填

描述

示例值

名称

类型

是否必填

描述

示例值

type

String

定时SQL类型。支持5种枚举值:

  • Cron:根据cronExpression参数指定的Cron表达式执行调度。

  • Hourly:每小时执行一次,整点执行。

  • Daily:根据hour参数指定的小时,每天执行一次。

  • Weekly:根据dayOfWeekhour参数指定的日期和小时,每周执行一次。

  • FixedRate:每隔一段时间执行一次,通过interval参数指定执行间隔。

Cron

cronExpression

String

Cron表达式

0/5 * * * *

runImmediately

bool

定时任务是否立即执行。

False

timeZone

String

Cron表达式所在时区,默认为空,表示东八区。

+0800

delay

int

任务延迟执行(单位是秒:s),不能大于86400。

4

interval

String

typeFixRate时,interval参数表示执行的间隔,不能大于30天。支持 d(天)、h(小时)、m(分钟)、s(秒)四种单位。例如3h。不支持多种单位组合,例如不支持3h5m。

1m

configuration参数说明:

名称

类型

是否必填

描述

示例值

名称

类型

是否必填

描述

示例值

script

String

定时SQL分析语句。

* | select *

sqlType

String

SQL类型。

searchQuery

destEndpoint

String

目标Endpoint。

cn-hangzhou-intranet.log.aliyuncs.com

destProject

String

目标Project。

project-demo

sourceLogstore

String

源库。

source-logstore-demo

destLogstore

String

目标库。

dest-logstore-demo

roleArn

String

执行SQL授权角色ARN。

acs:ram::123456789:role/aliyunlogetlrole

destRoleArn

String

写目标授权角色ARN。

acs:ram::123456789:role/aliyunlogetlrole

fromTimeExpr

String

SQL时间窗口开始。

@m-1m

toTimeExpr

String

SQL时间窗口结束。

@m

maxRunTimeInSeconds

int

SQL超时最长时间,单位:秒,取值范围 60~1800。

600

resourcePool

String

资源池类型(enhanced 表示增强型)。

enhanced

maxRetries

int

SQL超时最大次数,取值范围 1~100。

20

fromTime

int

开始时间。更多信息,请参见创建定时SQL-日志库导入时序库

1712592000

toTime

int

结束时间。更多信息,请参见创建定时SQL-日志库导入时序库

0

dataFormat

String

支持三种配置:log2loglog2metricmetric2metric

log2log

parameters

object

SQL配置。更多信息请参见创建定时SQL-日志库导入时序库

{
 addLabels: "{}",
 hashLabels: "[]",
 labelKeys: "[\"your label1\",\"your label2\"]",
 metricKeys: "[\"your Indicator1\",\"your Indicator2\"]",
 metricName: "",
 timeKey: ""
}

返回参数

返回参数说明,请参见CreateScheduledSQL - 创建定时SQL任务

示例代码

import os
import time
from aliyun.log import LogClient
from aliyun.log.scheduled_sql import *

accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')  # The AccessKeyId
accessKeySecret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')  # The AccessKeySecret

endpoint = "cn-hangzhou.log.aliyuncs.com"  # The  source endpoint of the project's region
roleArn = "acs:ram::141******5616316:role/aliyunserviceroleforslsaudit"  # The roleArn
project = "demo-test-project"  # The source project name
source_logstore = "test-logstore"  # The source logstore name
source_metricstore = ""  # The source metricstore name

dest_endpoint = "cn-hangzhou.log.aliyuncs.com"  # The endpoint of the destination project's region
dest_role_arn = "acs:ram::141******5616316:role/aliyunserviceroleforslsaudit"  # The destination roleArn
dest_project = "demo-test-project"  # The destination project name
dest_logstore = "test-logstore2"  # The destination logstore name
dest_metricstore = ""  # The destination metricstore name

from_time = int(time.time()) - 360  # The start time of the scheduled SQL task

job_name = "test-001"  # The job name
display_name = "test-001"  # The display name
description = "创建一个定时SQL任务"  # The description

script = "* | select *"  # The SQL script
promql = "* | select promql_query_range('key{}') from metrics limit 1000"  # The PromQL script

instance_id = ""  # The job instanceId for schedule sql
delay_seconds = 0  # the delay seconds for schedule sql

# three possible values for the variable data_format  : "log2log" , "log2metric" ,"metric2metric"
data_format = "log2log"

# Possible values for the variable schedule_type: "FixedRate", "Daily", "Weekly", "Hourly", "Cron"
# schedule_type = "FixedRate"
schedule_type = "FixedRate"

client = LogClient(endpoint, accessKeyId, accessKeySecret)


def generate_schedule_sql():
    schedule_rule = generate_schedule_rule()
    config = generate_config()

    schedule_sql = ScheduledSQL()
    schedule_sql.setName(job_name)
    schedule_sql.setDisplayName(display_name)
    schedule_sql.setDescription(description)
    schedule_sql.setSchedule(schedule_rule)
    schedule_sql.setConfiguration(config)
    return schedule_sql


def generate_schedule_rule():
    schedule_rule = JobSchedule()
    schedule_rule.setType(schedule_type)
    schedule_rule.setDelay(delay_seconds)
    schedule_rule.setTimeZone('+0800')
    if schedule_type == "FixedRate":
        # The interval value must be a string in the format "Xs", "Xm", "Xh", or "Xd",
        # where X is an integer representing the number of seconds, minutes, hours, or days.
        schedule_rule.setInterval('60s')
    elif schedule_type == "Cron":
        # The cron expression must be a valid cron string.
        schedule_rule.setCronExpression("*/2 * * * *")
    elif schedule_type == "Daily":
        # The hour value must be an integer between 0 and 23.
        schedule_rule.setHour(1)
    elif schedule_type == "Weekly":
        # The day of the week value must be an integer between 1 and 7.
        # The hour value must be an integer between 0 and 23.
        schedule_rule.setDayOfWeek(1)
        schedule_rule.setHour(2)
    elif schedule_type == "Hourly":
        pass

    return schedule_rule


def generate_config():
    query = script
    if data_format == 'log2log':
        source = source_logstore
        dest = dest_logstore
    elif data_format == 'log2metric':
        source = source_logstore
        dest = dest_metricstore
    elif data_format == 'metric2metric':
        source = source_metricstore
        dest = dest_metricstore
        query = promql

    parameters = generate_log2log_params()
    config = ScheduledSQLConfiguration()
    config.setScript(query)
    config.setSqlType('searchQuery')
    config.setDestEndpoint(dest_endpoint)
    config.setDestProject(dest_project)
    config.setSourceLogstore(source)
    config.setDestLogstore(dest)
    config.setRoleArn(roleArn)
    config.setDestRoleArn(dest_role_arn)
    config.setFromTimeExpr("@m - 5m")
    config.setToTimeExpr("@m")
    config.setMaxRunTimeInSeconds(1800)
    config.setResourcePool("enhanced")
    config.setMaxRetries(10)
    config.setFromTime(from_time)
    config.setToTime(0)
    config.setDataFormat(data_format)
    config.setParameters(parameters)
    return config


def generate_log2log_params():
    parameters = ScheduledSQLBaseParameters()
    return parameters


if __name__ == '__main__':
    schedule_sql = generate_schedule_sql()
    res = client.create_scheduled_sql(dest_project, schedule_sql)
    res.log_print()

示例返回结果

CreateScheduleSqlResponse:
headers: {'Server': 'AliyunSLS', 'Content-Length': '0', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*', 'Date': 'Fri, 01 Nov 2024 07:04:39 GMT', 'x-log-time': '1730444679', 'x-log-requestid': '67247D87DB04A454D92A6F6B'}

Process finished with exit code 0

相关文档

  • 本页导读 (1)
  • 前提条件
  • 参数说明
  • 请求参数
  • 返回参数
  • 示例代码
  • 示例返回结果
  • 相关文档