全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 钉钉智能硬件
批量计算

Java快速开始

更新时间:2017-11-13 16:03:35

Java快速开始例子

本文档将介绍如何使用 Java 版 SDK 来提交一个作业,目的是统计一个日志文件中“INFO”,”WARN”,”ERROR”,”DEBUG”出现的次数。

如果您还没开通批量计算服务,请先开通

步骤预览

  • 作业准备
    • 上传数据文件到OSS
    • 使用示例代码
    • 编译打包
    • 上传到OSS
  • 使用SDK创建(提交)作业
  • 查看结果

1. 作业准备

本作业是统计一个日志文件中“INFO”,”WARN”,”ERROR”,”DEBUG”出现的次数。

该作业包含3个任务: split, count 和 merge:

  • split 任务会把日志文件分成 3 份。
  • count 任务会统计每份日志文件中“INFO”,”WARN”,”ERROR”,”DEBUG”出现的次数 (count 任务需要配置InstanceCount为3,表示同时启动3台机器运行个 count 程序)。
  • merge 任务会把 count 任务的结果统一合并起来。

DAG图例:

DAG图例

(1) 上传数据文件到OSS

下载本例子所需的数据: log-count-data.txt

将 log-count-data.txt 上传到:

oss://your-bucket/log-count/log-count-data.txt

  • your-bucket表示您自己创建的bucket,本例子假设region为: cn-shenzhen.
  • 如何上传到OSS,请参考OSS上传文档

(2) 使用示例代码

这里我们将采用Java来编写作业任务,使用maven来编译,推荐使用IDEA:http://www.jetbrains.com/idea/download/ 选择Community版本(免费).

示例程序下载:java-log-count.zip

这是一个maven工程。

  • 注意:无需修改代码。

(3) 编译打包

运行命令编译打包:

  1. mvn package

即可在target得到下面3个jar包:

  1. batchcompute-job-log-count-1.0-SNAPSHOT-Split.jar
  2. batchcompute-job-log-count-1.0-SNAPSHOT-Count.jar
  3. batchcompute-job-log-count-1.0-SNAPSHOT-Merge.jar

再将3个jar包,打成一个tar.gz压缩包,命令如下:

  1. > cd target #进入target目录
  2. > tar -czf worker.tar.gz *SNAPSHOT-*.jar #打包

运行以下命令,查看包的内容是否正确:

  1. > tar -tvf worker.tar.gz
  2. batchcompute-job-log-count-1.0-SNAPSHOT-Split.jar
  3. batchcompute-job-log-count-1.0-SNAPSHOT-Count.jar
  4. batchcompute-job-log-count-1.0-SNAPSHOT-Merge.jar
  • 注意:BatchCompute 只支持以 tar.gz 为后缀的压缩包, 请注意务必用以上方式(gzip)打包, 否则将会无法解析。

(4) 上传到OSS

本例将 worke.tar.gz 上传到 OSS 的 your-bucket 中:

oss://your-bucket/log-count/worker.tar.gz

  • 如要运行本例子,您需要创建自己的bucket,并且把worker.tar.gz文件上传至您自己创建的bucket的路径下。

2. 使用SDK创建(提交)作业

(1) 新建一个maven工程

在pom.xml中增加以下dependencies:

  1. <dependencies>
  2. <dependency>
  3. <groupId>com.aliyun</groupId>
  4. <artifactId>aliyun-java-sdk-batchcompute</artifactId>
  5. <version>3.1.0</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>com.aliyun</groupId>
  9. <artifactId>aliyun-java-sdk-core</artifactId>
  10. <version>3.0.3</version>
  11. </dependency>
  12. </dependencies>

(2) 新建一个java类: Demo.java

提交作业需要指定集群ID或者使用匿名集群参数。本例子使用匿名集群方式进行。匿名集群需要配置2个参数, 其中:

  • 可用的镜像ID, 可以使用系统提供的Image,也可以自行制作镜像, 请看使用镜像
  • 实例规格(InstanceType,实例类型),请看 目前支持类型

