本文介绍通过Java SDK创建定时SQL任务的代码示例。
前提条件
已安装0.6.69及以上版本的Java SDK。更多信息,请参见安装Java SDK。背景信息
日志服务提供定时SQL功能,用于定时分析数据、存储聚合数据、投影与过滤数据。定时SQL支持标准SQL92语法、日志服务查询和分析语法,按照调度规则周期性执行,并将运行结果写入到目标库(Logstore或Metricstore)中。
日志服务控制台提供可视化的创建定时SQL任务。具体操作,请参见创建定时SQL任务。
- ScheduledSQL:创建一个定时SQL任务。
- JobSchedule:创建定时SQL的调度任务。
- ScheduledSQLConfiguration:创建定时SQL任务的基本配置。
示例代码
本示例中,创建一个App.java文件,将源Logstore的定时分析结果存储到目标Logstore。示例如下:
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.*;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.CreateScheduledSQLRequest;
public class App {
//阿里云访问密钥AccessKey。更多信息,请参见访问密钥。阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维。
static String accessId = "your_access_id";
static String accessKey = "your_access_key";
//Project名称和Logstore名称。
static String sourceProject="aliyun-test-sourceProject";
static String destProject="aliyun-test-destProject";
static String sourceLogstore = "logstore-name";
static String destLogstore = "project-name";
static String roleArn = "acs:ram::11111111:role/aliyunlogetlrole";
//日志服务的服务入口。更多信息,请参见服务入口。此处以杭州为例,其它地域请根据实际情况填写。
static String endpoint = "http://cn-hangzhou.log.aliyuncs.com";
static String destEndpoint = "http://cn-hangzhou-intranet.log.aliyuncs.com";
static long fromTime = 1648105200; //2022-03-23 15:00:00
private static String script = "* | select a,b,c from log";
private static ScheduledSQLBaseParameters generateParams(String dataFormat) {
if (dataFormat.equalsIgnoreCase("log2log")) {
return null;
} else if (dataFormat.equalsIgnoreCase("log2metric")) {
Log2MetricParameters params = new Log2MetricParameters();
params.setMetricKeys("[\"a\", \"b\", \"c\"]");
params.setLabelKeys("[\"d\", \"e\", \"f\"]");
params.setHashLabels("[\"d\", \"f\"]");
params.setAddLabels("[\"m\":\"h\", \"n\":\"i\"]");
params.setTimeKey("time");
return params;
} else if (dataFormat.equalsIgnoreCase("metric2metric")) {
Metric2MetricParameters params = new Metric2MetricParameters();
params.setMetricName("name");
params.setHashLabels("[\"d\", \"f\"]");
params.setAddLabels("[\"m\":\"h\", \"n\":\"i\"]");
return params;
}
return null;
}
private static ScheduledSQLConfiguration generateConfig() {
ScheduledSQLConfiguration scheduledSQLConfiguration = new ScheduledSQLConfiguration();
scheduledSQLConfiguration.setScript(script);
scheduledSQLConfiguration.setSqlType("searchQuery");
scheduledSQLConfiguration.setResourcePool("enhanced");
scheduledSQLConfiguration.setRoleArn(roleArn);
scheduledSQLConfiguration.setDestRoleArn(roleArn);
scheduledSQLConfiguration.setSourceLogstore(sourceLogstore);
scheduledSQLConfiguration.setDestEndpoint(destEndpoint);
scheduledSQLConfiguration.setDestProject(destProject);
scheduledSQLConfiguration.setDestLogstore(destLogstore);
scheduledSQLConfiguration.setDataFormat("log2log");
scheduledSQLConfiguration.setFromTimeExpr("@m-1m");
scheduledSQLConfiguration.setToTimeExpr("@m");
scheduledSQLConfiguration.setMaxRetries(20);
scheduledSQLConfiguration.setMaxRunTimeInSeconds(600);
scheduledSQLConfiguration.setFromTime(fromTime);
scheduledSQLConfiguration.setToTime(0L);
ScheduledSQLBaseParameters params = generateParams(scheduledSQLConfiguration.getDataFormat());
scheduledSQLConfiguration.setParameters(params);
return scheduledSQLConfiguration;
}
private static ScheduledSQL generateScheduledSQL() {
ScheduledSQL scheduledSQLStructure = new ScheduledSQL();
scheduledSQLStructure.setName("job-name");
scheduledSQLStructure.setDisplayName("display-name");
scheduledSQLStructure.setDescription("desc-name");
ScheduledSQLConfiguration scheduledSQLConfiguration = generateConfig();
scheduledSQLStructure.setConfiguration(scheduledSQLConfiguration);
JobSchedule jobSchedule = new JobSchedule();
jobSchedule.setType(JobScheduleType.FIXED_RATE);
jobSchedule.setInterval("1m");
jobSchedule.setDelay(10);
jobSchedule.setRunImmediately(false);
scheduledSQLStructure.setSchedule(jobSchedule);
return scheduledSQLStructure;
}
public static void main(String[] args) {
Client client = new Client(endpoint, accessId, accessKey);
ScheduledSQL scheduledSQL = generateScheduledSQL();
CreateScheduledSQLRequest request = new CreateScheduledSQLRequest(sourceProject, scheduledSQL);
try {
client.createScheduledSQL(request);
} catch (LogException e) {
e.printStackTrace();
}
}
}
更多信息,请参见Aliyun Log Java SDK。
ScheduledSQL
调用createScheduledSQL接口创建定时SQL任务。各个参数说明如下表所示。参数名称 示例 说明 name export-123-456 任务名称。在日志服务控制台,单击某定时SQL任务,即可查看该任务名。 displayName my-scheduled-sql-job 定时SQL任务的显示名称。在日志服务控制台,选择 ,可以查看定时SQL显示名称列表。description this is a scheduled sql job. 定时SQL任务描述。 configuration scheduledSQLConfiguration ScheduledSQL任务的配置任务信息。具体配置,请参见ScheduledSQLConfiguration。 schedule jobSchedule ScheduledSQL任务的调度任务。具体配置,请参见JobSchedule。 JobSchedule
调用JobSchedule jobSchedule = new JobSchedule();
创建定时SQL任务的调度任务。各个参数说明如下表所示。参数名称 示例 说明 type FixedRate 调度定时SQL任务的频率,每调度一次定时SQL任务产生一个执行实例。调度间隔决定每个执行实例的调度时间。 - FixedRate:按照固定间隔调度定时SQL任务。固定间隔由interval参数指定。
- Hourly:每小时调度一次定时SQL任务。
- Daily:在每天的某个固定时间点调度一次定时SQL任务。
- Cron:通过Cron表达式指定时间间隔,按照指定的时间间隔调度定时SQL任务。
interval 50m 当type取值为FixedRate时,配置固定间隔。 - 3s:间隔3秒。
- 5m:间隔5分钟。
- 2h:间隔2小时。
cronExpression 无 当type取值为Cron时,配置Cron表达式。 Cron表达式的最小精度为分钟,24小时制,例如
0 0/1 * * *
表示从00:00开始,每隔1小时运行一次。当您需要配置时区时,需选择Cron模式。常见的时区列表请参见时区列表。
delay 10s 调度时间点往后延迟执行的时间。取整范围:0~120,单位:秒。 当数据写入Logstore存在延迟等情况时,可通过延迟执行来保证数据的完整性。
ScheduledSQLConfiguration
调用ScheduledSQLConfiguration scheduledSQLConfiguration = generateConfig();
创建定时SQL任务的配置任务信息。各个参数说明如下表所示。参数名称 示例 说明 script *|select count(1) 输入的查询和分析语句。 sqlType searchQuery SQL类型,固定取值为searchQuery。 resourcePool enhanced 资源池类型,固定取值为enhanced。日志服务提供增强型资源池用于数据分析。 roleArn acs:ram::11111111:role/aliyunlogetlrole
读取源Logstore数据的RAM角色标识。如何获取ARN,请参见步骤一:授予RAM角色分析源Logstore的权限。 destRoleArn acs:ram::11111111:role/aliyunlogetlrole
写入目标Logstore数据的RAM角色标识。如何获取ARN,请参见如下说明: - 如果源Logstore和目标Logstore属于同一阿里云账号,请参见步骤二:授予RAM角色写目标Logstore的权限。
- 如果源Logstore和目标Logstore属于不同的阿里云账号,请参见步骤二:授予账号B下的RAM角色b写目标Logstore的权限。
sourceLogstore source-logstore 源Logstore名称。 destEndpoint http://cn-hangzhou-intranet.log.aliyuncs.com
待写入Logstore对应的服务入口。 更多信息,请参见服务入口。说明- 阿里云产品内部相互通信。例如同地域ECS访问日志服务,建议优先使用私网域名。其私网域名为
http://cn-hangzhou-intranet.log.aliyuncs.com
。 - 通过公网(互联网)访问日志服务。例如通过本地公网的API、SDK访问日志服务,建议优先使用公网域名。其公网域名为
http://cn-hangzhou.log.aliyuncs.com
。 - 相比私网域名的计费,公网域名收费增加了外网读流量。更多信息,请参见日志服务计费项。
destProject my-project 待写入数据的目标Project名称。 destLogstore my-logstore 待写入数据的目标Logstore名称。 dataFormat log2log 写入模式。 - log2log:日志库导入日志库,即表示源Logstore中的数据通过定时SQL处理后将存储到目标Logstore中。
- log2metric:日志库导入时序库,即表示源Logstore中的数据通过定时SQL处理后将存储到目标MetricStore中。
- metric2metric:时序库导入时序库,即表示源MetricStore中的数据通过定时SQL处理后将存储到目标MetricStore中。
fromTimeExpr @m - 12s SQL时间窗口开始表达式。更多信息,请参见时间表达式语法。 toTimeExpr @m SQL时间窗口结束表达式。更多信息,请参见时间表达式语法。 maxRetries 10 执行SQL分析操作失败时自动重试的阈值。当重试次数超过最大次数时,该执行实例结束,状态为失败。 maxRunTimeInSeconds 60 执行SQL分析操作失败时自动重试的阈值。当重试时间超过指定的最大时间时,该执行实例结束,状态为失败。 fromTime 1653965045 调度开始时间。 重要 实例的调度时间必须在该范围内,超出该范围时,定时SQL任务不再产生新实例。toTime 1653968045 调度结束时间,0表示不结束。 params 无 当dataFormat取值为log2metric或metric2metric时,配置SQL配置参数。具体配置项请参见Log2MetricParameters和Metric2MetricParameters。 - 创建从Logstore到MetricStore的定时SQL任务时,还需要配置以下额外的参数:
表 1. Log2MetricParameters 参数名称 示例 说明 metricKeys "[\"a\", \"b\", \"c\"]"
指标列,对应控制台界面SQL配置的指标列。 日志服务会根据您输入的查询和分析语句聚合数据,您可以选择查询和分析结果中列值为数值类型的一列或多列作为指标列。更多信息,请参见时序数据(Metric)。
labelKeys "[\"d\", \"e\", \"f\"]"
标签列,对应控制台界面SQL配置的Labels。 日志服务会根据您输入的查询和分析语句聚合数据,您可以选择查询和分析结果中的一列或多列作为Label数据。更多信息,请参见时序数据(Metric)。
hashLabels "[\"d\", \"f\"]"
对应控制台界面SQL配置的Rehash。 打开Rehash开关后,您可以配置哈希列,用于将同一列值的数据写入到一个Shard中,增强数据局部性,提升查询效率。
哈希列的取值取决于查询和分析结果。您可以选择查询和分析结果中的一列或多列作为哈希列。例如您配置哈希列为status,则status字段值相同的数据将被写入到同一个Shard中。
addLabels "[\"m\":\"h\", \"n\":\"i\"]"
对应控制台界面SQL配置的附加Labels。 添加静态标签,键值对形式,可用于标识指标的相关属性。
例如配置label_key为app,配置label_value为ingress-nginx。
timeKey time 对应控制台界面SQL配置的时间列。 - 如果您选择查询和分析结果中的时间列(列值为Unixtime时间戳,例如
atime:1627025331
),则系统将以该时间列作为时序数据的时间。 - 如果您选择空,则系统将以查询和分析时间范围中的开始时间作为时序数据的时间。
- 如果您选择查询和分析结果中的时间列(列值为Unixtime时间戳,例如
- 创建从MetricStore到MetricStore的定时SQL任务时,还需要配置以下额外的参数:
表 2. Metric2MetricParameters 参数名称 示例 说明 metricName my-metric 如果您要修改您所分析的指标名,您可以输入修改后的指标名。更多信息,请参见时序数据(Metric)。 重要 建议分析的对象为单个指标时,修改指标名,实现重命名。如果分析对象为多个指标,则修改指标名后,会将所有的指标名修改为同一个相同的指标名。
hashLabels "[\"m\":\"h\", \"n\":\"i\"]"
对应控制台界面SQL配置的Rehash。 打开Rehash开关后,您可以配置哈希列,用于将同一Label值的数据写入到一个Shard中,增强数据局部性,提升查询效率。
哈希列的取值取决于时序数据已有的Label信息。例如时序数据已有的Label信息为
{"alert_id":"alert-1608815762-545495","alert_name":"告警恢复关闭","status":"inactive"}
,则哈希列的可选值为alert_id、alert_name、status。如果您配置哈希列为status,则status字段值相同的数据将被写入到同一个Shard中。addLabels "[\"m\":\"h\", \"n\":\"i\"]"
对应控制台界面SQL配置的附加Labels。 添加静态标签,键值对形式,可用于标识指标的相关属性。
例如配置label_key为app,配置label_value为ingress-nginx。