Java jobs
You can run Java jobs in your application processes. This topic describes how to manage Java jobs.
Execution modes
Java jobs support the following execution modes:
Standalone: The job is executed on a random machine in the same groupId.
Broadcast: The job is executed concurrently on all machines in the same groupId.
Visual MapReduce: This is a MapReduce task model that requires the Professional Edition. It supports up to 1,000 subtasks. You can query detailed execution records, operational logs, and subtask stacks using a business keyword.
MapReduce: This is a regular MapReduce model task that supports parallel processing of many subtasks. You can query only the summary information about subtask execution. Select this mode when the number of subtasks is less than one million.
Sharded execution: This mode uses static sharding and dynamic batching to process big data.
Implement JavaProcessor for Standalone and Broadcast modes. Implement MapJobProcessor for Visual MapReduce, MapReduce, and Sharded execution modes.
The processor class path is the fully qualified name of the implementation class. For example, com.apache.armon.test.schedulerx.processor.MySimpleJob.
If you do not upload a JAR package, SchedulerX searches for the processor implementation class in your application's classpath. Therefore, you must recompile and publish the application after each modification.
If you upload a JAR package, SchedulerX hot-reloads the JAR package and the processor. You do not need to republish the application.
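Resolving a processor from its fully qualified name is typically done through reflection. The following is a minimal sketch of such a lookup, not SchedulerX's actual loader. SimpleProcessor, MySimpleJob, and ProcessorLoader are hypothetical stand-ins defined here so that the example compiles without the SDK.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-in for a processor base type; not the SchedulerX class.
interface SimpleProcessor {
    String run();
}

// Example implementation that a processor class path could point to.
class MySimpleJob implements SimpleProcessor {
    @Override
    public String run() {
        return "ok";
    }
}

// Hypothetical loader: resolves a fully qualified class name on the classpath.
class ProcessorLoader {
    /** Loads the named class, checks its type, and creates an instance. */
    static SimpleProcessor load(String classPath) throws ReflectiveOperationException {
        Class<?> clazz = Class.forName(classPath);
        if (!SimpleProcessor.class.isAssignableFrom(clazz)) {
            throw new IllegalArgumentException(classPath + " is not a SimpleProcessor");
        }
        Constructor<?> ctor = clazz.getDeclaredConstructor();
        ctor.setAccessible(true); // the example class is not public
        return (SimpleProcessor) ctor.newInstance();
    }
}
```

Because the class is resolved by name at run time, a typo in the processor class path only surfaces as a ClassNotFoundException when the job is triggered, so it is worth copying the name directly from the source file.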
Programming models
Java jobs support two programming models: JavaProcessor and MapJobProcessor.
JavaProcessor
Optional: public void preProcess(JobContext context) throws Exception
Required: public ProcessResult process(JobContext context) throws Exception
Optional: public void postProcess(JobContext context)
Optional: public void kill(JobContext context)
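To show how these hooks relate to each other, the following sketch runs them in sequence on stand-in types. The order preProcess, process, postProcess, and the choice to run postProcess even after a failure, are assumptions of this sketch rather than documented SchedulerX behavior. SketchContext, SketchResult, SketchProcessor, LifecycleRunner, and RecordingProcessor are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-ins so the sketch compiles without the SchedulerX SDK.
class SketchContext { }

class SketchResult {
    final boolean ok;
    SketchResult(boolean ok) { this.ok = ok; }
}

// Mirrors the method list above: only process() is required.
abstract class SketchProcessor {
    public void preProcess(SketchContext context) throws Exception { }
    public abstract SketchResult process(SketchContext context) throws Exception;
    public void postProcess(SketchContext context) { }
    public void kill(SketchContext context) { }
}

class LifecycleRunner {
    /** Runs the hooks in the assumed order; an exception counts as a failure. */
    static SketchResult run(SketchProcessor p, SketchContext ctx) {
        SketchResult result;
        try {
            p.preProcess(ctx);
            result = p.process(ctx);
        } catch (Exception e) {
            result = new SketchResult(false);
        }
        p.postProcess(ctx); // assumption: always invoked after process()
        return result;
    }
}

// Records which hooks were called, in order.
class RecordingProcessor extends SketchProcessor {
    final List<String> calls = new ArrayList<>();

    @Override
    public void preProcess(SketchContext context) { calls.add("preProcess"); }

    @Override
    public SketchResult process(SketchContext context) {
        calls.add("process");
        return new SketchResult(true);
    }

    @Override
    public void postProcess(SketchContext context) { calls.add("postProcess"); }
}
```

Calling LifecycleRunner.run(new RecordingProcessor(), new SketchContext()) leaves the three hook names in the calls list in the order they were invoked.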
MapJobProcessor
Required: public ProcessResult process(JobContext context) throws Exception
Optional: public void postProcess(JobContext context)
Optional: public void kill(JobContext context)
Required: public ProcessResult map(List<? extends Object> taskList, String taskName)
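In this model, process is called once for the root task and again for every subtask that map distributes, so the method usually branches on the task name. The following single-threaded sketch imitates that flow locally. It is an illustration under stated assumptions, not the SDK's implementation: MapSketchContext, MapSketchProcessor, SumProcessor, the "ROOT" task name, and runLocally are all hypothetical, and real subtasks are distributed across machines instead of being queued in memory.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Stand-in for JobContext: the name and payload of one task or subtask.
class MapSketchContext {
    static final String ROOT = "ROOT"; // hypothetical root task name
    final String taskName;
    final Object task;

    MapSketchContext(String taskName, Object task) {
        this.taskName = taskName;
        this.task = task;
    }

    boolean isRoot() { return ROOT.equals(taskName); }
}

abstract class MapSketchProcessor {
    // Subtasks queued by map(); drained by runLocally().
    private final Deque<MapSketchContext> pending = new ArrayDeque<>();

    public abstract boolean process(MapSketchContext context);

    /** Queues one subtask per list element under the given task name. */
    protected boolean map(List<?> taskList, String taskName) {
        for (Object t : taskList) {
            pending.add(new MapSketchContext(taskName, t));
        }
        return true;
    }

    /** Runs the root task, then every queued subtask, in the calling thread. */
    boolean runLocally() {
        pending.add(new MapSketchContext(MapSketchContext.ROOT, null));
        boolean ok = true;
        while (!pending.isEmpty()) {
            ok &= process(pending.poll());
        }
        return ok;
    }
}

// Root task fans out three numbers; each subtask adds its number to a total.
class SumProcessor extends MapSketchProcessor {
    int total = 0;

    @Override
    public boolean process(MapSketchContext context) {
        if (context.isRoot()) {
            return map(Arrays.asList(1, 2, 3), "Level1Dispatch");
        } else if ("Level1Dispatch".equals(context.taskName)) {
            total += (Integer) context.task;
            return true;
        }
        return false; // unknown task name
    }
}
```

A subtask may itself call map with a different task name, which is how multi-level dispatch would be expressed in this sketch.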
ProcessResult
Each process method must return a ProcessResult object. This object indicates the task's execution status, result, and error message.
A task succeeds if it returns new ProcessResult(true).
A task fails if it returns new ProcessResult(false, ErrorMsg) or throws an exception.
A successful task can also return a result using new ProcessResult(true, result), where result is a string that cannot exceed 1000 bytes.
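Because the limit is expressed in bytes, not characters, a long or non-ASCII result may need trimming before it is returned. The following is a minimal sketch, assuming the limit is measured in UTF-8 bytes (the text above does not name the encoding). ResultLimiter is a hypothetical helper, not part of the SDK.

```java
// Hypothetical helper for illustration; not part of the SchedulerX SDK.
class ResultLimiter {
    static final int MAX_RESULT_BYTES = 1000; // limit stated in the text above

    /** Returns the longest prefix of s whose UTF-8 encoding fits in maxBytes. */
    static String truncateUtf8(String s, int maxBytes) {
        int bytes = 0;
        int i = 0;
        while (i < s.length()) {
            int cp = s.codePointAt(i);
            // UTF-8 length of this code point: 1 to 4 bytes.
            int len = cp < 0x80 ? 1 : cp < 0x800 ? 2 : cp < 0x10000 ? 3 : 4;
            if (bytes + len > maxBytes) {
                break; // stop before a character that would not fit whole
            }
            bytes += len;
            i += Character.charCount(cp);
        }
        return s.substring(0, i);
    }
}
```

The trimmed string could then be passed as the result argument, for example new ProcessResult(true, ResultLimiter.truncateUtf8(result, ResultLimiter.MAX_RESULT_BYTES)).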
Hello SchedulerX 2.0 job example
@Component
public class MyProcessor1 extends JavaProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        //TODO
        System.out.println("Hello, schedulerx2.0!");
        return new ProcessResult(true);
    }
}
Job example that supports the kill feature
@Component
public class MyProcessor2 extends JavaProcessor {
    private volatile boolean stop = false;

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        int N = 10000;
        while (!stop && N >= 0) {
            //TODO
            N--;
        }
        return new ProcessResult(true);
    }

    @Override
    public void kill(JobContext context) {
        stop = true;
    }

    @Override
    public void preProcess(JobContext context) {
        // If the job is launched using Spring and the bean is a singleton, use preProcess to reset the flag.
        stop = false;
    }
}
Job example for batch processing using the Map model
/**
 * Distributed batch processing for a non-partitioned table.
 * 1. The root task queries a table to get minId and maxId.
 * 2. The root task constructs PageTask objects and distributes them using the map method.
 * 3. When a subtask receives a PageTask, it processes the data in that ID range.
 */
@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {
    private static final int pageSize = 100;

    static class PageTask {
        private int startId;
        private int endId;

        public PageTask(int startId, int endId) {
            this.startId = startId;
            this.endId = endId;
        }

        public int getStartId() {
            return startId;
        }

        public int getEndId() {
            return endId;
        }
    }

    @Override
    public ProcessResult process(JobContext context) {
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            System.out.println("start root task");
            Pair<Integer, Integer> idPair = queryMinAndMaxId();
            int minId = idPair.getFirst();
            int maxId = idPair.getSecond();
            List<PageTask> taskList = Lists.newArrayList();
            // Split [minId, maxId] into pages of at most pageSize IDs. endId is inclusive.
            // Stepping by pageSize avoids a zero step (and an endless loop) for small ranges.
            for (int i = minId; i <= maxId; i += pageSize) {
                taskList.add(new PageTask(i, Math.min(i + pageSize - 1, maxId)));
            }
            return map(taskList, "Level1Dispatch"); // The process calls the map method to distribute subtasks.
        } else if ("Level1Dispatch".equals(taskName)) {
            PageTask record = (PageTask) task;
            long startId = record.getStartId();
            long endId = record.getEndId();
            //TODO Process the rows whose id is between startId and endId.
            return new ProcessResult(true);
        }
        return new ProcessResult(true);
    }

    @Override
    public void postProcess(JobContext context) {
        //TODO
        System.out.println("All tasks are finished.");
    }

    private Pair<Integer, Integer> queryMinAndMaxId() {
        //TODO select min(id),max(id) from xxx
        // Placeholder: returning null makes the root task fail with a NullPointerException.
        return null;
    }
}
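The step in which the root task divides an ID range into pages can be checked on its own, without the scheduler. The following is a minimal standalone sketch of one way to do the split, assuming inclusive start and end IDs and a fixed page size. PageSplitter is a hypothetical helper, and it does not guard against overflow for IDs close to Long.MAX_VALUE.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the SchedulerX SDK.
class PageSplitter {
    /**
     * Splits the inclusive range [minId, maxId] into consecutive pages of at
     * most pageSize IDs. Each page is returned as {startId, endId}, both inclusive.
     */
    static List<long[]> split(long minId, long maxId, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        List<long[]> pages = new ArrayList<>();
        for (long start = minId; start <= maxId; start += pageSize) {
            long end = Math.min(start + pageSize - 1, maxId);
            pages.add(new long[] {start, end});
        }
        return pages;
    }
}
```

With this scheme every ID belongs to exactly one page, the last page may be shorter than the others, and an empty range (maxId smaller than minId) produces no pages, so the root task would distribute no subtasks.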