本文介绍如何使用阿里云日志服务(SLS)Java SDK 对定时 SQL 的任务实例进行管理。包括:
查看运行状态:查询特定时间段内任务实例的执行状态(成功、失败或运行中)。
失败重试:对执行失败的实例,或历史数据需要重新计算的实例进行重跑。
停止执行:对处于“运行中”且卡死或耗时过长的实例强制停止。
前提条件
代码示例
本示例展示如何通过 SDK 操作定时 SQL 任务实例,包括:查询实例列表、获取实例详情、重试实例、停止实例。
package demo;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.*;
import com.aliyun.openservices.log.response.*;
import java.util.concurrent.TimeUnit;
public class Demo {
// 配置服务入口 Endpoint,请替换为实际 Project 所在的 Endpoint
private static final String endpoint = "cn-hangzhou.log.aliyuncs.com";
// 配置 AccessKey 信息
private static final String accessKeyId = "your_access_key_id";
private static final String accessKeySecret = "your_access_key_secret";
// 配置目标 Project 和 定时SQL 任务名称
private static final String project = "your_project_name";
private static final String jobName = "your_scheduled_sql_job_name";
private static final Client client = new Client(endpoint, accessKeyId, accessKeySecret);
private static String instanceId = "11111";
private static final long fromTime = 1764216000;
private static final long toTime = 1764302400;
private static void testGetJobInstance() throws LogException {
// JobInstances
GetJobInstanceResponse getJobInstanceResponse = getJobInstance();
System.out.println("getJobInstance: " + JSONObject.toJSONString(getJobInstanceResponse));
}
private static void testListJobInstance() throws LogException, InterruptedException {
// List jobInstance
System.out.println("Wait for start jobInstance...");
TimeUnit.MINUTES.sleep(5);
ListJobInstancesResponse listJobInstancesResponse = client.listJobInstances(new ListJobInstancesRequest(project, jobName, fromTime, toTime));
if (listJobInstancesResponse.getResults().size() > 0) {
instanceId = listJobInstancesResponse.getResults().get(0).getInstanceId();
} else {
throw new LogException("NoJobInstance", "JobInstances have not start, please wait.", "");
}
System.out.println("list JobInstances: " + JSONObject.toJSONString(listJobInstancesResponse));
}
private static void testRerunJobInstance() throws LogException, InterruptedException {
System.out.println("Rerun jobInstance ready to start.......");
// Start jobInstance
GetJobInstanceResponse getJobInstanceResponse = getJobInstance();
String state = getJobInstanceResponse.getJobInstance().getState();
if ("SUCCEEDED".equals(state) || "FAILED".equals(state)) {
client.modifyJobInstanceState(new ModifyJobInstanceStateRequest(project, jobName, instanceId, "RUNNING"));
}
}
private static void testStopJobInstance() throws LogException {
System.out.println("Stop jobInstance ready to start.......");
// Stop jobInstance
ModifyJobInstanceStateResponse modifyJobInstanceStateResponse = client.modifyJobInstanceState(new ModifyJobInstanceStateRequest(project, jobName, instanceId, "STOPPED"));
}
private static GetJobInstanceResponse getJobInstance() throws LogException {
System.out.println("Get JobInstance ready to start.....");
return client.getJobInstance(new GetJobInstanceRequest(project, jobName, instanceId));
}
public static void main(String[] args) throws InterruptedException, LogException {
testListJobInstance();
testGetJobInstance();
testStopJobInstance();
testRerunJobInstance();
}
}该文章对您有帮助吗?