创建(提交)作业

接口

createJob

参数

参数

类型

是否必需

说明

jobDescription

JobDescription

Job对象中有各个任务的描述信息,和任务的DAG依赖。

jobDescription 的具体属性信息参考DAG作业APP作业

返回值

创建成功后返回一个 CreateJobResponse 实例,可以通过 response.getJobId() 获取创建的作业ID。创建失败后,抛出异常: ClientException

例子

Java 源码

import com.aliyuncs.batchcompute.main.v20151111.*;
import com.aliyuncs.batchcompute.model.v20151111.*;
import com.aliyuncs.batchcompute.pojo.v20151111.*;
import com.aliyuncs.exceptions.ClientException;

public class CreateAppJob {
    static String ACCESS_KEY_ID = "xxx";  //这里填写您的 AccessKeyId
    static String ACCESS_KEY_SECRET = "xxx"; //这里填写您的 AccessKeySecret
    static String REGION_ID = "cn-xxx";   //这里填写 region
    static String ClusterId = "cls-xxx"; //提交DAG固定集群作业需要修改其他场景不需要修改

    static boolean IS_DAG_JOB = true; //APP作业和DAG作业开关,默认提交DAG作业
    static boolean IS_AUTO_CLUSTER = true; //固定集群和非固定集群作业开关,默认提交固定集群作业

    public static void main(String[] args) {
        BatchCompute client = new BatchComputeClient(REGION_ID, ACCESS_KEY_ID, ACCESS_KEY_SECRET);
        try {
            JobDescription jobDescription = getJobDesc();

            CreateJobResponse response = client.createJob(jobDescription);
            String jobId = response.getJobId();
            //创建成功

            System.out.println("jobId:" + jobId);
            System.out.println("RequestId: " + response.getRequestId());
            System.out.println("StatusCode: " + response.getStatusCode());
        } catch (ClientException e) {
            e.printStackTrace();
            //创建失败
        }
    }

