文档

使用Java SDK创建定时SQL任务

更新时间:

本文介绍通过Java SDK创建定时SQL任务的代码示例。

前提条件

  • 已创建RAM用户并完成授权。具体操作,请参见创建RAM用户并完成授权

  • 已配置环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见在Linux、macOS和Windows系统配置环境变量

    重要
    • 阿里云账号的AccessKey拥有所有API的访问权限,建议您使用RAM用户的AccessKey进行API访问或日常运维。

    • 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。

  • 已安装0.6.69及以上版本的Java SDK。更多信息,请参见安装Java SDK

背景信息

日志服务提供定时SQL功能,用于定时分析数据、存储聚合数据、投影与过滤数据。定时SQL支持标准SQL92语法、日志服务查询和分析语法,按照调度规则周期性执行,并将运行结果写入到目标库(Logstore或Metricstore)中。

日志服务控制台提供可视化的创建定时SQL任务。具体操作,请参见创建定时SQL任务

除此之外,日志服务提供ScheduledSQL、JobSchedule和ScheduledSQLConfiguration类,帮助您更简单的通过Java SDK创建定时SQL任务。

  • ScheduledSQL:创建一个定时SQL任务。

  • JobSchedule:创建定时SQL的调度任务。

  • ScheduledSQLConfiguration:创建定时SQL任务的基本配置。

示例代码

本示例中,创建一个App.java文件,将源Logstore的定时分析结果存储到目标Logstore。示例如下:

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.*;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.CreateScheduledSQLRequest;

public class App {
    // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
    static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

    // 设置Project名称和Logstore名称。
    static String sourceProject="aliyun-test-sourceProject";
    static String destProject="aliyun-test-destProject";
    static String sourceLogstore = "logstore-name";
    static String destLogstore = "project-name";
    static String roleArn = "acs:ram::11111111:role/aliyunlogetlrole";
    // 设置日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写。
    static String endpoint = "http://cn-hangzhou.log.aliyuncs.com";
    static String destEndpoint = "http://cn-hangzhou-intranet.log.aliyuncs.com";
    static long fromTime = 1648105200; //2022-03-23 15:00:00
    private static String script = "* | select a,b,c from log";

    private static ScheduledSQLBaseParameters generateParams(String dataFormat) {
        if (dataFormat.equalsIgnoreCase("log2log")) {
            return null;
        } else if (dataFormat.equalsIgnoreCase("log2metric")) {
            Log2MetricParameters params = new Log2MetricParameters();
            params.setMetricKeys("[\"a\", \"b\", \"c\"]");
            params.setLabelKeys("[\"d\", \"e\", \"f\"]");
            params.setHashLabels("[\"d\", \"f\"]");
            params.setAddLabels("[\"m\":\"h\", \"n\":\"i\"]");
            params.setTimeKey("time");
            return params;
        } else if (dataFormat.equalsIgnoreCase("metric2metric")) {
            Metric2MetricParameters params = new Metric2MetricParameters();
            params.setMetricName("name");
            params.setHashLabels("[\"d\", \"f\"]");
            params.setAddLabels("[\"m\":\"h\", \"n\":\"i\"]");
            return params;
        }
        return null;
    }

    private static ScheduledSQLConfiguration generateConfig() {
        ScheduledSQLConfiguration scheduledSQLConfiguration = new ScheduledSQLConfiguration();
        scheduledSQLConfiguration.setScript(script);
        scheduledSQLConfiguration.setSqlType("searchQuery");
        scheduledSQLConfiguration.setResourcePool("enhanced");
        scheduledSQLConfiguration.setRoleArn(roleArn);
        scheduledSQLConfiguration.setDestRoleArn(roleArn);
        scheduledSQLConfiguration.setSourceLogstore(sourceLogstore);
        scheduledSQLConfiguration.setDestEndpoint(destEndpoint);
        scheduledSQLConfiguration.setDestProject(destProject);
        scheduledSQLConfiguration.setDestLogstore(destLogstore);
        scheduledSQLConfiguration.setDataFormat("log2log");
        scheduledSQLConfiguration.setFromTimeExpr("@m-1m");
        scheduledSQLConfiguration.setToTimeExpr("@m");
        scheduledSQLConfiguration.setMaxRetries(20);
        scheduledSQLConfiguration.setMaxRunTimeInSeconds(600);
        scheduledSQLConfiguration.setFromTime(fromTime);
        scheduledSQLConfiguration.setToTime(0L);

        ScheduledSQLBaseParameters params = generateParams(scheduledSQLConfiguration.getDataFormat());
        scheduledSQLConfiguration.setParameters(params);

        return scheduledSQLConfiguration;
    }

