This topic describes how to use the Java SDK to obtain the execution progress of a task (Task).
Background information
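The Java SDK provides the Instance#getTaskProgress method, which returns the status of the workers in every stage. From these worker statuses you can calculate the approximate overall progress of the task.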
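One way to turn worker statuses into an overall figure is a worker-weighted ratio: finished workers across all stages divided by total workers across all stages. The sketch below is self-contained and does not call the SDK; `StageCounters` and `ProgressEstimate` are hypothetical classes whose fields stand in for the values returned by `getTotalWorkers()` and `getTerminatedWorkers()`.

```java
import java.util.List;

// Hypothetical per-stage counters. The fields mirror the StageProgress getters
// getTotalWorkers() and getTerminatedWorkers(), but this class is NOT an SDK type.
final class StageCounters {
    final int totalWorkers;      // excludes backup workers
    final int terminatedWorkers; // workers that have finished

    StageCounters(int totalWorkers, int terminatedWorkers) {
        this.totalWorkers = totalWorkers;
        this.terminatedWorkers = terminatedWorkers;
    }
}

// Hypothetical helper: worker-weighted progress estimate for a whole task.
final class ProgressEstimate {
    // Returns finished workers / total workers as a whole percent (truncated).
    static int percent(List<StageCounters> stages) {
        long done = 0;
        long total = 0;
        for (StageCounters s : stages) {
            done += s.terminatedWorkers;
            total += s.totalWorkers;
        }
        if (total == 0) {
            return 0; // no workers reported yet
        }
        // Defensive clamp (an assumption, not documented SDK behavior) in case
        // terminated counts ever exceed the non-backup total.
        return (int) Math.min(100, done * 100 / total);
    }
}
```

Because every worker carries the same weight, a stage with many short-lived workers moves the percentage more than a stage with a few long-running workers, which is why the result is only an approximation.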
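Obtaining the execution progress of a task involves the MaxCompute Instance, MaxCompute Task, Fuxi Job, Fuxi Task (Stage), and Fuxi Instance (Worker). They relate to each other as follows:
- A MaxCompute Instance usually corresponds to one MaxCompute Task.
- A MaxCompute Task consists of one or more Fuxi Jobs. If the SQL statement is complex, MaxCompute automatically submits multiple Fuxi Jobs to Fuxi.
- A Fuxi Job consists of any number of Stages, and a Stage consists of any number of Workers. All Workers within a Stage run the same data processing logic.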
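To make the hierarchy concrete, the following sketch models it with plain Java classes and counts stages and workers across all Fuxi Jobs of a task. `TaskNode`, `FuxiJobNode`, and `StageNode` are hypothetical names used only for illustration; they are not SDK types, and the SDK does not expose this structure in this form.

```java
import java.util.List;

// Hypothetical model of the hierarchy: MaxCompute Task -> Fuxi Job -> Stage -> Worker.
// For illustration only; none of these classes exist in the MaxCompute SDK.
final class StageNode {
    final String name;
    final int workers; // every worker in a stage runs the same processing logic

    StageNode(String name, int workers) {
        this.name = name;
        this.workers = workers;
    }
}

final class FuxiJobNode {
    final List<StageNode> stages; // a job has any number of stages

    FuxiJobNode(List<StageNode> stages) {
        this.stages = stages;
    }
}

final class TaskNode {
    final List<FuxiJobNode> jobs; // complex SQL may be split into several jobs

    TaskNode(List<FuxiJobNode> jobs) {
        this.jobs = jobs;
    }

    // Number of stages across all Fuxi jobs of this task.
    int stageCount() {
        int count = 0;
        for (FuxiJobNode job : jobs) {
            count += job.stages.size();
        }
        return count;
    }

    // Number of workers across all stages of all jobs.
    int workerCount() {
        int count = 0;
        for (FuxiJobNode job : jobs) {
            for (StageNode stage : job.stages) {
                count += stage.workers;
            }
        }
        return count;
    }
}
```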
Example
The following example prints the progress of each stage.
import java.util.List;
import java.util.concurrent.CompletableFuture;

import com.aliyun.odps.Instance;
import com.aliyun.odps.LogView;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.task.SQLTask;

public class InstanceManage {
    // Columns: STAGES STATUS TOTAL COMPLETED RUNNING PENDING BACKUP
    private static final String STAGE_FORMAT = "%-26s %13s %5s %9s %7s %7s %6s";
    private static final String FOOTER_FORMAT = "%-15s %-4s %-25s";

    // Prints one progress table. startTime is when the task was submitted (ms), so
    // ELAPSED TIME reports how long the task has run, not how long printing took.
    public static void printStage(List<Instance.StageProgress> stageProgresses, long startTime) {
        // Use the same layout as the rows so the header lines up with them.
        System.out.println(String.format(STAGE_FORMAT, "STAGES", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "BACKUP"));
        int sumCompleted = 0;
        int sumTotal = 0;
        int stagesCompleted = 0;
        for (Instance.StageProgress progress : stageProgresses) {
            String name = progress.getName();
            // NOTE: getTotalWorkers() does not include backup workers; in most cases getBackupWorkers() returns 0.
            int backup = progress.getBackupWorkers();
            int total = progress.getTotalWorkers();
            int all = backup + total;
            int running = progress.getRunningWorkers();
            int completed = progress.getTerminatedWorkers();
            int pending = all - completed - running;
            Instance.StageProgress.Status status = progress.getStatus();
            String statusString = status != null ? status.toString() : "NULL";
            // Example row: M1_job_0 RUNNING 7 7 0 0 0
            System.out.println(String.format(STAGE_FORMAT, name, statusString, total, completed, running, pending, backup));
            sumCompleted += completed;
            sumTotal += total;
            // A stage counts as completed once its status is TERMINATED.
            if (status == Instance.StageProgress.Status.TERMINATED) {
                stagesCompleted += 1;
            }
        }
        String stagesSummary = String.format("STAGES: %02d/%02d", stagesCompleted, stageProgresses.size());
        float progress = (sumTotal == 0) ? 0.0f : (float) sumCompleted / (float) sumTotal;
        String progressStr = (int) (progress * 100) + "%";
        float elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000.0f;
        String elapsedTime = "ELAPSED TIME: " + elapsedSeconds + " s";
        System.out.println(String.format(FOOTER_FORMAT, stagesSummary, progressStr, elapsedTime));
        System.out.println();
    }

    public static void main(String[] args) throws Exception {
        // Fill in your AccessKey ID, AccessKey secret, endpoint, project, and SQL statement.
        String ak = "";
        String sk = "";
        String endpoint = "";
        String project = "";
        String sql = "";

        Odps odps = new Odps(new AliyunAccount(ak, sk));
        odps.setEndpoint(endpoint);
        odps.setDefaultProject(project);

        String taskName = "TestGetProgressTask";
        long startTime = System.currentTimeMillis();
        Instance instance = SQLTask.run(odps, project, sql, taskName, null, null);
        System.out.println("LogView:\n" + new LogView(odps).generateLogView(instance, 24) + "\n");

        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                // Poll until the instance terminates. Checking only isSuccessful()
                // would never exit if the instance fails.
                while (!instance.isTerminated()) {
                    printStage(instance.getTaskProgress(taskName), startTime);
                    Thread.sleep(200);
                }
                // After the instance succeeds, the returned List<Instance.StageProgress> is empty,
                // so this final call prints only the header and footer.
                printStage(instance.getTaskProgress(taskName), startTime);
            } catch (OdpsException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        future.join();
    }
}
Sample output:
STAGES STATUS TOTAL COMPLETED RUNNING PENDING BACKUP
M1_job_0 RUNNING 1 0 0 1 0
M2_1_job_0 WAITING 1 0 0 1 0
STAGES: 00/02 0% ELAPSED TIME: 20.97 s
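As a worked check of the footer, the sketch below recomputes `STAGES: 00/02` and `0%` from the two rows of the sample output, using the same rules as `printStage`: a stage counts as completed only when its status is `TERMINATED`, and the percentage is completed workers divided by total workers, truncated to a whole number. `FooterCheck` is a hypothetical helper; it omits the column padding of `FOOTER_FORMAT` and the elapsed time.

```java
// Hypothetical helper that recomputes the progress footer from per-stage values.
// It mirrors the counting rules of printStage but omits padding and elapsed time.
final class FooterCheck {
    static String footer(String[] statuses, int[] totals, int[] completed) {
        int stagesDone = 0;
        int sumTotal = 0;
        int sumCompleted = 0;
        for (int i = 0; i < statuses.length; i++) {
            // Only TERMINATED stages count toward the "STAGES: x/y" summary.
            if ("TERMINATED".equals(statuses[i])) {
                stagesDone++;
            }
            sumTotal += totals[i];
            sumCompleted += completed[i];
        }
        // Same formula as printStage: completed / total, truncated to a whole percent.
        int percent = (sumTotal == 0) ? 0 : (int) ((float) sumCompleted / sumTotal * 100);
        return String.format("STAGES: %02d/%02d %d%%", stagesDone, statuses.length, percent);
    }

    public static void main(String[] args) {
        // Rows from the sample output: M1_job_0 (RUNNING) and M2_1_job_0 (WAITING).
        System.out.println(footer(new String[] {"RUNNING", "WAITING"}, new int[] {1, 1}, new int[] {0, 0}));
    }
}
```

Running `main` prints `STAGES: 00/02 0%`. If M1_job_0 then reached TERMINATED with its single worker completed, the inputs `{"TERMINATED", "RUNNING"}`, `{1, 1}`, `{1, 0}` would give `STAGES: 01/02 50%`.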