    private static JobDescription getJobDesc() {
        JobDescription desc = new JobDescription();

        desc.setName("javaSdkJob");
        desc.setDescription("javaSdkJob");

        //设置作业优先级
        desc.setPriority(1);

        desc.setJobFailOnInstanceFail(true);
        desc.setAutoRelease(false);

        if (IS_DAG_JOB) {
            //设置 DAG task
            desc.setType("DAG");
            desc.setDag(getDagDesc());
        }else{
            //设置 APP task
            desc.setType("App");
            desc.setApp(getAppJobDescription());
        }

        //根据业务需要设置订阅作业实践
//        Notification noti = new Notification();
//        Topic topic = new Topic();
//        topic.addEvent(Topic.ON_JOB_FAILED);
//        topic.addEvent(Topic.ON_JOB_FINISHED);
//        noti.setTopic(topic);
//        topic.setName("tp_n1");
//        topic.setEndpoint("xxxxx");
//        desc.setNotification(noti);

        return desc;
    }
    private static AppJobDescription getAppJobDescription() {
        AppJobDescription appJobDescription = new AppJobDescription();

        appJobDescription.setAppName("JavaSdkApp");
        appJobDescription.addInputs("inputFile", "oss://test/input/cromwell_app.txt");
        appJobDescription.addOutputs("outputFile", "oss://test/output/ret/");

        AppJobDescription.Logging logging = new AppJobDescription.Logging();
        logging.setStderrPath("oss://test/output/error/");
        logging.setStdoutPath("oss://test/output/log/");
        appJobDescription.setLogging(logging);

        appJobDescription.addConfig("ResourceType", "OnDemand");
        appJobDescription.addConfig("InstanceType", "ecs.sn2ne.large");
        appJobDescription.addConfig("InstanceCount", 1);

        appJobDescription.addConfig("MinDiskSize", 40);
        appJobDescription.addConfig("DiskType", "cloud_efficiency");
        appJobDescription.addConfig("MaxRetryCount", 1);
        appJobDescription.addConfig("Timeout", 1000);
        appJobDescription.addConfig("ReserveOnFail", true);
        appJobDescription.addConfig("ClassicNetwork", false);//设置集群网络方式,false为VPC组网

        appJobDescription.addConfig("MinDataDiskSize", 40);
        //注意磁盘类型和 DiskType 保持一致
        appJobDescription.addConfig("DataDiskType", "cloud_efficiency");
        //挂载点根据需要做修改,windows 为 “E:, F:, G: 等”
        appJobDescription.addConfig("DataDiskMountPoint", "/home/mount/");
        return appJobDescription;
    }
    private static DAG getDagDesc() {
        DAG dag = new DAG();
        TaskDescription task = new TaskDescription();

        task.setTaskName("javaSdkTask");

        //设置实例信息
        task.setInstanceCount(1);

        if (IS_AUTO_CLUSTER){
            //设置Auto cluster
            task.setAutoCluster(getAutoCluster());
        }else{
            //设置固定集群信息
            task.setClusterId(ClusterId);
        }

        task.setMaxRetryCount(2);
        task.setTimeout(10000);

        Parameters parameters = new Parameters();
        Command cmd = new Command();
        //设置程序启动命令
        cmd.setCommandLine("python runtask.py 顿雳意当更冁");
        //设置程序启动脚本或者执行文件地址
        cmd.setPackagePath("oss://yuanhyyshenzhen/test/installpackage/runtask.tar.gz");

        //docker 镜像设置方式:推荐使用容器镜像模式
        //1、镜像在oss registry上,设置docker的方式
        //oss registry模式,参数设置好后自行打开注释
        //cmd.addEnvVars("BATCH_COMPUTE_DOCKER_IMAGE", "localhost:5000/yuorBucket/dockers:0.1");//镜像名称;
        //cmd.addEnvVars("BATCH_COMPUTE_DOCKER_REGISTRY_OSS_PATH", "oss://your-bucket/dockers");//设置OSS地址

        //2、镜像在容器镜像仓库,设置docker方式
        //Command.Docker docker = new Command.Docker();
        //docker.setImage("registry.cn-beijing.aliyuncs.com/demotest/test:0.1");
        //cmd.setDocker(docker);

        parameters.setCommand(cmd);

        //设置标准输出,上传的OSS路径
        parameters.setStderrRedirectPath("oss://test/output/error/");
        parameters.setStdoutRedirectPath("oss://test/output/log/");

        InputMappingConfig input = new InputMappingConfig();
        input.setLocale("GBK");
        input.setLock(true);
        parameters.setInputMappingConfig(input);

        task.setParameters(parameters);

        //设置输入OSS路径和本地路径关系
        task.addInputMapping("oss://test/input/", "/home/admin/disk1/");

        //设置输出本地路径和OSS地址
        task.addOutputMapping("/home/admin/disk2/", "oss://test/output/ret/");

        //设置挂载信息
        Mounts mounts = new Mounts();

        MountEntry mountEntry = new MountEntry();
        mountEntry.setDestination("/home/mount");
        mountEntry.setSource("oss://test/mount/");
        mountEntry.setWriteSupport(false);

        mounts.setCacheSupport(false);

        //windows set GBK; Liux set utf-8
        //mounts.setLocale("GBK");
        mounts.setLock(false);
        mounts.addEntries(mountEntry);

        //task.setMounts(mounts);

        dag.addTask(task);
        return dag;
    }
    private static AutoCluster getAutoCluster() {
        AutoCluster autoCluster = new AutoCluster();

        //设置集群镜像信息ECSImageId 在不同region可能会发生变化
        //autoCluster.setECSImageId("m-wz9dk5nao5z3fw6bo9k6");
        //建议使用setImageId接口设置
        autoCluster.setImageId("img-ubuntu");

        autoCluster.setInstanceType("ecs.s3.large");
        autoCluster.setReserveOnFail(true);

        //设置资源类型只有ResourceType为Spot的情况下后面两项有效
        autoCluster.setResourceType("OnDemand");
        //autoCluster.setSpotPriceLimit(5.6f);
        //autoCluster.setSpotStrategy("Spot");

        //设置config信息
        autoCluster.setConfigs(getConfigDesc());

        return autoCluster;
    }
    private static Configs getConfigDesc() {
        Configs configs = new Configs();

        //设置系统磁盘类型以及大型
        Disks disks = new Disks();
        SystemDisk systemDisk = new SystemDisk();
        systemDisk.setSize(40);//GB
        systemDisk.setType("cloud_efficiency");
        disks.setSystemDisk(systemDisk);

        DataDisk dataDisk = new DataDisk();
        dataDisk.setMountPoint("/home/dataDisk/");
        dataDisk.setSize(40);
        dataDisk.setType("cloud_efficiency");
        disks.setDataDisk(dataDisk);

        configs.setDisks(disks);

        //设置网络类型
        Networks networks = new Networks();
        VPC vpc = new VPC();
        vpc.setCidrBlock("10.0.0.0/12");
        networks.setVpc(vpc);
        configs.setNetworks(networks);

        return configs;
    }
}
执行结果:
```JSON
{
    jobId: job-000000005BE3E897000007FA00114EE9
    RequestId: null
    StatusCode: 201
}

注意

  1. 本实例代码支持提交 APP 和 DAG 类型作业,支持 AutoCluster 和固定集群类型的作业,提交作业之前根据业务需要修改开关(IS_DAG_JOBIS_AUTO_CLUSTER)即可。

  2. 若是提交 APP 类型作业,需要在提交作业之前创建 APP,然后根据 APP 的创建参数做对应修改作业参数,最后进行作业提交。

  3. 提交固定集群作业之前需要先创建集群,修改 ClusterId 为新创建的集群,然后提交作业。

  4. 提交作业前,请确保 OSS 地址填写正确并且已经上传输入或者执行文件到对应的 OSS 路径