全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 钉钉智能硬件
批量计算

创建(提交)作业

更新时间:2017-10-27 10:19:36

createJob

方法说明:

创建并提交一个作业。

参数说明:

参数 类型 是否必需 说明
jobDescription JobDescription Job对象中有各个任务的描述信息,和任务的DAG依赖。

下面的 JobDescription ,TaskDag,TaskDescription以及ResourceDescription的包名都是: com.aliyuncs.batchcompute.pojo.v20151111

JobDescription的属性(都有getter和setter方法)

属性 类型 是否必需 说明
name String 作业名称。
priority int 优先级用一个[0,1000]范围内的整数指定。数值越高表示作业调度时的优先级越高
description String 作业的简短描述信息。
dag DAG type为DAG时必选,Job对象中有各个任务的描述信息,以及各个任务之间的DAG依赖。
app AppJobDescription type为App时必选,描述App配置参数,输入输出等信息。
jobFailOnInstanceFail boolean 表示Instance失败会否导致Job失败,一般取值为true。
autoRelease boolean 表示job运行成功自动会被立即释放(删除)掉。默认为false。
type String 作业类型,目前仅支持有向无环图(directed acycline graph,DAG)形式描述任务。取值: “DAG”,”App”。
notification Notification 消息通知配置,可以配置MNS服务的Topic和Job相关事件.

DAG 的属性(都有getter和setter方法)

属性 类型 是否必需 说明
tasks Map<String, TaskDescription> 所有任务名与任务描述的映射信息。
dependencies Map<String, List<String>> 任务之间的依赖关系, 也是一个map,其中key为 源任务名称,value为 目标任务名称 列表,可以描述一对多的顺序关系 (详见示例)。

TaskDescription的属性(都有getter和setter方法)

属性 类型 是否必需 描述
taskName String 任务名称。
clusterId String 集群ID。该参数和autoCluster(匿名集群)参数只能选一个。
autoCluster AutoCluster 匿名集群,该参数和集群ID只能选一个。
parameters Paramters 程序运行相关配置
inputMapping Map<String,String> 用来描述OSS上路径同本地挂载位置的映射关系。 所有挂载位置,必须以’/’结尾,可以位于相同的bucket,也可以位于不同的bucket,但是必须归属同一个用户。挂载位置在windows下是一个盘符,在linux下是一个文件夹。 key为oss路径,value为挂载路径,如:”D:”,或“/home/admin/dist1”
outMapping Map<String,String> 本地路径对OSS映射,Key为本地路径,value为OSS路径。
logMapping Map<String,String> 本地日志路径对OSS映射,Key为本地路径,value为OSS路径。
instanceCount int 任务中实例的个数,正数。
timeout int 设置任务中的一个实例的最长执行时间(超时时间),范围为[1,86400],单位为秒。
mounts Mounts 实例的网络挂载配置信息,由Mounts描述,目前支持NAS和OSS挂载。
maxRetryCount int 最大重试次数,默认为0。
writeSupport boolean inputMapping 只读挂载目录中可写支持。注意:已经挂载的文件为只读,不可覆盖。

AutoCluster的属性(都有getter和setter方法)

属性 类型 是否必需 描述
ECSImageId String 支持ECS镜像ID和注册的镜像ID。
instanceType String 实例类型。不同region支持的实例类型不一样。
resourceType String 资源类型,目前只支持且默认为:”OnDemand”。
SpotStrategy String 实例的竞价策略,只有在ResourceType为Spot的情况下有效。取值范围: SpotWithPriceLimit:设置上限价格的竞价实例;SpotAsPriceGo:系统自动出价,最高不超过按量付费价格。
SpotPriceLimit float 实例的每小时最高价格。支持最大 3 位小数,SpotStrategy 为 SpotWithPriceLimit 生效。
userData Map<String, String> 用户数据
configs Configs 集群的一些配置信息,比如实例的磁盘配置,由Configs描述。

Parameters的属性(都有getter和setter方法)

属性 类型 是否必需 描述
command Command 用户程序相关命令行参数。
inputMappingConfig InputMappingConfig NFS挂载服务配置项。
stdoutRedirectPath String 标准输出将被上传到这里, 应该是一个oss路径。
stderrRedirectPath String 标准错误将被上传到这里, 应该是一个oss路径。

