创建定时SQL任务

调用CreateScheduledSQL接口创建定时SQL任务。

前提条件

您已完成以下操作:

背景信息

日志服务提供定时SQL功能,用于定时分析数据、存储聚合数据、投影与过滤数据。定时SQL支持标准SQL92语法、日志服务查询和分析语法,按照调度规则周期性执行,并将运行结果写入到目标库(LogstoreMetricstore)中。

日志服务控制台提供可视化的创建定时SQL任务。具体操作,请参见创建定时SQL任务

除此之外,日志服务提供ScheduledSQL、JobScheduleScheduledSQLConfiguration类,帮助您更简单的通过Java SDK创建定时SQL任务。

  • ScheduledSQL:创建一个定时SQL任务。

  • JobSchedule:创建定时SQL的调度任务。

  • ScheduledSQLConfiguration:创建定时SQL任务的基本配置。

参数说明

请求参数

名称

类型

是否必填

描述

示例值

project

String

Project名称。

ali-test-project

scheduledSql

Object

定时 SQL 任务配置

-

ScheduledSQL

参数说明如下表所示。

参数名称

类型

是否必填

说明

示例

name

String

定时SQL任务名称。其命名规则如下:

  • 同一个Project下,不可重复。

  • 只能包含小写字母、数字、短划线(-)和下划线(_)。

  • 必须以小写字母或者数字开头和结尾。

  • 长度为4~63字符。

export-123-456

displayName

String

定时SQL任务的显示名称。在日志服务控制台,选择任务管理 > 定时SQL,可以查看定时SQL显示名称列表。

my-scheduled-sql-job

description

String

定时SQL任务描述。

this is a scheduled sql job.

configuration

Object

定时SQL配置信息。

-

schedule

Object

任务调度配置。

-

JobSchedule

调用JobSchedule jobSchedule = new JobSchedule();创建定时SQL任务的调度任务。各个参数说明如下表所示。

参数名称

类型

是否必填

说明

示例

type

String

调度定时SQL任务的频率,每调度一次定时SQL任务产生一个执行实例。调度间隔决定每个执行实例的调度时间。

  • FixedRate:按照固定间隔调度定时SQL任务。固定间隔由interval参数指定。

  • Hourly:每小时调度一次定时SQL任务。

  • Daily:在每天的某个固定时间点调度一次定时SQL任务。

  • Cron:通过Cron表达式指定时间间隔,按照指定的时间间隔调度定时SQL任务。

FixedRate

interval

String

type取值为FixedRate时,配置固定间隔。

  • 3s:间隔3秒。

  • 5m:间隔5分钟。

  • 2h:间隔2小时。

50m

cronExpression

String

type取值为Cron时,配置Cron表达式。

Cron表达式的最小精度为分钟,24小时制,例如0 0/1 * * *表示从00:00开始,每隔1小时运行一次。

当您需要配置时区时,需选择Cron模式。常见的时区列表请参见时区列表

runImmediately

boolean

定时任务是否立即执行。

False

timeZone

String

Cron 表达式所在时区,默认为空,表示东八区。

+0800

delay

int

调度时间点往后延迟执行的时间。取整范围:0~120,单位:秒。

当数据写入Logstore存在延迟等情况时,可通过延迟执行来保证数据的完整性。

10

ScheduledSQLConfiguration

调用ScheduledSQLConfiguration scheduledSQLConfiguration = generateConfig();创建定时SQL任务的配置信息任务。各个参数说明如下表所示。

参数名称

类型

是否必填

描述

示例

script

String

输入的查询和分析语句。

*|select count(1)

sqlType

String

SQL类型,固定取值为searchQuery

searchQuery

resourcePool

String

资源池类型,固定取值为enhanced。日志服务提供增强型资源池用于数据分析。

enhanced

roleArn

String

读取源Logstore数据的RAM角色标识。如何获取ARN,请参见授予自定义RAM角色分析源Logstore的权限

acs:ram::11111111:role/aliyunlogetlrole

destRoleArn

String

写入目标Logstore数据的RAM角色标识。如何获取ARN,请参见如下说明:

acs:ram::11111111:role/aliyunlogetlrole

sourceLogstore

String

Logstore名称。

source-logstore

destEndpoint

String

待写入Logstore对应的服务入口。

说明
  • 阿里云产品内部相互通信。例如同地域ECS访问日志服务,建议优先使用私网域名。其私网域名为http://cn-hangzhou-intranet.log.aliyuncs.com

  • 通过公网(互联网)访问日志服务。例如通过本地公网的API、SDK访问日志服务,建议优先使用公网域名。其公网域名为http://cn-hangzhou.log.aliyuncs.com

  • 相比私网域名的计费,公网域名收费增加了外网读流量。更多信息,请参见日志服务计费项

更多信息,请参见服务入口

http://cn-hangzhou-intranet.log.aliyuncs.com

destProject

