MapReduce模型是Map模型的扩展,新增Reduce接口,需要实现MapReduceJobProcessor。

背景信息

  • MapReduce模型只有一个Reduce,所有子任务完成后会执行Reduce方法,可以在Reduce方法中返回该任务示例的执行结果,作为工作流的上下游数据传递。如果有子任务失败,Reduce不会执行。Reduce失败,整个任务示例也失败。
  • MapReduce模型还能处理所有子任务的结果。子任务通过return ProcessResult(true, result)返回结果(例如返回订单号),Reduce的时候,可以通过context获取所有子任务的结果,进行相应的处理。

MapReduce模型的原理和最佳实践,请参见SchedulerX 2.0 分布式计算原理和最佳实践

SchedulerX 2.0支持MapReduce模型的详细信息,请参见SchedulerX 2.0 支持 MapReduce 模型

注意事项

  • 所有子任务结果会缓存在Master节点,内存压力较大,建议子任务个数和Result不要太大。
  • SchedulerX不保证子任务绝对执行一次,在特殊条件下会Failover,可能会导致子任务重复执行,需要业务方自己实现幂等。

接口

接口 解释 是否必选
public ProcessResult process(JobContext context) throws Exception; 每个子任务执行业务的入口,需要从context里获取taskName,自己判断是哪个子任务,进行相应的逻辑处理。执行完成后,需要返回ProcessResult
public ProcessResult map(List<? extends Object> taskList, String taskName); 执行map方法可以把一批子任务分布式到多台机器上执行,可以map多次。如果taskList是空,返回失败。执行完成后,需要返回ProcessResult
public ProcessResult reduce(JobContext context);
public void kill(JobContext context); 前端kill任务会触发该方法,需要用户自己实现如何中断业务。

执行方式

  • 并行计算:最多支持300任务,有子任务列表。
    注意 秒级别任务不要选择并行计算。
  • 内存网格:基于内存计算,最多支持50,000以下子任务,速度快。
  • 网格计算:基于文件计算,最多支持1,000,000子任务。

高级配置

任务管理高级配置参数说明如下:

参数 适用的执行模式 解释 默认值
实例失败重试次数 通用 任务运行失败自动重试的次数。 0
实例失败重试间隔 通用 每次失败重试的间隔。单位:秒。 30
实例并发数 通用 同一个Job同一时间运行的实例个数。1表示不允许重复执行。 1
子任务单机并发数
  • 并行计算
  • 内存网格
  • 网格计算
分布式模型,单台机器并发消费子任务的个数。 5
子任务失败重试次数
  • 并行计算
  • 内存网格
  • 网格计算
分布式模型,子任务失败自动重试的次数。 0
子任务失败重试间隔
  • 并行计算
  • 内存网格
  • 网格计算
分布式模型,子任务失败自动重试的间隔。单位:秒。 0
子任务分发方式
  • 并行计算
  • 内存网格
  • 网格计算
  • 推模型:每台机器平均分配子任务。
  • 拉模型: 每台机器主动拉取子任务,没有木桶效应。拉取过程中,所有子任务会缓存在Master节点,对内存有压力,建议子任务数不超过10,000。
推模型
子任务单次拉取数(仅适用于拉模型)
  • 并行计算
  • 内存网格
  • 网格计算
Slave节点每次向Master节点拉取多少个子任务。 5
子任务队列容量(仅适用于拉模型)
  • 并行计算
  • 内存网格
  • 网格计算
Slave节点缓存子任务的队列大小。 10
子任务全局并发数(仅适用于拉模型)
  • 并行计算
  • 内存网格
  • 网格计算
分布式拉模型支持全局子任务并发数,可以进行限流。 1,000

发送500条消息的Demo示例(适用于MapReduce模型)

@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        int dispatchNum=500;
        if (isRootTask(context)) {
            System.out.println("start root task");
            List<String> msgList = Lists.newArrayList();
            for (int i = 0; i <= dispatchNum; i++) {
                msgList.add("msg_" + i);
            }
            return map(msgList, "Level1Dispatch");
        } else if (taskName.equals("Level1Dispatch")) {
            String task = (String)context.getTask();
            System.out.println(task);
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
    }

}           

处理单表数据的Demo示例(适用于Map或MapReduce模型)