Command 的属性(都有getter和setter方法)

属性 类型 是否必需 描述
commandLine String 执行用户程序的命令。
PackagePath String 用户程序所在OSS路径。
EnvVars Map<String,String> 用户程序开始执行时,需要设置的环境变量。

InputMappingConfig的属性(都有getter和setter方法)

属性 类型 是否必需 描述
lock Boolean 布尔型变量,用来确定NFS挂载服务是否支持网络文件锁。如果设置为true,则会开启网络锁服务,为文件锁提供后端支持。 如果设置为false,则会关闭文件锁服务,可以提高文件访问的性能。但是文件锁的支持仅在NFS客户端,不会同步到服务端,对于有些操作系统,显示的文件锁方法可能会失败。
locale String OSS上的Object统一采用UTF-8编码命名,这个参数可以决定挂载后使用的本地字符集。 可选范围包括GBK、GB2312-80、BIG5、ANSI、EUC-JP、EUC-TW、EUC-KR、SHIFT-JIS、KSC5601等。

Notification的属性(都有getter和setter方法)

属性 类型 是否必需 描述
topic Topic 消息Topic

Topic 的属性(都有getter和setter方法)

属性 类型 是否必需 描述
endpoint String MNS区域endpoint,格式如: http://${your_user_id}.mns.${region}-internal.aliyuncs.com/ ,请尽量使用内网Endpoint。
name String Topic名称。
events List<String> 事件列表,请填写job相关的事件。

AppJobDescription 的属性

属性 类型 是否必需 描述
appName String app name。允许的字符集为[a-zA-Z0-9_-:],长度不能大于1024。App名Name有两种格式,分别用于访问公共的App和自己私有的App。比如,要访问公共App App1,则可以设置为Public:App1,其中的Public可以省略,变为:App1。如果App1是自己的私有App,则直接设置为App1。
inputs Map<String,Object> 作业输入参数。Map大小不能大于50,不能小于0。Key应为App中InputParameters定义的参数名,Value应当与App中参数定义的类型相同。
outputs Map<String,Object> 作业输出参数。Map大小不能大于50,不能小于0。Key应为App中OutputParameters定义的LocalPath不为空的参数名,Value应当为某个OSS路径。默认值:{}
logging AppJobDescription.Loggin 日志配置
config Map<String,Object> 作业运行时配置

AppJobDescription.Loggin 的属性

参数名称 类型 必选 描述
stdoutPath string 标准输出重定向到OSS的目录。长度不能大于1000。默认值:空
stderrPath string 标准错误重定向到OSS的目录。长度不能大于1000。默认值:空

返回值说明:

  • 创建成功后返回一个CreateJobResponse实例,可以通过 response.getJobId() 获取创建的作业ID。
类型 说明
CreateJobResponse 可以获取创建成功的jobId。

CreateJobResponse的包名为:com.aliyuncs.batchcompute.model.v20151111,下面的其他Response都是在这个包下面。

  • 创建失败后,抛出异常: ClientException。

提交DAG作业代码示例:

JobDescription结构比较复杂,强烈建议使用IDE开发,推荐 IntelliJ IDEA Community版。

  1. package com.aliyuncs.batchcompute.sample.v20151111;
  2. import com.aliyuncs.batchcompute.main.v20151111.*;
  3. import com.aliyuncs.batchcompute.model.v20151111.*;
  4. import com.aliyuncs.batchcompute.pojo.v20151111.*;
  5. import com.aliyuncs.exceptions.ClientException;
  6. public class CreateJob {
  7. public static void main(String[] args) {
  8. BatchCompute client = new BatchComputeClient("cn-shenzhen", "your_access_id", "your_access_secret");
  9. try {
  10. JobDescription jobDescription = getJobDesc();
  11. CreateJobResponse response = client.createJob(jobDescription);
  12. String jobId = response.getJobId();
  13. //创建成功
  14. System.out.println("Got job id:" + jobId);
  15. } catch (ClientException e) {
  16. e.printStackTrace();
  17. //创建失败
  18. }
  19. }
  20. private static JobDescription getJobDesc() {
  21. JobDescription desc = new JobDescription();
  22. desc.setName("testJob");
  23. desc.setPriority(1);
  24. desc.setDescription("Demo");
  25. desc.setType("DAG");
  26. desc.setJobFailOnInstanceFail(true);
  27. desc.setAutoRelease(false);
  28. DAG dag = new DAG();
  29. dag.addTask(getTaskDesc());
  30. desc.setDag(dag);
  31. Notification noti = new Notification();
  32. Topic topic = new Topic();
  33. topic.addEvent(Topic.ON_JOB_FAILED);
  34. topic.addEvent(Topic.ON_JOB_FINISHED);
  35. noti.setTopic(topic);
  36. topic.setName("tp_n1");
  37. topic.setEndpoint("xxxxx");
  38. desc.setNotification(noti);
  39. return desc;
  40. }
  41. private static TaskDescription getTaskDesc() {
  42. TaskDescription task = new TaskDescription();
  43. task.setClusterId("cls-xxxxxx");
  44. task.setInstanceCount(1);
  45. task.setMaxRetryCount(2);
  46. task.setTaskName("task_1");
  47. task.setTimeout(10000);
  48. Parameters parameters = new Parameters();
  49. Command cmd = new Command();
  50. cmd.setCommandLine("python main.py");
  51. cmd.setPackagePath("oss://my-bucket/test/worker.tar.gz");
  52. parameters.setCommand(cmd);
  53. parameters.setStderrRedirectPath("oss://my-bucket/test/logs/");
  54. parameters.setStdoutRedirectPath("oss://my-bucket/test/logs/");
  55. InputMappingConfig input = new InputMappingConfig();
  56. input.setLocale("GBK");
  57. input.setLock(true);
  58. parameters.setInputMappingConfig(input);
  59. task.setParameters(parameters);
  60. task.addInputMapping("oss://my-bucket/disk1/", "/home/admin/disk1/");
  61. task.addOutputMapping("/home/admin/disk2/", "oss://my-bucket/disk2/");
  62. return task;
  63. }
  64. }

提交App作业代码示例:

  1. package com.aliyuncs.batchcompute;
  2. import com.aliyuncs.batchcompute.main.v20151111.BatchCompute;
  3. import com.aliyuncs.batchcompute.main.v20151111.BatchComputeClient;
  4. import com.aliyuncs.batchcompute.model.v20151111.CreateJobResponse;
  5. import com.aliyuncs.batchcompute.pojo.v20151111.AppJobDescription;
  6. import com.aliyuncs.batchcompute.pojo.v20151111.JobDescription;
  7. import com.aliyuncs.exceptions.ClientException;
  8. public class CreateAppJob {
  9. public static void main(String[] args){
  10. BatchCompute client = new BatchComputeClient("cn-shenzhen", "your_access_id", "your_access_secret");
  11. try {
  12. JobDescription jobDescription = getJobDesc();
  13. CreateJobResponse response = client.createJob(jobDescription);
  14. String jobId = response.getJobId();
  15. //创建成功
  16. System.out.println("Got job id:" + jobId);
  17. } catch (ClientException e) {
  18. e.printStackTrace();
  19. //创建失败
  20. }
  21. }
  22. private static JobDescription getJobDesc() {
  23. JobDescription jobDesc = new JobDescription();
  24. AppJobDescription app = new AppJobDescription();
  25. app.setAppName("myApp");
  26. app.addInputs("abc", "oss://bucket/inp");
  27. app.addOutputs("out", "oss://bucket/out");
  28. AppJobDescription.Logging logging = new AppJobDescription.Logging();
  29. logging.setStderrPath("oss://bucket/key/logs");
  30. logging.setStdoutPath("oss://bucket/key/logs");
  31. app.setLogging(logging);
  32. jobDesc.setName("java-sdk-test");
  33. jobDesc.setDescription("java sdk test job");
  34. jobDesc.setType("App");
  35. jobDesc.setApp(app);
  36. return jobDesc;
  37. }
  38. }
本文导读目录