管理定时SQL任务实例

本文介绍如何使用阿里云日志服务(SLS)Java SDK 对定时 SQL 的任务实例进行管理。包括:

  • 查看运行状态:查询特定时间段内任务的执行成功、失败或运行中状态。

  • 失败重试:对执行失败的实例,或历史数据需要重新计算的实例进行重跑。

  • 停止执行:对处于“运行中”且卡死或耗时过长的实例强制停止。

前提条件

代码示例

本示例展示如何通过 SDK 操作定时 SQL 任务实例,包括:查询实例列表、获取实例详情、重试实例、停止实例。

package demo;

import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.*;
import com.aliyun.openservices.log.response.*;

import java.util.concurrent.TimeUnit;


/**
 * Walks through managing the instances of a scheduled SQL job with the SLS Java SDK:
 * list the instances in a time window, fetch one instance, stop it if it is running,
 * and rerun it if it has already finished.
 *
 * <p>All settings below are placeholders and must be replaced before running.
 */
public class Demo {
    // Service endpoint; replace with the endpoint of the region that hosts the Project.
    private static final String endpoint = "cn-hangzhou.log.aliyuncs.com";
    // AccessKey pair. These are placeholders; avoid committing real credentials to source control.
    private static final String accessKeyId = "your_access_key_id";
    private static final String accessKeySecret = "your_access_key_secret";
    // Target Project and scheduled SQL job name.
    private static final String project = "your_project_name";
    private static final String jobName = "your_scheduled_sql_job_name";
    private static final Client client = new Client(endpoint, accessKeyId, accessKeySecret);

    // Instance state values used by this demo.
    private static final String STATE_RUNNING = "RUNNING";
    private static final String STATE_STOPPED = "STOPPED";
    private static final String STATE_SUCCEEDED = "SUCCEEDED";
    private static final String STATE_FAILED = "FAILED";

    // Placeholder id; overwritten by testListJobInstance() with the first listed instance.
    private static String instanceId = "11111";
    // Query window for listing instances; the two bounds are 86400 apart.
    // NOTE(review): presumably Unix timestamps in seconds - confirm against the SDK documentation.
    private static final long fromTime = 1764216000;
    private static final long toTime = 1764302400;

    /**
     * Fetches the instance identified by {@code instanceId} and prints it as JSON.
     *
     * @throws LogException if the SDK call fails
     */
    private static void testGetJobInstance() throws LogException {
        GetJobInstanceResponse getJobInstanceResponse = getJobInstance();
        System.out.println("getJobInstance: " + JSONObject.toJSONString(getJobInstanceResponse));
    }

    /**
     * Lists the job instances in [{@code fromTime}, {@code toTime}], remembers the id of the
     * first one in {@code instanceId}, and prints the full response as JSON.
     *
     * @throws LogException if the SDK call fails or no instance is returned for the window
     * @throws InterruptedException if the thread is interrupted during the initial wait
     */
    private static void testListJobInstance() throws LogException, InterruptedException {
        System.out.println("Wait for start jobInstance...");
        // Give the job time to produce at least one instance before listing.
        TimeUnit.MINUTES.sleep(5);
        ListJobInstancesResponse listJobInstancesResponse =
                client.listJobInstances(new ListJobInstancesRequest(project, jobName, fromTime, toTime));
        if (listJobInstancesResponse.getResults().isEmpty()) {
            throw new LogException("NoJobInstance", "JobInstances have not start, please wait.", "");
        }
        instanceId = listJobInstancesResponse.getResults().get(0).getInstanceId();
        System.out.println("list JobInstances: " + JSONObject.toJSONString(listJobInstancesResponse));
    }

    /**
     * Reruns the current instance by setting its state back to RUNNING.
     * Only instances that are SUCCEEDED or FAILED are rerun; any other state is left untouched.
     *
     * @throws LogException if an SDK call fails
     */
    private static void testRerunJobInstance() throws LogException {
        System.out.println("Rerun jobInstance ready to start.......");
        String state = getJobInstance().getJobInstance().getState();
        if (STATE_SUCCEEDED.equals(state) || STATE_FAILED.equals(state)) {
            client.modifyJobInstanceState(
                    new ModifyJobInstanceStateRequest(project, jobName, instanceId, STATE_RUNNING));
        }
    }

    /**
     * Stops the current instance by setting its state to STOPPED.
     * The request is only sent when the instance is RUNNING, mirroring the state guard used
     * by the rerun step; otherwise the step is skipped and the current state is printed.
     *
     * @throws LogException if an SDK call fails
     */
    private static void testStopJobInstance() throws LogException {
        System.out.println("Stop jobInstance ready to start.......");
        String state = getJobInstance().getJobInstance().getState();
        if (STATE_RUNNING.equals(state)) {
            client.modifyJobInstanceState(
                    new ModifyJobInstanceStateRequest(project, jobName, instanceId, STATE_STOPPED));
        } else {
            System.out.println("Skip stop, jobInstance " + instanceId + " is in state: " + state);
        }
    }

    /**
     * Fetches the instance identified by {@code instanceId}.
     *
     * @return the SDK response for the instance
     * @throws LogException if the SDK call fails
     */
    private static GetJobInstanceResponse getJobInstance() throws LogException {
        System.out.println("Get JobInstance ready to start.....");
        return client.getJobInstance(new GetJobInstanceRequest(project, jobName, instanceId));
    }

    /**
     * Runs the demo steps in order: list, get, stop, rerun.
     *
     * @param args unused
     * @throws InterruptedException if the wait in the list step is interrupted
     * @throws LogException if any SDK call fails
     */
    public static void main(String[] args) throws InterruptedException, LogException {
        testListJobInstance();
        testGetJobInstance();
        testStopJobInstance();
        testRerunJobInstance();
    }
}