@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {

    @Service
    private XXXService xxxService;

    private final int PAGE_SIZE = 500;

    static class PageTask {
        private long startId;
        private long endId;

        public PageTask(long startId, long endId) {
            this.startId = startId;
            this.endId = endId;
        }

        public long getStartId() {
            return startId;
        }

        public long getEndId() {
            return endId;
        }
    }

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String tableName = context.getJobParameters(); //多个Job后端代码可以一致,通过控制台配置Job参数表示表名。
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            Pair<Long, Long> idPair = queryMinAndMaxId(tableName);
            long minId = idPair.getFirst();
            long maxId = idPair.getSecond();
            List<PageTask> tasks = Lists.newArrayList();
            int step = (int) ((maxId - minId) / PAGE_SIZE); //计算分页数量
            for (long i = minId; i < maxId; i+=step) {
                tasks.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));
            }
            return map(tasks, "PageTask");
        } else if (taskName.equals("PageTask")) {
            PageTask pageTask = (PageTask)task;
            long startId = pageTask.getStartId();
            long endId = pageTask.getEndId();
            List<Record> records = queryRecord(tableName, startId, endId);
            //TODO handle records
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    private Pair<Long, Long> queryMinAndMaxId(String tableName) {
        //TODO select min(id),max(id) from [tableName]
        return new Pair<Long, Long>(1L, 10000L);
    }

    private List<Record> queryRecord(String tableName, long startId, long endId) {
        List<Record> records = Lists.newArrayList();
        //TODO select * from [tableName] where id>=[startId] and id<[endId]
        return records;
    }

}            

处理分库分表数据的Demo示例(适用于Map或MapReduce模型)

@Component
public class ScanShardingTableJobProcessor extends MapJobProcessor {

    @Service
    private XXXService xxxService;

    private final int PAGE_SIZE = 500;

    static class PageTask {
        private String tableName;
        private long startId;
        private long endId;

        public PageTask(String tableName, long startId, long endId) {
            this.tableName = tableName;
            this.startId = startId;
            this.endId = endId;
        }

        public String getTableName() {
            return tableName;
        }

        public long getStartId() {
            return startId;
        }

        public long getEndId() {
            return endId;
        }
    }

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            //先分库
            List<String> dbList = getDbList();
            return map(dbList, "DbTask");
        } else if (taskName.equals("DbTask")) {
            //根据分库去分表
            String dbName = (String)task;
            List<String> tableList = getTableList(dbName);
            return map(tableList, "TableTask");
        } else if (taskName.equals("TableTask")) {
            //如果一个分表也很大,再分页
            String tableName = (String)task;
            Pair<Long, Long> idPair = queryMinAndMaxId(tableName);
            long minId = idPair.getFirst();
            long maxId = idPair.getSecond();
            List<PageTask> tasks = Lists.newArrayList();
            int step = (int) ((maxId - minId) / PAGE_SIZE); //计算分页数量
            for (long i = minId; i < maxId; i+=step) {
                tasks.add(new PageTask(tableName, i, (i+step > maxId ? maxId : i+step)));
            }
            return map(tasks, "PageTask");
        } else if (taskName.equals("PageTask")) {
            PageTask pageTask = (PageTask)task;
            String tableName = pageTask.getTableName();
            long startId = pageTask.getStartId();
            long endId = pageTask.getEndId();
            List<Record> records = queryRecord(tableName, startId, endId);
            //TODO handle records
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    private List<String> getDbList() {
        List<String> dbList = Lists.newArrayList();
        //TODO 返回分库列表
        return dbList;
    }

    private List<String> getTableList(String dbName) {
        List<String> tableList = Lists.newArrayList();
        //TODO 返回分表列表
        return tableList;
    }

    private Pair<Long, Long> queryMinAndMaxId(String tableName) {
        //TODO select min(id),max(id) from [tableName]
        return new Pair<Long, Long>(1L, 10000L);
    }

    private List<Record> queryRecord(String tableName, long startId, long endId) {
        List<Record> records = Lists.newArrayList();
        //TODO select * from [tableName] where id>=[startId] and id<[endId]
        return records;
    }

}            

处理50条消息并且返回子任务结果由Reduce汇总的Demo示例(适用于MapReduce模型)

@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        int dispatchNum = 50;
        if (context.getJobParameters() != null) {
            dispatchNum = Integer.valueOf(context.getJobParameters());
        }
        if (isRootTask(context)) {
            System.out.println("start root task");
            List<String> msgList = Lists.newArrayList();
            for (int i = 0; i <= dispatchNum; i++) {
                msgList.add("msg_" + i);
            }
            return map(msgList, "Level1Dispatch");
        } else if (taskName.equals("Level1Dispatch")) {
            String task = (String)context.getTask();
            Thread.sleep(2000);
            return new ProcessResult(true, task);
        }

        return new ProcessResult(false);
    }

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        for (Entry<Long, String> result : context.getTaskResults().entrySet()) {
            System.out.println("taskId:" + result.getKey() + ", result:" + result.getValue());
        }
        return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
    }
}