    private static ScheduledSQL generateScheduledSQL() {
        ScheduledSQL scheduledSQLStructure = new ScheduledSQL();
        scheduledSQLStructure.setName("job-name");
        scheduledSQLStructure.setDisplayName("display-name");
        scheduledSQLStructure.setDescription("desc-name");
        ScheduledSQLConfiguration scheduledSQLConfiguration = generateConfig();
        scheduledSQLStructure.setConfiguration(scheduledSQLConfiguration);
        JobSchedule jobSchedule = new JobSchedule();
        jobSchedule.setType(JobScheduleType.FIXED_RATE);
        jobSchedule.setInterval("1m");
        jobSchedule.setDelay(10);
        jobSchedule.setRunImmediately(false);
        scheduledSQLStructure.setSchedule(jobSchedule);
        return scheduledSQLStructure;
    }

    public static void main(String[] args) {
        Client client = new Client(endpoint, accessId, accessKey);
        ScheduledSQL scheduledSQL = generateScheduledSQL();
        CreateScheduledSQLRequest request = new CreateScheduledSQLRequest(sourceProject, scheduledSQL);
        try {
            client.createScheduledSQL(request);
        } catch (LogException e) {
            e.printStackTrace();
        }
    }
} 

更多信息,请参见Aliyun Log Java SDK

  • ScheduledSQL

    调用createScheduledSQL接口创建定时SQL任务。各个参数说明如下表所示。

    参数名称

    示例

    说明

    name

    export-123-456

    任务名称。在日志服务控制台,单击某定时SQL任务,即可查看该任务名。

    displayName

    my-scheduled-sql-job

    定时SQL任务的显示名称。在日志服务控制台,选择任务管理 > 定时SQL,可以查看定时SQL显示名称列表。

    description

    this is a scheduled sql job.

    定时SQL任务描述。

    configuration

    scheduledSQLConfiguration

    ScheduledSQL任务的配置任务信息。具体配置,请参见ScheduledSQLConfiguration

    schedule

    jobSchedule

    ScheduledSQL任务的调度任务。具体配置,请参见JobSchedule

  • JobSchedule

    调用JobSchedule jobSchedule = new JobSchedule();创建定时SQL任务的调度任务。各个参数说明如下表所示。

    参数名称

    示例

    说明

    type

    FixedRate

    调度定时SQL任务的频率,每调度一次定时SQL任务产生一个执行实例。调度间隔决定每个执行实例的调度时间。

    • FixedRate:按照固定间隔调度定时SQL任务。固定间隔由interval参数指定。

    • Hourly:每小时调度一次定时SQL任务。

    • Daily:在每天的某个固定时间点调度一次定时SQL任务。

    • Cron:通过Cron表达式指定时间间隔,按照指定的时间间隔调度定时SQL任务。

    interval

    50m

    type取值为FixedRate时,配置固定间隔。

    • 3s:间隔3秒。

    • 5m:间隔5分钟。

    • 2h:间隔2小时。

    cronExpression

    type取值为Cron时,配置Cron表达式。

    Cron表达式的最小精度为分钟,24小时制,例如0 0/1 * * *表示从00:00开始,每隔1小时运行一次。

    当您需要配置时区时,需选择Cron模式。常见的时区列表请参见时区列表

    delay

    10s

    调度时间点往后延迟执行的时间。取整范围:0~120,单位:秒。

    当数据写入Logstore存在延迟等情况时,可通过延迟执行来保证数据的完整性。

  • ScheduledSQLConfiguration

    调用ScheduledSQLConfiguration scheduledSQLConfiguration = generateConfig();创建定时SQL任务的配置任务信息。各个参数说明如下表所示。

    参数名称

    示例

    说明

    script

    *|select count(1)

    输入的查询和分析语句。

    sqlType

    searchQuery

    SQL类型,固定取值为searchQuery

    resourcePool

    enhanced

    资源池类型,固定取值为enhanced。日志服务提供增强型资源池用于数据分析。

    roleArn

    acs:ram::11111111:role/aliyunlogetlrole

    读取源Logstore数据的RAM角色标识。如何获取ARN,请参见步骤一:授予RAM角色分析源Logstore的权限

    destRoleArn

    acs:ram::11111111:role/aliyunlogetlrole

    写入目标Logstore数据的RAM角色标识。如何获取ARN,请参见如下说明:

    sourceLogstore

    source-logstore

    源Logstore名称。

    destEndpoint

    http://cn-hangzhou-intranet.log.aliyuncs.com

    待写入Logstore对应的服务入口。

    说明
    • 阿里云产品内部相互通信。例如同地域ECS访问日志服务,建议优先使用私网域名。其私网域名为http://cn-hangzhou-intranet.log.aliyuncs.com

    • 通过公网(互联网)访问日志服务。例如通过本地公网的API、SDK访问日志服务,建议优先使用公网域名。其公网域名为http://cn-hangzhou.log.aliyuncs.com

    • 相比私网域名的计费,公网域名收费增加了外网读流量。更多信息,请参见日志服务计费项

    更多信息,请参见服务入口

    destProject

    my-project

    待写入数据的目标Project名称。

    destLogstore

    my-logstore

    待写入数据的目标Logstore名称。

    dataFormat

    log2log

    写入模式。

    • log2log:日志库导入日志库,即表示源Logstore中的数据通过定时SQL处理后将存储到目标Logstore中。

    • log2metric:日志库导入时序库,即表示源Logstore中的数据通过定时SQL处理后将存储到目标MetricStore中。

    • metric2metric:时序库导入时序库,即表示源MetricStore中的数据通过定时SQL处理后将存储到目标MetricStore中。

    fromTimeExpr

    @m - 12s

    SQL时间窗口开始表达式。更多信息,请参见时间表达式语法

    toTimeExpr

    @m

    SQL时间窗口结束表达式。更多信息,请参见时间表达式语法

    maxRetries

    10

    执行SQL分析操作失败时自动重试的阈值。当重试次数超过最大次数时,该执行实例结束,状态为失败。

    maxRunTimeInSeconds

    60

    执行SQL分析操作失败时自动重试的阈值。当重试时间超过指定的最大时间时,该执行实例结束,状态为失败。

    fromTime

    1653965045

    调度开始时间。

    重要

    实例的调度时间必须在该范围内,超出该范围时,定时SQL任务不再产生新实例。

    toTime

    1653968045

    调度结束时间,0表示不结束。

    params

    dataFormat取值为log2metricmetric2metric时,配置SQL配置参数。具体配置项请参见Log2MetricParametersMetric2MetricParameters

    • 创建从Logstore到MetricStore的定时SQL任务时,还需要配置以下额外的参数:

      表 1. Log2MetricParameters

      参数名称

      示例

      说明

      metricKeys

      "[\"a\", \"b\", \"c\"]"

      指标列,对应控制台界面SQL配置的指标列。

      日志服务会根据您输入的查询和分析语句聚合数据,您可以选择查询和分析结果中列值为数值类型的一列或多列作为指标列。更多信息,请参见时序数据(Metric)

      labelKeys

      "[\"d\", \"e\", \"f\"]"

      标签列,对应控制台界面SQL配置的Labels。

      日志服务会根据您输入的查询和分析语句聚合数据,您可以选择查询和分析结果中的一列或多列作为Label数据。更多信息,请参见时序数据(Metric)

      hashLabels

      "[\"d\", \"f\"]"

      对应控制台界面SQL配置的Rehash。

      打开Rehash开关后,您可以配置哈希列,用于将同一列值的数据写入到一个Shard中,增强数据局部性,提升查询效率。

      哈希列的取值取决于查询和分析结果。您可以选择查询和分析结果中的一列或多列作为哈希列。例如您配置哈希列status,则status字段值相同的数据将被写入到同一个Shard中。

      addLabels

      "[\"m\":\"h\", \"n\":\"i\"]"

      对应控制台界面SQL配置的附加Labels。

      添加静态标签,键值对形式,可用于标识指标的相关属性。

      例如配置label_keyapp,配置label_valueingress-nginx

      timeKey

      time

      对应控制台界面SQL配置的时间列。

      • 如果您选择查询和分析结果中的时间列(列值为Unixtime时间戳,例如atime:1627025331),则系统将以该时间列作为时序数据的时间。

      • 如果您选择,则系统将以查询和分析时间范围中的开始时间作为时序数据的时间。

      Log2MetricParameters

    • 创建从MetricStore到MetricStore的定时SQL任务时,还需要配置以下额外的参数:

      表 2. Metric2MetricParameters

      参数名称

      示例

      说明

      metricName

      my-metric

      如果您要修改您所分析的指标名,您可以输入修改后的指标名。更多信息,请参见时序数据(Metric)

      重要

      建议分析的对象为单个指标时,修改指标名,实现重命名。

      如果分析对象为多个指标,则修改指标名后,会将所有的指标名修改为同一个相同的指标名。

      hashLabels

      "[\"m\":\"h\", \"n\":\"i\"]"

      对应控制台界面SQL配置的Rehash。

      打开Rehash开关后,您可以配置哈希列,用于将同一Label值的数据写入到一个Shard中,增强数据局部性,提升查询效率。

      哈希列的取值取决于时序数据已有的Label信息。例如时序数据已有的Label信息为{"alert_id":"alert-1608815762-545495","alert_name":"告警恢复关闭","status":"inactive"},则哈希列的可选值为alert_idalert_namestatus。如果您配置哈希列status,则status字段值相同的数据将被写入到同一个Shard中。

      addLabels

      "[\"m\":\"h\", \"n\":\"i\"]"

      对应控制台界面SQL配置的附加Labels。

      添加静态标签,键值对形式,可用于标识指标的相关属性。

      例如配置label_keyapp,配置label_valueingress-nginx

      Metric2MetricParameters

  • 本页导读 (1)