Java调度任务可以在您的应用进程中执行,也可以通过上传JAR包来动态加载。
执行模式
Java任务类型支持单机、广播、并行计算、内存网格、网格计算和分片运行6种执行模式:
- 单机:在同一个
groupId
下的机器随机挑一台执行。 - 广播:同一个
groupId
下的所有机器同时执行。 - 并行计算:支持子任务300以下,有子任务列表。
- 内存网格:基于内存计算,子任务50,000以下,速度快。
- 网格计算:基于文件计算,子任务1,000,000以下。
- 分片运行:包括静态分片和动态分批,用于处理大数据业务需求。
单机和广播需要实现JavaProcessor;并行计算、内存网格、网格计算和分片运行需要实现MapJobProcessor。
Processor类路径,即实现类的全路径名,例如com.apache.armon.test.schedulerx.processor.MySimpleJob
:
- 如果不上传JAR包,SchedulerX会去您的应用进程中的classpath下查找processor实现类,所以每次修改需要重新编译和发布。
- 如果上传了JAR包,每次会热加载JAR包和processor,不需要重新发布应用。
编程模型
Java任务支持两种编程模型:JavaProcessor和MapJobProcessor。
- JavaProcessor
- 可选:
public void preProcess(JobContext context) throws Exception
- 必需:
public ProcessResult process(JobContext context) throws Exception
- 可选:
public void postProcess(JobContext context)
- 可选:
public void kill(JobContext context)
- 可选:
- MapJobProcessor
- 必需:
public ProcessResult process(JobContext context) throws Exception
- 可选:
public void postProcess(JobContext context)
- 必需:
public void kill(JobContext context)
- 可选:
public void kill(JobContext context)
- 必需:
public ProcessResult map(List<? extends Object> taskList, String taskName)
- 必需:
ProcessResult
每个process需要返回ProcessResult,用来表示任务执行的状态、结果和错误信息。
- 任务运行成功:
return new ProcessResult(true)
。 - 任务运行失败:
return new ProcessResult(false, ErrorMsg)
或者直接抛异常。 - 任务运行成功并且返回结果:
return new ProcessResult(true, result)
。result
是一个字符串,不能大于1000字节。
HelloSchedulerx2.0任务示例
@Component
public class MyProcessor1 extends JavaProcessor {
@Override
public ProcessResult process(JobContext context) throws Exception {
//TODO
System.out.println("Hello, schedulerx2.0!");
return new ProcessResult(true);
}
}
支持Kill功能的任务示例
@Component
public class MyProcessor2 extends JavaProcessor {
private volatile boolean stop = false;
@Override
public ProcessResult process(JobContext context) throws Exception {
int N = 10000;
while (!stop && N >= 0) {
//TODO
N--;
}
return new ProcessResult(true);
}
@Override
public void kill(JobContext context) {
stop = true;
}
@Override
public void preProcess(JobContext context) {
stop = false; //如果是通过Spring启动,Bean是单例,需要通过preProcess把标记为复位
}
}
通过Map模型批量处理任务示例
/**
* 对一张单表进行分布式批量处理
* 1. 根任务先查询一张表,获取minId,maxId
* 2. 构造PageTask,通过map进行分发
* 3. 下一级获取到如果是PageTask,则进行数据处理
*
*/
@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {
private static final int pageSize = 100;
static class PageTask {
private int startId;
private int endId;
public PageTask(int startId, int endId) {
this.startId = startId;
this.endId = endId;
}
public int getStartId() {
return startId;
}
public int getEndId() {
return endId;
}
}
@Override
public ProcessResult process(JobContext context) {
String taskName = context.getTaskName();
Object task = context.getTask();
if (isRootTask(context)) {
System.out.println("start root task");
Pair<Integer, Integer> idPair = queryMinAndMaxId();
int minId = idPair.getFirst();
int maxId = idPair.getSecond();
List<PageTask> taskList = Lists.newArrayList();
int step = (int) ((maxId - minId) / pageSize); //计算分页数量for (int i = minId; i < maxId; i+=step) {
taskList.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));
}
return map(taskList, "Level1Dispatch");
} else if (taskName.equals("Level1Dispatch")) {
PageTask record = (PageTask)task;
long startId = record.getStartId();
long endId = record.getEndId();
//TODO
return new ProcessResult(true);
}
return new ProcessResult(true);
}
@Override
public void postProcess(JobContext context) {
//TODO
System.out.println("all tasks is finished.");
}
private Pair<Integer, Integer> queryMinAndMaxId() {
//TODO select min(id),max(id) from xxx
return null;
}
}
在文档使用中是否遇到以下问题
更多建议
匿名提交