本文档将介绍如何使用 Java 版 SDK 来提交一个作业,目的是统计一个日志文件中“INFO”,”WARN”,”ERROR”,”DEBUG”出现的次数。
步骤
作业准备
上传数据文件到 OSS
使用示例代码
编译打包
上传到 OSS
使用 SDK 创建(提交)作业
查看结果
1. 作业准备
本作业是统计一个日志文件中“INFO”,”WARN”,”ERROR”,”DEBUG”出现的次数。
该作业包含 3 个任务:split,count 和 merge:
split 任务会把日志文件分成 3 份。
count 任务会统计每份日志文件中“INFO”,”WARN”,”ERROR”,”DEBUG”出现的次数(count 任务需要配置 InstanceCount 为 3,表示同时启动 3 台机器运行 count 程序)。
merge 任务会把 count 任务的结果统一合并起来。
DAG图例:
(1) 上传数据文件到OSS
下载本例所需的数据:log-count-data.txt
将 log-count-data.txt 上传到:oss://your-bucket/log-count/log-count-data.txt
your-bucket 表示您自己创建的 bucket,本例假设 region 为:cn-shenzhen.
(2) 使用示例代码
本示例将采用 Java 来编写作业任务,使用 Maven 来编译,推荐使用 IDEA:https://www.jetbrains.com/idea/download/ 选择 Community 版本(免费)。
示例程序下载:java-log-count.zip
这是一个 Maven 工程。
无需修改代码。
(3) 编译打包
运行命令编译打包:
mvn package
即可在 target 得到下面 3 个 jar 包:
batchcompute-job-log-count-1.0-SNAPSHOT-Split.jar
batchcompute-job-log-count-1.0-SNAPSHOT-Count.jar
batchcompute-job-log-count-1.0-SNAPSHOT-Merge.jar
再将 3 个 jar 包,打成一个 tar.gz 压缩包,命令如下:
> cd target #进入 target 目录
> tar -czf worker.tar.gz *SNAPSHOT-*.jar #打包
运行以下命令,查看包的内容是否正确:
> tar -tvf worker.tar.gz
batchcompute-job-log-count-1.0-SNAPSHOT-Split.jar
batchcompute-job-log-count-1.0-SNAPSHOT-Count.jar
batchcompute-job-log-count-1.0-SNAPSHOT-Merge.jar
BatchCompute 只支持以 tar.gz 为后缀的压缩包,请注意务必用以上方式(gzip)打包,否则将会无法解析。
(4) 上传到OSS
本例将 worker.tar.gz 上传到 OSS 的 your-bucket 中:oss://your-bucket/log-count/worker.tar.gz
如要运行本例子,您需要创建自己的 bucket,并且把 worker.tar.gz 文件上传至您自己创建的 bucket 路径下。
2. 使用SDK创建(提交)作业
(1) 新建一个Maven工程
在 pom.xml 中增加以下 dependencies:
<dependencies>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-batchcompute</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>3.2.3</version>
</dependency>
</dependencies>
请确定使用最新版本的 SDK:Java 版 SDK
(2) 新建一个Java类: Demo.java
提交作业需要指定集群 ID 或者使用匿名集群参数。本例子使用匿名集群方式进行。匿名集群需要配置2个参数,其中:
可用的镜像 ID,可以使用系统提供的 Image,也可以自行制作镜像。
实例规格(InstanceType,实例类型),请看 目前支持类型 。
在 OSS 中创建存储 StdoutRedirectPath(程序输出结果)和 StderrRedirectPath(错误日志)的文件路径,本例中创建的路径为oss://your-bucket/log-count/logs/
如需运行本例,请按照上文所述的变量获取以及与上文对应的您的 OSS 路径对程序中注释中的变量进行修改。
Java SDK 提交程序模板如下,程序中具体参数含义请参照SDK接口说明。
Demo.java:
/*
* IMAGE_ID:ECS 镜像,由上文所述获取
* INSTANCE_TYPE: 实例类型,由上文所述获取
* REGION_ID:提交作业的地域,此项需与上文 OSS 存储 worker 的bucket 地域一致
* ACCESS_KEY_ID: AccessKeyId 可以由上文所述获取
* ACCESS_KEY_SECRET: AccessKeySecret 可以由上文所述获取
* WORKER_PATH:由上文所述打包上传的 worker 的 OSS 存储路径
* LOG_PATH:错误反馈和 task 输出的存储路径,logs 文件需事先自行创建
*/
import com.aliyuncs.batchcompute.main.v20151111.*;
import com.aliyuncs.batchcompute.model.v20151111.*;
import com.aliyuncs.batchcompute.pojo.v20151111.*;
import com.aliyuncs.exceptions.ClientException;
import java.util.ArrayList;
import java.util.List;
public class Demo {
static String IMAGE_ID = "img-ubuntu";; //这里填写您的 ECS 镜像 ID
static String INSTANCE_TYPE = "ecs.sn1.medium"; //根据 region 填写合适的 InstanceType
static String REGION_ID = "cn-shenzhen"; //这里填写 region
static String ACCESS_KEY_ID = ""; //"your-AccessKeyId"; 这里填写您的 AccessKeyId
static String ACCESS_KEY_SECRET = ""; //"your-AccessKeySecret"; 这里填写您的 AccessKeySecret
static String WORKER_PATH = ""; //"oss://your-bucket/log-count/worker.tar.gz"; // 这里填写您上传的 worker.tar.gz 的 OSS 存储路径
static String LOG_PATH = ""; // "oss://your-bucket/log-count/logs/"; // 这里填写您创建的错误反馈和 task 输出的 OSS 存储路径
static String MOUNT_PATH = ""; // "oss://your-bucket/log-count/";
public static void main(String[] args){
/** 构造 BatchCompute 客户端 */
BatchCompute client = new BatchComputeClient(REGION_ID, ACCESS_KEY_ID, ACCESS_KEY_SECRET);
try{
/** 构造 Job 对象 */
JobDescription jobDescription = genJobDescription();
//创建 Job
CreateJobResponse response = client.createJob(jobDescription);
//创建成功后,返回 jobId
String jobId = response.getJobId();
System.out.println("Job created success, got jobId: "+jobId);
//查询 job 状态
GetJobResponse getJobResponse = client.getJob(jobId);
Job job = getJobResponse.getJob();
System.out.println("Job state:"+job.getState());
} catch (ClientException e) {
e.printStackTrace();
System.out.println("Job created failed, errorCode:"+ e.getErrCode()+", errorMessage:"+e.getErrMsg());
}
}
private static JobDescription genJobDescription(){
JobDescription jobDescription = new JobDescription();
jobDescription.setName("java-log-count");
jobDescription.setPriority(0);
jobDescription.setDescription("log-count demo");
jobDescription.setJobFailOnInstanceFail(true);
jobDescription.setType("DAG");
DAG taskDag = new DAG();
/** 添加 split task */
TaskDescription splitTask = genTaskDescription();
splitTask.setTaskName("split");
splitTask.setInstanceCount(1);
splitTask.getParameters().getCommand().setCommandLine("java -jar batchcompute-job-log-count-1.0-SNAPSHOT-Split.jar");
taskDag.addTask(splitTask);
/** 添加 count task */
TaskDescription countTask = genTaskDescription();
countTask.setTaskName("count");
countTask.setInstanceCount(3);
countTask.getParameters().getCommand().setCommandLine("java -jar batchcompute-job-log-count-1.0-SNAPSHOT-Count.jar");
taskDag.addTask(countTask);
/** 添加 merge task */
TaskDescription mergeTask = genTaskDescription();
mergeTask.setTaskName("merge");
mergeTask.setInstanceCount(1);
mergeTask.getParameters().getCommand().setCommandLine("java -jar batchcompute-job-log-count-1.0-SNAPSHOT-Merge.jar");
taskDag.addTask(mergeTask);
/** 添加 Task 依赖: split-->count-->merge */
List<String> taskNameTargets = new ArrayList();
taskNameTargets.add("merge");
taskDag.addDependencies("count", taskNameTargets);
List<String> taskNameTargets2 = new ArrayList();
taskNameTargets2.add("count");
taskDag.addDependencies("split", taskNameTargets2);
//dag
jobDescription.setDag(taskDag);
return jobDescription;
}
private static TaskDescription genTaskDescription(){
AutoCluster autoCluster = new AutoCluster();
autoCluster.setInstanceType(INSTANCE_TYPE);
autoCluster.setImageId(IMAGE_ID);
//autoCluster.setResourceType("OnDemand");
TaskDescription task = new TaskDescription();
//task.setTaskName("Find");
//如果使用 VPC,需要配置 cidrBlock, 请确保 IP 段不冲突
Configs configs = new Configs();
Networks networks = new Networks();
VPC vpc = new VPC();
vpc.setCidrBlock("192.168.0.0/16");
networks.setVpc(vpc);
configs.setNetworks(networks);
autoCluster.setConfigs(configs);
//打包上传的作业的 OSS 全路径
Parameters p = new Parameters();
Command cmd = new Command();
//cmd.setCommandLine("");
//打包上传的作业的 OSS 全路径
cmd.setPackagePath(WORKER_PATH);
p.setCommand(cmd);
//错误反馈存储路径
p.setStderrRedirectPath(LOG_PATH);
//最终结果输出存储路径
p.setStdoutRedirectPath(LOG_PATH);
task.setParameters(p);
task.addInputMapping(MOUNT_PATH, "/home/input");
task.addOutputMapping("/home/output",MOUNT_PATH);
task.setAutoCluster(autoCluster);
//task.setClusterId(clusterId);
task.setTimeout(30000); /* 30000 秒*/
task.setInstanceCount(1); /** 使用 1 个实例来运行 */
return task;
}
}
正常输出样例:
Job created success, got jobId: job-01010100010192397211
Job state:Waiting
3. 查看作业状态
您可以用 SDK 中的 获取作业信息方法获取作业状态:
//查询 job 状态
GetJobResponse getJobResponse = client.getJob(jobId);
Job job = getJobResponse.getJob();
System.out.println("Job state:"+job.getState());
Job 的 state 可能为:Waiting、Running、Finished、Failed、Stopped.
4. 查看结果
您可以登录 batchcompute 控制台 查看 job 状态。
Job 运行结束,您可以登录 OSS 控制台 查看your-bucket 这个 bucket 下面的这个文件:/log-count/merge_result.json。
内容应该如下:
{"INFO": 2460, "WARN": 2448, "DEBUG": 2509, "ERROR": 2583}
您也可以使用OSS 的 SDK来获取结果。