This topic describes how to use the MaxCompute Java SDK to get the execution progress of a task (Task).

Background information

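The Java SDK provides the Instance#getTaskProgress method, which returns the status of the Workers in every Stage. From these Worker states you can estimate the overall progress of the Task.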
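The estimate is essentially the ratio of finished Workers to all Workers, summed over every Stage. The following is a minimal sketch of that arithmetic with no SDK dependency. `ProgressEstimate` and `StageCounts` are hypothetical names; `StageCounts` stands in for the counts that `Instance.StageProgress#getTerminatedWorkers()` and `getTotalWorkers()` return.

```java
import java.util.List;

// Minimal sketch, no SDK dependency. StageCounts is a hypothetical stand-in
// for the Worker counts read from one Instance.StageProgress.
public class ProgressEstimate {

    static final class StageCounts {
        final int terminatedWorkers;
        final int totalWorkers;

        StageCounts(int terminatedWorkers, int totalWorkers) {
            this.terminatedWorkers = terminatedWorkers;
            this.totalWorkers = totalWorkers;
        }
    }

    // Overall progress in percent: finished Workers / all Workers across all Stages.
    static int percent(List<StageCounts> stages) {
        long done = 0;
        long total = 0;
        for (StageCounts s : stages) {
            done += s.terminatedWorkers;
            total += s.totalWorkers;
        }
        return total == 0 ? 0 : (int) (done * 100 / total);
    }

    public static void main(String[] args) {
        // Two Stages: 7 of 7 Workers finished, and 0 of 3 finished.
        List<StageCounts> stages = List.of(new StageCounts(7, 7), new StageCounts(0, 3));
        System.out.println(percent(stages) + "%"); // prints 70%
    }
}
```

Every Worker is weighted equally regardless of how much data it processes, which is why the result is only an approximation of the Task's real progress.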

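Getting task progress involves MaxCompute Instances, MaxCompute Tasks, Fuxi Jobs, Fuxi Tasks (Stages), and Fuxi Instances (Workers). They are related as follows:
  • A MaxCompute Instance usually corresponds to one MaxCompute Task.
  • A MaxCompute Task consists of one or more Fuxi Jobs. If the SQL statement is complex, MaxCompute automatically submits multiple Fuxi Jobs to Fuxi.
  • A Fuxi Job consists of any number of Stages, and a Stage consists of any number of Workers. All Workers in a Stage run the same data-processing logic.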
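To make the hierarchy above concrete, here is a hypothetical in-memory model that walks Task, Fuxi Job, Stage, and Worker in turn and tallies Workers by state. None of these types (`OdpsTask`, `FuxiJob`, `Stage`, `WorkerState`) are SDK classes, and the three Worker states are a simplification assumed for illustration.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the MaxCompute/Fuxi hierarchy; not SDK classes.
public class TaskHierarchy {

    // Simplified Worker states, assumed for illustration only.
    enum WorkerState { PENDING, RUNNING, TERMINATED }

    // A Stage (Fuxi Task): all of its Workers run the same processing logic.
    static final class Stage {
        final String name;
        final List<WorkerState> workers;

        Stage(String name, List<WorkerState> workers) {
            this.name = name;
            this.workers = workers;
        }
    }

    // A Fuxi Job: any number of Stages.
    static final class FuxiJob {
        final List<Stage> stages;

        FuxiJob(List<Stage> stages) {
            this.stages = stages;
        }
    }

    // A MaxCompute Task: one or more Fuxi Jobs.
    static final class OdpsTask {
        final List<FuxiJob> jobs;

        OdpsTask(List<FuxiJob> jobs) {
            this.jobs = jobs;
        }
    }

    // Walks Task -> Fuxi Job -> Stage -> Worker and counts Workers per state.
    static Map<WorkerState, Integer> countWorkers(OdpsTask task) {
        Map<WorkerState, Integer> counts = new EnumMap<>(WorkerState.class);
        for (WorkerState s : WorkerState.values()) {
            counts.put(s, 0);
        }
        for (FuxiJob job : task.jobs) {
            for (Stage stage : job.stages) {
                for (WorkerState w : stage.workers) {
                    counts.merge(w, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        OdpsTask task = new OdpsTask(List.of(new FuxiJob(List.of(
                new Stage("M1_job_0", List.of(WorkerState.TERMINATED, WorkerState.RUNNING)),
                new Stage("M2_1_job_0", List.of(WorkerState.PENDING))))));
        System.out.println(countWorkers(task)); // prints {PENDING=1, RUNNING=1, TERMINATED=1}
    }
}
```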

Example

The following example submits an SQL task and periodically prints the progress of each Stage.
import java.util.List;
import java.util.concurrent.CompletableFuture;

import com.aliyun.odps.Instance;
import com.aliyun.odps.LogView;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.task.SQLTask;

public class InstanceManager {

    // Column layout shared by the header row and the per-Stage rows.
    private static final String STAGE_FORMAT = "%-26s %13s  %5s  %9s  %7s  %7s  %6s";
    // Layout of the summary line: Stage count, overall progress, elapsed time.
    private static final String FOOTER_FORMAT = "%-15s  %-4s  %-25s";

    /**
     * Prints one progress snapshot.
     *
     * @param stageProgresses result of instance.getTaskProgress(taskName)
     * @param startTime       time (ms) at which the task was submitted, used for ELAPSED TIME
     */
    public static void printStage(List<Instance.StageProgress> stageProgresses, long startTime) {
        // STAGES        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  BACKUP
        System.out.println(String.format(STAGE_FORMAT,
                "STAGES", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "BACKUP"));

        int sumComplete = 0;
        int sumTotal = 0;
        int stageCompletedSum = 0;

        for (Instance.StageProgress progress : stageProgresses) {
            String name = progress.getName();
            // NOTE: getTotalWorkers() does not include backup Workers; in most cases getBackupWorkers() returns 0.
            int backup = progress.getBackupWorkers();
            int total = progress.getTotalWorkers();
            int all = backup + total;
            int running = progress.getRunningWorkers();
            int completed = progress.getTerminatedWorkers();
            int pending = all - completed - running;

            Instance.StageProgress.Status status = progress.getStatus();
            String statusString = status != null ? status.toString() : "NULL";

            // e.g. M1_job_0    RUNNING    7    7    0    0    0
            System.out.println(String.format(STAGE_FORMAT, name, statusString, total, completed, running, pending, backup));
            sumComplete += completed;
            sumTotal += total;

            // Count a Stage as completed once its status is TERMINATED.
            if (status == Instance.StageProgress.Status.TERMINATED) {
                stageCompletedSum += 1;
            }
        }

        String stagesSummary = String.format("STAGES: %02d/%02d", stageCompletedSum, stageProgresses.size());
        // Overall progress = finished Workers / all Workers across all Stages.
        float progress = (sumTotal == 0) ? 0.0f : (float) sumComplete / sumTotal;
        String progressStr = (int) (progress * 100) + "%";
        float elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000.0f;
        String elapsedTime = "ELAPSED TIME: " + elapsedSeconds + " s";

        System.out.println(String.format(FOOTER_FORMAT, stagesSummary, progressStr, elapsedTime));
        System.out.println();
    }

    public static void main(String[] args) throws Exception {
        // Fill in your own AccessKey ID/secret, endpoint, project, and SQL statement.
        String ak = "";
        String sk = "";
        String endpoint = "";
        String project = "";
        String sql = "";
        Odps odps = new Odps(new AliyunAccount(ak, sk));
        odps.setEndpoint(endpoint);
        odps.setDefaultProject(project);

        String taskName = "TestGetProgressTask";
        long startTime = System.currentTimeMillis();
        Instance instance = SQLTask.run(odps, project, sql, taskName, null, null);

        // LogView link valid for 24 hours.
        System.out.println("LogView:\n" + new LogView(odps).generateLogView(instance, 24) + "\n");

        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                // Poll until the instance terminates, so a failed job does not loop forever.
                while (!instance.isTerminated()) {
                    printStage(instance.getTaskProgress(taskName), startTime);
                    Thread.sleep(200);
                }
                // After the instance succeeds, the returned List<Instance.StageProgress> is empty.
                printStage(instance.getTaskProgress(taskName), startTime);
                System.out.println("Successful: " + instance.isSuccessful());
            } catch (OdpsException | InterruptedException e) {
                e.printStackTrace();
            }
        });
        future.join();
    }
}
The following is sample output.

STAGES                            STATUS  TOTAL  COMPLETED  RUNNING  PENDING  BACKUP
M1_job_0                         RUNNING      1          0        0        1       0
M2_1_job_0                       WAITING      1          0        0        1       0
STAGES: 00/02    0%    ELAPSED TIME: 20.97 s
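The derived columns in this output can be checked by hand. Each row has TOTAL 1, BACKUP 0, COMPLETED 0 and RUNNING 0, so PENDING is 1. Neither Stage is TERMINATED, and 0 of 2 Workers have finished, giving "STAGES: 00/02" and 0%. The sketch below recomputes these values with the same formulas as printStage, without the column padding. `SampleOutputCheck`, `pending`, and `summarize` are hypothetical helper names.

```java
// Recomputes the derived columns of the sample output from its raw counts,
// using the same formulas as printStage (without column padding).
public class SampleOutputCheck {

    // PENDING = (TOTAL + BACKUP) - COMPLETED - RUNNING
    static int pending(int total, int backup, int completed, int running) {
        return total + backup - completed - running;
    }

    // Footer: TERMINATED Stages out of all Stages, then Worker-based progress.
    static String summarize(int[] completed, int[] total, boolean[] terminated) {
        int sumComplete = 0;
        int sumTotal = 0;
        int stagesDone = 0;
        for (int i = 0; i < total.length; i++) {
            sumComplete += completed[i];
            sumTotal += total[i];
            if (terminated[i]) {
                stagesDone++;
            }
        }
        int percent = (sumTotal == 0) ? 0 : (int) ((float) sumComplete / sumTotal * 100);
        return String.format("STAGES: %02d/%02d  %d%%", stagesDone, total.length, percent);
    }

    public static void main(String[] args) {
        // Both sample rows: TOTAL 1, BACKUP 0, COMPLETED 0, RUNNING 0.
        System.out.println(pending(1, 0, 0, 0)); // prints 1
        // Neither M1_job_0 (RUNNING) nor M2_1_job_0 (WAITING) is TERMINATED.
        System.out.println(summarize(new int[] {0, 0}, new int[] {1, 1}, new boolean[] {false, false}));
        // prints STAGES: 00/02  0%
    }
}
```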