String

待写入数据的目标Project名称。

my-project

destLogstore

String

待写入数据的目标Logstore名称。

my-logstore

dataFormat

String

写入模式。

  • log2log:日志库导入日志库,即表示源Logstore中的数据通过定时SQL处理后将存储到目标Logstore中。

  • log2metric:日志库导入时序库,即表示源Logstore中的数据通过定时SQL处理后将存储到目标MetricStore中。

  • metric2metric:时序库导入时序库,即表示源MetricStore中的数据通过定时SQL处理后将存储到目标MetricStore中。

log2log

fromTimeExpr

String

SQL时间窗口开始表达式。更多信息,请参见时间表达式语法

@m - 12s

toTimeExpr

String

SQL时间窗口结束表达式。更多信息,请参见时间表达式语法

@m

maxRetries

Long

执行SQL分析操作失败时自动重试的阈值。当重试次数超过最大次数时,该执行实例结束,状态为失败。

10

maxRunTimeInSeconds

Long

执行SQL分析操作失败时自动重试的阈值。当重试时间超过指定的最大时间时,该执行实例结束,状态为失败。

60

fromTime

Long

调度开始时间。

重要

实例的调度时间必须在该范围内,超出该范围时,定时SQL任务不再产生新实例。

1653965045

toTime

Long

调度结束时间,0表示不结束。

1653968045

parameters

Object

dataFormat取值为log2metricmetric2metric时,配置SQL配置参数。具体配置项请参见

Log2MetricParametersMetric2MetricParameters

{
  addLabels: "{}",
  hashLabels: "[]",
  labelKeys: "[\"your label1\",\"your label2\"]",
  metricKeys: "[\"your Indicator1\",\"your Indicator2\"]",
  metricName: "",
  timeKey: ""
}

parameters

  • 配置从LogstoreMetricStore的定时SQL任务时,还需要配置以下额外的参数:

    表 1. Log2MetricParameters

    参数名称

    示例

    说明

    metricKeys

    "[\"a\", \"b\", \"c\"]"

    指标列,对应控制台界面SQL配置的指标列。

    日志服务会根据您输入的查询和分析语句聚合数据,您可以选择查询和分析结果中列值为数值类型的一列或多列作为指标列。更多信息,请参见时序数据(Metric)

    labelKeys

    "[\"d\", \"e\", \"f\"]"

    标签列,对应控制台界面SQL配置的Labels。

    日志服务会根据您输入的查询和分析语句聚合数据,您可以选择查询和分析结果中的一列或多列作为Label数据。更多信息,请参见时序数据(Metric)

    hashLabels

    "[\"d\", \"f\"]"

    对应控制台界面SQL配置的Rehash。

    打开Rehash开关后,您可以配置哈希列,用于将同一列值的数据写入到一个Shard中,增强数据局部性,提升查询效率。

    哈希列的取值取决于查询和分析结果。您可以选择查询和分析结果中的一列或多列作为哈希列。例如您配置哈希列status,则status字段值相同的数据将被写入到同一个Shard中。

    addLabels

    "[\"m\":\"h\", \"n\":\"i\"]"

    对应控制台界面SQL配置的附加Labels。

    添加静态标签,键值对形式,可用于标识指标的相关属性。

    例如配置label_keyapp,配置label_valueingress-nginx

    timeKey

    time

    对应控制台界面SQL配置的时间列。

    • 如果您选择查询和分析结果中的时间列(列值为Unixtime时间戳,例如atime:1627025331),则系统将以该时间列作为时序数据的时间。

    • 如果您选择,则系统将以查询和分析时间范围中的开始时间作为时序数据的时间。

    Log2MetricParameters

  • 配置从MetricStoreMetricStore的定时SQL任务时,还需要配置以下额外的参数:

    表 2. Metric2MetricParameters

    参数名称

    示例

    说明

    metricName

    my-metric

    如果您要修改您所分析的指标名,您可以输入修改后的指标名。更多信息,请参见时序数据(Metric)

    重要

    建议分析的对象为单个指标时,修改指标名,实现重命名。

    如果分析对象为多个指标,则修改指标名后,会将所有的指标名修改为同一个相同的指标名。

    hashLabels

    "{\"m\":\"h\", \"n\":\"i\"}"

    对应控制台界面SQL配置的Rehash。

    打开Rehash开关后,您可以配置哈希列,用于将同一Label值的数据写入到一个Shard中,增强数据局部性,提升查询效率。

    哈希列的取值取决于时序数据已有的Label信息。例如时序数据已有的Label信息为{"alert_id":"alert-1608815762-545495","alert_name":"告警恢复关闭","status":"inactive"},则哈希列的可选值为alert_idalert_namestatus。如果您配置哈希列status,则status字段值相同的数据将被写入到同一个Shard中。

    addLabels

    "{\"m\":\"h\", \"n\":\"i\"}"

    对应控制台界面SQL配置的附加Labels。

    添加静态标签,键值对形式,可用于标识指标的相关属性。

    例如配置label_keyapp,配置label_valueingress-nginx

    Metric2MetricParameters

