注意事项
单个子任务的大小不能超过64 KB。
ProcessResult的result返回值不能超过1000 Byte。
如果使用reduce,所有子任务结果会缓存在Master节点,该情况对Master节点内存压力较大,建议子任务个数和result返回值不要太大。如果没有reduce需求,使用MapJobProcessor接口即可。
SchedulerX不保证子任务绝对执行一次。在特殊条件下会failover,可能导致子任务重复执行,需要业务方自行实现幂等。
接口
继承类MapJobProcessor
接口 | 解释 | 是否必选 |
public ProcessResult process(JobContext context) throws Exception;
| 每个子任务执行业务的入口,需从context中获取taskName,您需自行判断子任务名称。逻辑处理完成后,返回ProcessResult。 | 是 |
public ProcessResult map(List<? extends Object> taskList, String taskName);
| 执行map方法可以将一批子任务分布至多台机器上执行,可以多次执行map方法。如果taskList为空,返回失败。执行完成后,返回ProcessResult。 | 是 |
public void kill(JobContext context);
| 前端kill任务会触发该方法,需自行实现如何中断业务。 | 否 |
继承类MapReduceJobProcessor
接口 | 解释 | 是否必选 |
public ProcessResult process(JobContext context) throws Exception;
| 每个子任务执行业务的入口,需从context里获取taskName,您需自行判断子任务名称。逻辑处理完成后,返回ProcessResult。 | 是 |
public ProcessResult map(List<? extends Object> taskList, String taskName);
| 执行map方法可以将一批子任务分布至多台机器上执行,可以多次执行map方法。如果taskList是空,返回失败。执行完成后,返回ProcessResult。 | 是 |
public ProcessResult reduce(JobContext context);
| 所有Worker节点的子任务执行完成后,会回调reduce方法。reduce由Master节点执行,一般用于数据聚合或通知下游,也可以用于工作流的上下游数据传递。 reduce方法能处理所有子任务的结果。 子任务通过return ProcessResult(true, result) 返回结果(例如返回订单号)。 执行reduce方法时,通过context获取所有子任务的状态(context.getTaskStatuses() )和结果(context.getTaskResults() ),并进行相应的逻辑处理。
| 是 |
public void kill(JobContext context);
| 前端kill任务会触发该方法,需自行实现如何中断业务。 | 否 |
public boolean runReduceIfFail(JobContext context)
| 当存在子任务失败情况下,是否执行reduce方法。默认配置为:子任务失败时,仍然执行reduce方法。 | 否 |
操作步骤
登录分布式任务调度平台,在左侧导航栏,单击任务管理。
在任务管理页面,单击创建任务。
在创建任务面板,执行模式下拉列表选择MapReduce,在高级配置区域配置相关信息。
配置项 | 说明 |
分发策略 | |
子任务单机并发数 | 即单机执行线程数,默认为5。如需加快执行速度,可以调大该值。如果下游或者数据库无法承接,可适当调小。 |
子任务失败重试次数 | 子任务失败会自动重试,默认为0。 |
子任务失败重试间隔 | 子任务失败重试间隔,单位:秒,默认为0。 |
子任务failover策略 | 当执行节点宕机下线后,是否将子任务重新分发给其他机器执行。开启该配置后,发生failover时,子任务可能会重复执行,需自行做好幂等。 |
主节点参与执行 | 主节点是否参与子任务执行。在线可运行Worker数量必须不低于2台,在子任务数量特别大时,推荐关闭该参数。 |
子任务分发方式 | |
其他配置项,请参见任务管理高级配置参数说明。
Demo
处理单表数据(单表ID连续)
主任务读取最小ID和最大ID。
select min(id), max(id) from Tab1;
根据ID的range进行分页,每个task包含两个字段:startId和endId。
每个task通过ID的range获取数据。
select * from Tab1 where id >= startId and id < endId;
以下为示例代码:
class PageTask {
private long startId;
private long endId;
public PageTask(long startId, long endId) {
this.startId = startId;
this.endId = endId;
}
public long getStartId() {
return startId;
}
public long getEndId() {
return endId;
}
}
@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {
@Autowired
private XXXService xxxService;
private final int PAGE_SIZE = 500;
@Override
public ProcessResult process(JobContext context) throws Exception {
String tableName = context.getJobParameters();
String taskName = context.getTaskName();
Object task = context.getTask();
if (isRootTask(context)) {
Pair<Long, Long> idPair = queryMinAndMaxId(tableName);
long minId = idPair.getFirst();
long maxId = idPair.getSecond();
List<PageTask> tasks = Lists.newArrayList();
int step = (int) ((maxId - minId) / PAGE_SIZE);
if (step > 0) {
for (long i = minId; i < maxId; i+=step) {
tasks.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));
}
} else {
tasks.add(new PageTask(minId, maxId));
}
return map(tasks, "PageTask");
} else if (taskName.equals("PageTask")) {
PageTask pageTask = (PageTask)task;
long startId = pageTask.getStartId();
long endId = pageTask.getEndId();
List<Record> records = queryRecord(tableName, startId, endId);
return new ProcessResult(true);
}
return new ProcessResult(false);
}
private Pair<Long, Long> queryMinAndMaxId(String tableName) {
return new Pair<Long, Long>(1L, 10000L);
}
private List<Record> queryRecord(String tableName, long startId, long endId) {
List<Record> records = Lists.newArrayList();
return records;
}
}
处理单表数据(单表ID不连续)
数据库采用分桶策略,增加一个bucket字段作为索引。
例如1024个桶,数据库每添加一行记录时,将订单号或者ID进行hash,例如订单号%1024,落在bucket字段中。
基本上每个桶是平均的,针对每个桶,都可以通过以下SQL语句全量查询结果。
select * from Tab1 where bucket=xxx;
以下为示例代码:
@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {
@Autowired
private XXXService xxxService;
private final int PAGE_SIZE = 500;
@Override
public ProcessResult process(JobContext context) throws Exception {
String tableName = context.getJobParameters();
String taskName = context.getTaskName();
Object task = context.getTask();
if (isRootTask(context)) {
List<Integer> tasks = Lists.newArrayList();
for (int i = 0; i< 1024; i++) {
tasks.add(i);
}
return map(tasks, "BucketTask");
} else if (taskName.equals("BucketTask")) {
int bucketId = (int)task;
List<Record> records = queryRecord(tableName, bucketId);
return new ProcessResult(true);
}
return new ProcessResult(false);
}
private List<Record> queryRecord(String tableName, int bucketId) {
List<Record> records = Lists.newArrayList();
return records;
}
}
处理分库分表数据
class PageTask {
private String tableName;
private long startId;
private long endId;
public PageTask(String tableName, long startId, long endId) {
this.tableName = tableName;
this.startId = startId;
this.endId = endId;
}
public String getTableName() {
return tableName;
}
public long getStartId() {
return startId;
}
public long getEndId() {
return endId;
}
}
@Component
public class ScanShardingTableJobProcessor extends MapJobProcessor {
@Autowired
private XXXService xxxService;
private final int PAGE_SIZE = 500;
@Override
public ProcessResult process(JobContext context) throws Exception {
String taskName = context.getTaskName();
Object task = context.getTask();
if (isRootTask(context)) {
List<String> dbList = getDbList();
return map(dbList, "DbTask");
} else if (taskName.equals("DbTask")) {
String dbName = (String)task;
List<String> tableList = getTableList(dbName);
return map(tableList, "TableTask");
} else if (taskName.equals("TableTask")) {
String tableName = (String)task;
Pair<Long, Long> idPair = queryMinAndMaxId(tableName);
long minId = idPair.getFirst();
long maxId = idPair.getSecond();
List<PageTask> tasks = Lists.newArrayList();
int step = (int) ((maxId - minId) / PAGE_SIZE);
if (step > 0) {
for (long i = minId; i < maxId; i+=step) {
tasks.add(new PageTask(tableName, i, (i+step > maxId ? maxId : i+step)));
}
} else {
tasks.add(new PageTask(tableName, minId, maxId));
}
return map(tasks, "PageTask");
} else if (taskName.equals("PageTask")) {
PageTask pageTask = (PageTask)task;
String tableName = pageTask.getTableName();
long startId = pageTask.getStartId();
long endId = pageTask.getEndId();
List<Record> records = queryRecord(tableName, startId, endId);
return new ProcessResult(true);
}
return new ProcessResult(false);
}
private List<String> getDbList() {
List<String> dbList = Lists.newArrayList();
return dbList;
}
private List<String> getTableList(String dbName) {
List<String> tableList = Lists.newArrayList();
return tableList;
}
private Pair<Long, Long> queryMinAndMaxId(String tableName) {
return new Pair<Long, Long>(1L, 10000L);
}
private List<Record> queryRecord(String tableName, long startId, long endId) {
List<Record> records = Lists.newArrayList();
return records;
}
}
处理50条消息,reduce返回结果
@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {
@Override
public ProcessResult process(JobContext context) throws Exception {
String taskName = context.getTaskName();
int dispatchNum=50;
if (isRootTask(context)) {
System.out.println("start root task");
List<String> msgList = Lists.newArrayList();
for (int i = 0; i <= dispatchNum; i++) {
msgList.add("msg_" + i);
}
return map(msgList, "Level1Dispatch");
} else if (taskName.equals("Level1Dispatch")) {
String task = (String)context.getTask();
System.out.println(task);
return new ProcessResult(true);
}
return new ProcessResult(false);
}
@Override
public ProcessResult reduce(JobContext context) throws Exception {
return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
}
}
处理50条消息并且返回子任务结果由reduce汇总
@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {
@Override
public ProcessResult process(JobContext context) throws Exception {
String taskName = context.getTaskName();
int dispatchNum = 50;
if (context.getJobParameters() != null) {
dispatchNum = Integer.valueOf(context.getJobParameters());
}
if (isRootTask(context)) {
System.out.println("start root task");
List<String> msgList = Lists.newArrayList();
for (int i = 0; i <= dispatchNum; i++) {
msgList.add("msg_" + i);
}
return map(msgList, "Level1Dispatch");
} else if (taskName.equals("Level1Dispatch")) {
String task = (String)context.getTask();
Thread.sleep(2000);
return new ProcessResult(true, task);
}
return new ProcessResult(false);
}
@Override
public ProcessResult reduce(JobContext context) throws Exception {
for (Entry<Long, String> result : context.getTaskResults().entrySet()) {
System.out.println("taskId:" + result.getKey() + ", result:" + result.getValue());
}
return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
}
}