在 OSS 中创建存储StdoutRedirectPath(程序输出结果)和StderrRedirectPath(错误日志)的文件路径,本例中创建的路径为

oss://your-bucket/log-count/logs/

  • 如需运行本例子,请按照上文所述的变量获取以及与上文对应的您的OSS路径对程序中注释中的变量进行修改。

Java SDK 提交程序模板如下,程序中具体参数含义请参照 SDK接口说明

Demo.java:

  1. /*
  2. * IMAGE_ID:ECS镜像,由上文所述获取
  3. * INSTANCE_TYPE: 实例类型,由上文所述获取
  4. * REGION_ID:区域为青岛/杭州,目前只有青岛开通,此项需与上文OSS存储worker的bucket地域一致
  5. * ACCESS_KEY_ID: AccessKeyId可以由上文所述获取
  6. * ACCESS_KEY_SECRET: AccessKeySecret可以由上文所述获取
  7. * WORKER_PATH:由上文所述打包上传的worker的OSS存储路径
  8. * LOG_PATH:错误反馈和task输出的存储路径,logs文件需事先自行创建
  9. */
  10. import com.aliyuncs.batchcompute.main.v20151111.*;
  11. import com.aliyuncs.batchcompute.model.v20151111.*;
  12. import com.aliyuncs.batchcompute.pojo.v20151111.*;
  13. import com.aliyuncs.exceptions.ClientException;
  14. import java.util.ArrayList;
  15. import java.util.List;
  16. public class Demo {
  17. static String IMAGE_ID = "img-ubuntu";; //这里填写您的 ECS 镜像ID
  18. static String INSTANCE_TYPE = "ecs.sn1.medium"; //根据region填写合适的InstanceType
  19. static String REGION_ID = "cn-shenzhen"; //这里填写region
  20. static String ACCESS_KEY_ID = ""; //"your-AccessKeyId"; 这里填写您的AccessKeyId
  21. static String ACCESS_KEY_SECRET = ""; //"your-AccessKeySecret"; 这里填写您的AccessKeySecret
  22. static String WORKER_PATH = ""; //"oss://your-bucket/log-count/worker.tar.gz"; // 这里填写您上传的worker.tar.gz的OSS存储路径
  23. static String LOG_PATH = ""; // "oss://your-bucket/log-count/logs/"; // 这里填写您创建的错误反馈和task输出的OSS存储路径
  24. static String MOUNT_PATH = ""; // "oss://your-bucket/log-count/";
  25. public static void main(String[] args){
  26. /** 构造 BatchCompute 客户端 */
  27. BatchCompute client = new BatchComputeClient(REGION_ID, ACCESS_KEY_ID, ACCESS_KEY_SECRET);
  28. try{
  29. /** 构造 Job 对象 */
  30. JobDescription jobDescription = genJobDescription();
  31. //创建Job
  32. CreateJobResponse response = client.createJob(jobDescription);
  33. //创建成功后,返回jobId
  34. String jobId = response.getJobId();
  35. System.out.println("Job created success, got jobId: "+jobId);
  36. //查询job状态
  37. GetJobResponse getJobResponse = client.getJob(jobId);
  38. Job job = getJobResponse.getJob();
  39. System.out.println("Job state:"+job.getState());
  40. } catch (ClientException e) {
  41. e.printStackTrace();
  42. System.out.println("Job created failed, errorCode:"+ e.getErrCode()+", errorMessage:"+e.getErrMsg());
  43. }
  44. }
  45. private static JobDescription genJobDescription(){
  46. JobDescription jobDescription = new JobDescription();
  47. jobDescription.setName("java-log-count");
  48. jobDescription.setPriority(0);
  49. jobDescription.setDescription("log-count demo");
  50. jobDescription.setJobFailOnInstanceFail(true);
  51. jobDescription.setType("DAG");
  52. DAG taskDag = new DAG();
  53. /** 添加 split task */
  54. TaskDescription splitTask = genTaskDescription();
  55. splitTask.setTaskName("split");
  56. splitTask.setInstanceCount(1);
  57. splitTask.getParameters().getCommand().setCommandLine("java -jar batchcompute-job-log-count-1.0-SNAPSHOT-Split.jar");
  58. taskDag.addTask(splitTask);
  59. /** 添加 count task */
  60. TaskDescription countTask = genTaskDescription();
  61. countTask.setTaskName("count");
  62. countTask.setInstanceCount(3);
  63. countTask.getParameters().getCommand().setCommandLine("java -jar batchcompute-job-log-count-1.0-SNAPSHOT-Count.jar");
  64. taskDag.addTask(countTask);
  65. /** 添加 merge task */
  66. TaskDescription mergeTask = genTaskDescription();
  67. mergeTask.setTaskName("merge");
  68. mergeTask.setInstanceCount(1);
  69. mergeTask.getParameters().getCommand().setCommandLine("java -jar batchcompute-job-log-count-1.0-SNAPSHOT-Merge.jar");
  70. taskDag.addTask(mergeTask);
  71. /** 添加Task依赖: split-->count-->merge */
  72. List<String> taskNameTargets = new ArrayList();
  73. taskNameTargets.add("merge");
  74. taskDag.addDependencies("count", taskNameTargets);
  75. List<String> taskNameTargets2 = new ArrayList();
  76. taskNameTargets2.add("count");
  77. taskDag.addDependencies("split", taskNameTargets2);
  78. //dag
  79. jobDescription.setDag(taskDag);
  80. return jobDescription;
  81. }
  82. private static TaskDescription genTaskDescription(){
  83. AutoCluster autoCluster = new AutoCluster();
  84. autoCluster.setInstanceType(INSTANCE_TYPE);
  85. autoCluster.setImageId(IMAGE_ID);
  86. //autoCluster.setResourceType("OnDemand");
  87. TaskDescription task = new TaskDescription();
  88. //task.setTaskName("Find");
  89. //打包上传的作业的OSS全路径
  90. Parameters p = new Parameters();
  91. Command cmd = new Command();
  92. //cmd.setCommandLine("");
  93. //打包上传的作业的OSS全路径
  94. cmd.setPackagePath(WORKER_PATH);
  95. p.setCommand(cmd);
  96. //错误反馈存储路径
  97. p.setStderrRedirectPath(LOG_PATH);
  98. //最终结果输出存储路
  99. p.setStdoutRedirectPath(LOG_PATH);
  100. task.setParameters(p);
  101. task.addInputMapping(MOUNT_PATH, "/home/input");
  102. task.addOutputMapping("/home/output",MOUNT_PATH);
  103. task.setAutoCluster(autoCluster);
  104. //task.setClusterId(clusterId);
  105. task.setTimeout(30000); /* 30000 秒*/
  106. task.setInstanceCount(1); /** 使用1个实例来运行 */
  107. return task;
  108. }
  109. }

正常输出样例:

  1. Job created success, got jobId: job-01010100010192397211
  2. Job state:Waiting

3. 查看作业状态

您可以用SDK中的 获取作业信息 方法获取作业状态:

  1. //查询job状态
  2. GetJobResponse getJobResponse = client.getJob(jobId);
  3. Job job = getJobResponse.getJob();
  4. System.out.println("Job state:"+job.getState());

Job的state可能为:Waiting, Running, Finished, Failed, Stopped.

4. 查看结果

您可以登录batchcompute控制台查看job状态。

Job运行结束,您可以登录OSS控制台 查看your-bucket 这个bucket下面的这个文件:/log-count/merge_result.json。

内容应该如下:

  1. {"INFO": 2460, "WARN": 2448, "DEBUG": 2509, "ERROR": 2583}

您也可以使用OSS的SDK来获取结果。

本文导读目录