返回参数

返回参数说明,请参见CreateScheduledSQL - 创建定时SQL任务

示例代码

本示例中,创建一个App.java文件,将源Logstore的定时分析结果存储到目标Logstore。示例如下:

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.*;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.CreateScheduledSQLRequest;

public class App {
    // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
    static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

    // 设置Project名称和Logstore名称。
    static String sourceProject="aliyun-test-sourceProject";
    static String destProject="aliyun-test-destProject";
    static String sourceLogstore = "logstore-name";
    static String destLogstore = "project-name";
    static String roleArn = "acs:ram::11111111:role/aliyunlogetlrole";
    // 设置日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写。
    static String endpoint = "http://cn-hangzhou.log.aliyuncs.com";
    static String destEndpoint = "http://cn-hangzhou-intranet.log.aliyuncs.com";
    static long fromTime = 1648105200; //2022-03-23 15:00:00
    private static String script = "* | select a,b,c from log";

    private static ScheduledSQLBaseParameters generateParams(String dataFormat) {
        if (dataFormat.equalsIgnoreCase("log2log")) {
            return null;
        } else if (dataFormat.equalsIgnoreCase("log2metric")) {
            Log2MetricParameters params = new Log2MetricParameters();
            params.setMetricKeys("[\"a\", \"b\", \"c\"]");
            params.setLabelKeys("[\"d\", \"e\", \"f\"]");
            params.setHashLabels("[\"d\", \"f\"]");
            params.setAddLabels("{\"m\":\"h\", \"n\":\"i\"}");
            params.setTimeKey("time");
            return params;
        } else if (dataFormat.equalsIgnoreCase("metric2metric")) {
            Metric2MetricParameters params = new Metric2MetricParameters();
            params.setMetricName("name");
            params.setHashLabels("[\"d\", \"f\"]");
            params.setAddLabels("{\"m\":\"h\", \"n\":\"i\"}");
            return params;
        }
        return null;
    }

    private static ScheduledSQLConfiguration generateConfig() {
        ScheduledSQLConfiguration scheduledSQLConfiguration = new ScheduledSQLConfiguration();
        scheduledSQLConfiguration.setScript(script);
        scheduledSQLConfiguration.setSqlType("searchQuery");
        scheduledSQLConfiguration.setResourcePool("enhanced");
        scheduledSQLConfiguration.setRoleArn(roleArn);
        scheduledSQLConfiguration.setDestRoleArn(roleArn);
        scheduledSQLConfiguration.setSourceLogstore(sourceLogstore);
        scheduledSQLConfiguration.setDestEndpoint(destEndpoint);
        scheduledSQLConfiguration.setDestProject(destProject);
        scheduledSQLConfiguration.setDestLogstore(destLogstore);
        scheduledSQLConfiguration.setDataFormat("log2log");
        scheduledSQLConfiguration.setFromTimeExpr("@m-1m");
        scheduledSQLConfiguration.setToTimeExpr("@m");
        scheduledSQLConfiguration.setMaxRetries(20);
        scheduledSQLConfiguration.setMaxRunTimeInSeconds(600);
        scheduledSQLConfiguration.setFromTime(fromTime);
        scheduledSQLConfiguration.setToTime(0L);

        ScheduledSQLBaseParameters params = generateParams(scheduledSQLConfiguration.getDataFormat());
        scheduledSQLConfiguration.setParameters(params);

        return scheduledSQLConfiguration;
    }

    private static ScheduledSQL generateScheduledSQL() {
        ScheduledSQL scheduledSQLStructure = new ScheduledSQL();
        scheduledSQLStructure.setName("job-name");
        scheduledSQLStructure.setDisplayName("display-name");
        scheduledSQLStructure.setDescription("desc-name");
        ScheduledSQLConfiguration scheduledSQLConfiguration = generateConfig();
        scheduledSQLStructure.setConfiguration(scheduledSQLConfiguration);
        JobSchedule jobSchedule = new JobSchedule();
        jobSchedule.setType(JobScheduleType.FIXED_RATE);
        jobSchedule.setInterval("1m");
        jobSchedule.setDelay(10);
        jobSchedule.setRunImmediately(false);
        scheduledSQLStructure.setSchedule(jobSchedule);
        return scheduledSQLStructure;
    }

    public static void main(String[] args) {
        Client client = new Client(endpoint, accessId, accessKey);
        ScheduledSQL scheduledSQL = generateScheduledSQL();
        CreateScheduledSQLRequest request = new CreateScheduledSQLRequest(sourceProject, scheduledSQL);
        try {
            client.createScheduledSQL(request);
        } catch (LogException e) {
            e.printStackTrace();
        }
    }
} 

相关文档