全部产品
云市场

创建(提交)作业

更新时间:2019-04-18 10:02:09

接口

createJob

参数

参数 类型 是否必需 说明
jobDescription JobDescription Job对象中有各个任务的描述信息,和任务的DAG依赖。

jobDescription 的具体属性信息参考DAG作业APP作业

返回值

创建成功后返回一个 CreateJobResponse 实例,可以通过 response.getJobId() 获取创建的作业ID。创建失败后,抛出异常: ClientException

例子

Java 源码

  1. import com.aliyuncs.batchcompute.main.v20151111.*;
  2. import com.aliyuncs.batchcompute.model.v20151111.*;
  3. import com.aliyuncs.batchcompute.pojo.v20151111.*;
  4. import com.aliyuncs.exceptions.ClientException;
  5. public class CreateAppJob {
  6. static String ACCESS_KEY_ID = "xxx"; //这里填写您的 AccessKeyId
  7. static String ACCESS_KEY_SECRET = "xxx"; //这里填写您的 AccessKeySecret
  8. static String REGION_ID = "cn-xxx"; //这里填写 region
  9. static String ClusterId = "cls-xxx"; //提交DAG固定集群作业需要修改其他场景不需要修改
  10. static boolean IS_DAG_JOB = true; //APP作业和DAG作业开关,默认提交DAG作业
  11. static boolean IS_AUTO_CLUSTER = true; //固定集群和非固定集群作业开关,默认提交固定集群作业
  12. public static void main(String[] args) {
  13. BatchCompute client = new BatchComputeClient(REGION_ID, ACCESS_KEY_ID, ACCESS_KEY_SECRET);
  14. try {
  15. JobDescription jobDescription = getJobDesc();
  16. CreateJobResponse response = client.createJob(jobDescription);
  17. String jobId = response.getJobId();
  18. //创建成功
  19. System.out.println("jobId:" + jobId);
  20. System.out.println("RequestId: " + response.getRequestId());
  21. System.out.println("StatusCode: " + response.getStatusCode());
  22. } catch (ClientException e) {
  23. e.printStackTrace();
  24. //创建失败
  25. }
  26. }
  27. private static JobDescription getJobDesc() {
  28. JobDescription desc = new JobDescription();
  29. desc.setName("javaSdkJob");
  30. desc.setDescription("javaSdkJob");
  31. //设置作业优先级
  32. desc.setPriority(1);
  33. desc.setJobFailOnInstanceFail(true);
  34. desc.setAutoRelease(false);
  35. if (IS_DAG_JOB) {
  36. //设置 DAG task
  37. desc.setType("DAG");
  38. desc.setDag(getDagDesc());
  39. }else{
  40. //设置 APP task
  41. desc.setType("App");
  42. desc.setApp(getAppJobDescription());
  43. }
  44. //根据业务需要设置订阅作业实践
  45. // Notification noti = new Notification();
  46. // Topic topic = new Topic();
  47. // topic.addEvent(Topic.ON_JOB_FAILED);
  48. // topic.addEvent(Topic.ON_JOB_FINISHED);
  49. // noti.setTopic(topic);
  50. // topic.setName("tp_n1");
  51. // topic.setEndpoint("xxxxx");
  52. // desc.setNotification(noti);
  53. return desc;
  54. }
  55. private static AppJobDescription getAppJobDescription() {
  56. AppJobDescription appJobDescription = new AppJobDescription();
  57. appJobDescription.setAppName("JavaSdkApp");
  58. appJobDescription.addInputs("inputFile", "oss://test/input/cromwell_app.txt");
  59. appJobDescription.addOutputs("outputFile", "oss://test/output/ret/");
  60. AppJobDescription.Logging logging = new AppJobDescription.Logging();
  61. logging.setStderrPath("oss://test/output/error/");
  62. logging.setStdoutPath("oss://test/output/log/");
  63. appJobDescription.setLogging(logging);
  64. appJobDescription.addConfig("ResourceType", "OnDemand");
  65. appJobDescription.addConfig("InstanceType", "ecs.sn2ne.large");
  66. appJobDescription.addConfig("InstanceCount", 1);
  67. appJobDescription.addConfig("MinDiskSize", 40);
  68. appJobDescription.addConfig("DiskType", "cloud_efficiency");
  69. appJobDescription.addConfig("MaxRetryCount", 1);
  70. appJobDescription.addConfig("Timeout", 1000);
  71. appJobDescription.addConfig("ReserveOnFail", true);
  72. appJobDescription.addConfig("ClassicNetwork", false);//设置集群网络方式,false为VPC组网
  73. appJobDescription.addConfig("MinDataDiskSize", 40);
  74. //注意磁盘类型和 DiskType 保持一致
  75. appJobDescription.addConfig("DataDiskType", "cloud_efficiency");
  76. //挂载点根据需要做修改,windows 为 “E:, F:, G: 等”
  77. appJobDescription.addConfig("DataDiskMountPoint", "/home/mount/");
  78. return appJobDescription;
  79. }
  80. private static DAG getDagDesc() {
  81. DAG dag = new DAG();
  82. TaskDescription task = new TaskDescription();
  83. task.setTaskName("javaSdkTask");
  84. //设置实例信息
  85. task.setInstanceCount(1);
  86. if (IS_AUTO_CLUSTER){
  87. //设置Auto cluster
  88. task.setAutoCluster(getAutoCluster());
  89. }else{
  90. //设置固定集群信息
  91. task.setClusterId(ClusterId);
  92. }
  93. task.setMaxRetryCount(2);
  94. task.setTimeout(10000);
  95. Parameters parameters = new Parameters();
  96. Command cmd = new Command();
  97. //设置程序启动命令
  98. cmd.setCommandLine("python runtask.py 顿雳意当更冁");
  99. //设置程序启动脚本或者执行文件地址
  100. cmd.setPackagePath("oss://yuanhyyshenzhen/test/installpackage/runtask.tar.gz");
  101. //docker 镜像设置方式:推荐使用容器镜像模式
  102. //1、镜像在oss registry上,设置docker的方式
  103. //oss registry模式 参数设置好后自行打开注释
  104. //cmd.addEnvVars("BATCH_COMPUTE_DOCKER_IMAGE", "localhost:5000/yuorBucket/dockers:0.1");//镜像名称;
  105. //cmd.addEnvVars("BATCH_COMPUTE_DOCKER_REGISTRY_OSS_PATH", "oss://your-bucket/dockers");//设置OSS地址
  106. //2、镜像在容器镜像仓库,设置docker方式
  107. //Command.Docker docker = new Command.Docker();
  108. //docker.setImage("registry.cn-beijing.aliyuncs.com/demotest/test:0.1");
  109. //cmd.setDocker(docker);
  110. parameters.setCommand(cmd);
  111. //设置标准输出 上传的OSS路径
  112. parameters.setStderrRedirectPath("oss://test/output/error/");
  113. parameters.setStdoutRedirectPath("oss://test/output/log/");
  114. InputMappingConfig input = new InputMappingConfig();
  115. input.setLocale("GBK");
  116. input.setLock(true);
  117. parameters.setInputMappingConfig(input);
  118. task.setParameters(parameters);
  119. //设置输入OSS路径和本地路径关系
  120. task.addInputMapping("oss://test/input/", "/home/admin/disk1/");
  121. //设置输出本地路径和OSS地址
  122. task.addOutputMapping("/home/admin/disk2/", "oss://test/output/ret/");
  123. dag.addTask(task);
  124. return dag;
  125. }
  126. private static AutoCluster getAutoCluster() {
  127. AutoCluster autoCluster = new AutoCluster();
  128. //设置集群镜像信息ECSImageId 在不同region可能会发生变化
  129. //autoCluster.setECSImageId("m-wz9dk5nao5z3fw6bo9k6");
  130. //建议使用setImageId接口设置
  131. autoCluster.setImageId("img-ubuntu");
  132. autoCluster.setInstanceType("ecs.s3.large");
  133. autoCluster.setReserveOnFail(true);
  134. //设置资源类型只有ResourceType为Spot的情况下后面两项有效
  135. autoCluster.setResourceType("OnDemand");
  136. //autoCluster.setSpotPriceLimit(5.6f);
  137. //autoCluster.setSpotStrategy("Spot");
  138. //设置config信息
  139. autoCluster.setConfigs(getConfigDesc());
  140. return autoCluster;
  141. }
  142. private static Configs getConfigDesc() {
  143. Configs configs = new Configs();
  144. //设置系统磁盘类型以及大型
  145. Disks disks = new Disks();
  146. SystemDisk systemDisk = new SystemDisk();
  147. systemDisk.setSize(40);//GB
  148. systemDisk.setType("cloud_efficiency");
  149. disks.setSystemDisk(systemDisk);
  150. DataDisk dataDisk = new DataDisk();
  151. dataDisk.setMountPoint("/home/dataDisk/");
  152. dataDisk.setSize(40);
  153. dataDisk.setType("cloud_efficiency");
  154. disks.setDataDisk(dataDisk);
  155. configs.setDisks(disks);
  156. //设置网络类型
  157. Networks networks = new Networks();
  158. VPC vpc = new VPC();
  159. vpc.setCidrBlock("10.0.0.0/12");
  160. networks.setVpc(vpc);
  161. configs.setNetworks(networks);
  162. //设置挂载信息
  163. Mounts mounts = new Mounts();
  164. MountEntry mountEntry = new MountEntry();
  165. mountEntry.setDestination("/home/mount");
  166. //此处若是OSS地址则必须设置ossConfig,否则设置nasConfig
  167. mountEntry.setSource("oss://test/mount/");
  168. mountEntry.setWriteSupport(true);
  169. mounts.setCacheSupport(false);
  170. mounts.setLocale("GBK");
  171. mounts.setLock(true);
  172. mounts.addEntries(mountEntry);
  173. OSSConfig ossConfig = new OSSConfig();
  174. ossConfig.setAccessKeyId("xxx");
  175. ossConfig.setAccessKeySecret("xxx");
  176. ossConfig.setSecurityToken("xxx");
  177. mounts.setOss(ossConfig);
  178. NASConfig nasConfig = new NASConfig();
  179. nasConfig.addAccessGroup("xxx");
  180. nasConfig.addFileSystem("xxx");
  181. mounts.setNas(nasConfig);
  182. //configs.setMounts(mounts);
  183. return configs;
  184. }
  185. }
  1. 执行结果:
  2. ```JSON
  3. {
  4. jobId: job-000000005BE3E897000007FA00114EE9
  5. RequestId: null
  6. StatusCode: 201
  7. }

注意

  1. 本实例代码支持提交 APP 和 DAG 类型作业,支持 AutoCluster 和固定集群类型的作业,提交作业之前根据 业务需要修改开关(IS_DAG_JOBIS_AUTO_CLUSTER)即可。
  2. 若是提交 APP 类型作业,需要在提交作业之前 创建 APP,然后根据 APP 的创建参数做对应修改作业参数,最后进行作业提交。
  3. 提交固定集群作业之前需要先创建集群,修改 ClusterId 为新创建的集群,然后提交作业。
  4. 提交作业前 请确保 OSS 地址填写正确并且已经上传输入或者执行文件到对应的 OSS 路径