调用CreateScheduledSQL接口创建定时SQL任务。
前提条件
您已完成以下操作:
背景信息
日志服务提供定时SQL功能,用于定时分析数据、存储聚合数据、投影与过滤数据。定时SQL支持标准SQL92语法、日志服务查询和分析语法,按照调度规则周期性执行,并将运行结果写入到目标库(Logstore或Metricstore)中。
日志服务控制台提供可视化的创建定时SQL任务。具体操作,请参见创建定时SQL任务。
除此之外,日志服务提供ScheduledSQL、JobSchedule和ScheduledSQLConfiguration类,帮助您更简单的通过Java SDK创建定时SQL任务。
ScheduledSQL:创建一个定时SQL任务。
JobSchedule:创建定时SQL的调度任务。
ScheduledSQLConfiguration:创建定时SQL任务的基本配置。
参数说明
请求参数
名称 | 类型 | 是否必填 | 描述 | 示例值 |
project | String | 是 | Project名称。 | ali-test-project |
scheduledSql | Object | 是 | 定时 SQL 任务配置 | - |
ScheduledSQL
参数说明如下表所示。
参数名称 | 类型 | 是否必填 | 说明 | 示例 |
name | String | 是 | 定时SQL任务名称。其命名规则如下:
| export-123-456 |
displayName | String | 是 | 定时SQL任务的显示名称。在日志服务控制台,选择 ,可以查看定时SQL显示名称列表。 | my-scheduled-sql-job |
description | String | 否 | 定时SQL任务描述。 | this is a scheduled sql job. |
configuration | Object | 是 | 定时SQL配置信息。 | - |
schedule | Object | 是 | 任务调度配置。 | - |
JobSchedule
调用JobSchedule jobSchedule = new JobSchedule();
创建定时SQL任务的调度任务。各个参数说明如下表所示。
参数名称 | 类型 | 是否必填 | 说明 | 示例 |
type | String | 是 | 调度定时SQL任务的频率,每调度一次定时SQL任务产生一个执行实例。调度间隔决定每个执行实例的调度时间。
| FixedRate |
interval | String | 否 | 当type取值为FixedRate时,配置固定间隔。
| 50m |
cronExpression | String | 否 | 当type取值为Cron时,配置Cron表达式。 Cron表达式的最小精度为分钟,24小时制,例如 当您需要配置时区时,需选择Cron模式。常见的时区列表请参见时区列表。 | 无 |
runImmediately | boolean | 否 | 定时任务是否立即执行。 | False |
timeZone | String | 否 | Cron 表达式所在时区,默认为空,表示东八区。 | +0800 |
delay | int | 否 | 调度时间点往后延迟执行的时间。取整范围:0~120,单位:秒。 当数据写入Logstore存在延迟等情况时,可通过延迟执行来保证数据的完整性。 | 10 |
ScheduledSQLConfiguration
调用ScheduledSQLConfiguration scheduledSQLConfiguration = generateConfig();
创建定时SQL任务的配置信息任务。各个参数说明如下表所示。
参数名称 | 类型 | 是否必填 | 描述 | 示例 |
script | String | 是 | 输入的查询和分析语句。 | *|select count(1) |
sqlType | String | 是 | SQL类型,固定取值为searchQuery。 | searchQuery |
resourcePool | String | 是 | 资源池类型,固定取值为enhanced。日志服务提供增强型资源池用于数据分析。 | enhanced |
roleArn | String | 是 | 读取源Logstore数据的RAM角色标识。如何获取ARN,请参见授予自定义RAM角色分析源Logstore的权限。 |
|
destRoleArn | String | 是 | 写入目标Logstore数据的RAM角色标识。如何获取ARN,请参见如下说明:
|
|
sourceLogstore | String | 是 | 源Logstore名称。 | source-logstore |
destEndpoint | String | 是 | 待写入Logstore对应的服务入口。 说明
更多信息,请参见服务入口。 |
|
destProject | String | 是 | 待写入数据的目标Project名称。 | my-project |
destLogstore | String | 是 | 待写入数据的目标Logstore名称。 | my-logstore |
dataFormat | String | 是 | 写入模式。
| log2log |
fromTimeExpr | String | 是 | SQL时间窗口开始表达式。更多信息,请参见时间表达式语法。 | @m - 12s |
toTimeExpr | String | 是 | SQL时间窗口结束表达式。更多信息,请参见时间表达式语法。 | @m |
maxRetries | Long | 是 | 执行SQL分析操作失败时自动重试的阈值。当重试次数超过最大次数时,该执行实例结束,状态为失败。 | 10 |
maxRunTimeInSeconds | Long | 是 | 执行SQL分析操作失败时自动重试的阈值。当重试时间超过指定的最大时间时,该执行实例结束,状态为失败。 | 60 |
fromTime | Long | 是 | 调度开始时间。 重要 实例的调度时间必须在该范围内,超出该范围时,定时SQL任务不再产生新实例。 | 1653965045 |
toTime | Long | 是 | 调度结束时间,0表示不结束。 | 1653968045 |
parameters | Object | 是 | 当dataFormat取值为log2metric或metric2metric时,配置SQL配置参数。具体配置项请参见 |
|
parameters
配置从Logstore到MetricStore的定时SQL任务时,还需要配置以下额外的参数:
表 1. Log2MetricParameters
参数名称
示例
说明
metricKeys
"[\"a\", \"b\", \"c\"]"
指标列,对应控制台界面SQL配置的指标列。
日志服务会根据您输入的查询和分析语句聚合数据,您可以选择查询和分析结果中列值为数值类型的一列或多列作为指标列。更多信息,请参见时序数据(Metric)。
labelKeys
"[\"d\", \"e\", \"f\"]"
标签列,对应控制台界面SQL配置的Labels。
日志服务会根据您输入的查询和分析语句聚合数据,您可以选择查询和分析结果中的一列或多列作为Label数据。更多信息,请参见时序数据(Metric)。
hashLabels
"[\"d\", \"f\"]"
对应控制台界面SQL配置的Rehash。
打开Rehash开关后,您可以配置哈希列,用于将同一列值的数据写入到一个Shard中,增强数据局部性,提升查询效率。
哈希列的取值取决于查询和分析结果。您可以选择查询和分析结果中的一列或多列作为哈希列。例如您配置哈希列为status,则status字段值相同的数据将被写入到同一个Shard中。
addLabels
"[\"m\":\"h\", \"n\":\"i\"]"
对应控制台界面SQL配置的附加Labels。
添加静态标签,键值对形式,可用于标识指标的相关属性。
例如配置label_key为app,配置label_value为ingress-nginx。
timeKey
time
对应控制台界面SQL配置的时间列。
如果您选择查询和分析结果中的时间列(列值为Unixtime时间戳,例如
atime:1627025331
),则系统将以该时间列作为时序数据的时间。如果您选择空,则系统将以查询和分析时间范围中的开始时间作为时序数据的时间。
配置从MetricStore到MetricStore的定时SQL任务时,还需要配置以下额外的参数:
表 2. Metric2MetricParameters
参数名称
示例
说明
metricName
my-metric
如果您要修改您所分析的指标名,您可以输入修改后的指标名。更多信息,请参见时序数据(Metric)。
重要建议分析的对象为单个指标时,修改指标名,实现重命名。
如果分析对象为多个指标,则修改指标名后,会将所有的指标名修改为同一个相同的指标名。
hashLabels
"{\"m\":\"h\", \"n\":\"i\"}"
对应控制台界面SQL配置的Rehash。
打开Rehash开关后,您可以配置哈希列,用于将同一Label值的数据写入到一个Shard中,增强数据局部性,提升查询效率。
哈希列的取值取决于时序数据已有的Label信息。例如时序数据已有的Label信息为
{"alert_id":"alert-1608815762-545495","alert_name":"告警恢复关闭","status":"inactive"}
,则哈希列的可选值为alert_id、alert_name、status。如果您配置哈希列为status,则status字段值相同的数据将被写入到同一个Shard中。addLabels
"{\"m\":\"h\", \"n\":\"i\"}"
对应控制台界面SQL配置的附加Labels。
添加静态标签,键值对形式,可用于标识指标的相关属性。
例如配置label_key为app,配置label_value为ingress-nginx。
返回参数
返回参数说明,请参见CreateScheduledSQL - 创建定时SQL任务。
示例代码
本示例中,创建一个App.java文件,将源Logstore的定时分析结果存储到目标Logstore。示例如下:
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.*;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.CreateScheduledSQLRequest;
public class App {
// 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// 设置Project名称和Logstore名称。
static String sourceProject="aliyun-test-sourceProject";
static String destProject="aliyun-test-destProject";
static String sourceLogstore = "logstore-name";
static String destLogstore = "project-name";
static String roleArn = "acs:ram::11111111:role/aliyunlogetlrole";
// 设置日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写。
static String endpoint = "http://cn-hangzhou.log.aliyuncs.com";
static String destEndpoint = "http://cn-hangzhou-intranet.log.aliyuncs.com";
static long fromTime = 1648105200; //2022-03-23 15:00:00
private static String script = "* | select a,b,c from log";
private static ScheduledSQLBaseParameters generateParams(String dataFormat) {
if (dataFormat.equalsIgnoreCase("log2log")) {
return null;
} else if (dataFormat.equalsIgnoreCase("log2metric")) {
Log2MetricParameters params = new Log2MetricParameters();
params.setMetricKeys("[\"a\", \"b\", \"c\"]");
params.setLabelKeys("[\"d\", \"e\", \"f\"]");
params.setHashLabels("[\"d\", \"f\"]");
params.setAddLabels("{\"m\":\"h\", \"n\":\"i\"}");
params.setTimeKey("time");
return params;
} else if (dataFormat.equalsIgnoreCase("metric2metric")) {
Metric2MetricParameters params = new Metric2MetricParameters();
params.setMetricName("name");
params.setHashLabels("[\"d\", \"f\"]");
params.setAddLabels("{\"m\":\"h\", \"n\":\"i\"}");
return params;
}
return null;
}
private static ScheduledSQLConfiguration generateConfig() {
ScheduledSQLConfiguration scheduledSQLConfiguration = new ScheduledSQLConfiguration();
scheduledSQLConfiguration.setScript(script);
scheduledSQLConfiguration.setSqlType("searchQuery");
scheduledSQLConfiguration.setResourcePool("enhanced");
scheduledSQLConfiguration.setRoleArn(roleArn);
scheduledSQLConfiguration.setDestRoleArn(roleArn);
scheduledSQLConfiguration.setSourceLogstore(sourceLogstore);
scheduledSQLConfiguration.setDestEndpoint(destEndpoint);
scheduledSQLConfiguration.setDestProject(destProject);
scheduledSQLConfiguration.setDestLogstore(destLogstore);
scheduledSQLConfiguration.setDataFormat("log2log");
scheduledSQLConfiguration.setFromTimeExpr("@m-1m");
scheduledSQLConfiguration.setToTimeExpr("@m");
scheduledSQLConfiguration.setMaxRetries(20);
scheduledSQLConfiguration.setMaxRunTimeInSeconds(600);
scheduledSQLConfiguration.setFromTime(fromTime);
scheduledSQLConfiguration.setToTime(0L);
ScheduledSQLBaseParameters params = generateParams(scheduledSQLConfiguration.getDataFormat());
scheduledSQLConfiguration.setParameters(params);
return scheduledSQLConfiguration;
}
private static ScheduledSQL generateScheduledSQL() {
ScheduledSQL scheduledSQLStructure = new ScheduledSQL();
scheduledSQLStructure.setName("job-name");
scheduledSQLStructure.setDisplayName("display-name");
scheduledSQLStructure.setDescription("desc-name");
ScheduledSQLConfiguration scheduledSQLConfiguration = generateConfig();
scheduledSQLStructure.setConfiguration(scheduledSQLConfiguration);
JobSchedule jobSchedule = new JobSchedule();
jobSchedule.setType(JobScheduleType.FIXED_RATE);
jobSchedule.setInterval("1m");
jobSchedule.setDelay(10);
jobSchedule.setRunImmediately(false);
scheduledSQLStructure.setSchedule(jobSchedule);
return scheduledSQLStructure;
}
public static void main(String[] args) {
Client client = new Client(endpoint, accessId, accessKey);
ScheduledSQL scheduledSQL = generateScheduledSQL();
CreateScheduledSQLRequest request = new CreateScheduledSQLRequest(sourceProject, scheduledSQL);
try {
client.createScheduledSQL(request);
} catch (LogException e) {
e.printStackTrace();
}
}
}