MapReduce模型

更新时间:

MapReduce模型是SchedulerX自主研发的轻量级分布式跑批模型。通过MapJobProcessor或MapReduceJobProcessor接口将接入的Worker组成分布式计算引擎进行大数据跑批。相对于传统的大数据跑批(例如Hadoop、Spark等),MapReduce无需将数据导入大数据平台,且无额外存储及计算成本,即可实现秒级别海量数据处理,具有成本低、速度快、编程简单等特性。

注意事项

  • 单个子任务的大小不能超过64 KB。

  • ProcessResult的result返回值不能超过1000 Byte。

  • 如果使用reduce,所有子任务结果会缓存在Master节点,该情况对Master节点内存压力较大,建议子任务个数和result返回值不要太大。如果没有reduce需求,使用MapJobProcessor接口即可。

  • SchedulerX不保证子任务绝对执行一次。在特殊条件下会failover,可能导致子任务重复执行,需要业务方自行实现幂等。

接口

  • 继承类MapJobProcessor

    接口

    解释

    是否必选

    public ProcessResult process(JobContext context) throws Exception;

    每个子任务执行业务的入口,需从context中获取taskName,您需自行判断子任务名称。逻辑处理完成后,返回ProcessResult

    public ProcessResult map(List<? extends Object> taskList, String taskName);

    执行map方法可以将一批子任务分布至多台机器上执行,可以多次执行map方法。如果taskList为空,返回失败。执行完成后,返回ProcessResult

    public void kill(JobContext context);

    前端kill任务会触发该方法,需自行实现如何中断业务。

  • 继承类MapReduceJobProcessor

    接口

    解释

    是否必选

    public ProcessResult process(JobContext context) throws Exception;

    每个子任务执行业务的入口,需从context里获取taskName,您需自行判断子任务名称。逻辑处理完成后,返回ProcessResult

    public ProcessResult map(List<? extends Object> taskList, String taskName);

    执行map方法可以将一批子任务分布至多台机器上执行,可以多次执行map方法。如果taskList是空,返回失败。执行完成后,返回ProcessResult

    public ProcessResult reduce(JobContext context);

    所有Worker节点的子任务执行完成后,会回调reduce方法。reduce由Master节点执行,一般用于数据聚合或通知下游,也可以用于工作流的上下游数据传递。

    reduce方法能处理所有子任务的结果。

    1. 子任务通过return ProcessResult(true, result)返回结果(例如返回订单号)。

    2. 执行reduce方法时,通过context获取所有子任务的状态(context.getTaskStatuses())和结果(context.getTaskResults()),并进行相应的逻辑处理。

    public void kill(JobContext context);

    前端kill任务会触发该方法,需自行实现如何中断业务。

    public boolean runReduceIfFail(JobContext context)

    当存在子任务失败情况下,是否执行reduce方法。默认配置为:子任务失败时,仍然执行reduce方法。

操作步骤

  1. 登录分布式任务调度平台,在左侧导航栏,单击任务管理

  2. 任务管理页面,单击创建任务

  3. 创建任务面板,执行模式下拉列表选择MapReduce,在高级配置区域配置相关信息。

    配置项

    说明

    分发策略

    说明

    需客户端版本>=1.10.3。

    • 轮询策略(默认):每个Worker平均分配等量子任务,适用于每个子任务处理耗时基本一致的场景。

    • WorkerLoad最优策略:由主节点自动感知Worker节点的负载情况,适用于子任务和Worker机器处理耗时有较大差异的场景。

    子任务单机并发数

    即单机执行线程数,默认为5。如需加快执行速度,可以调大该值。如果下游或者数据库无法承接,可适当调小。

    子任务失败重试次数

    子任务失败会自动重试,默认为0。

    子任务失败重试间隔

    子任务失败重试间隔,单位:秒,默认为0。

    子任务failover策略

    说明

    需客户端版本>=1.8.12。

    当执行节点宕机下线后,是否将子任务重新分发给其他机器执行。开启该配置后,发生failover时,子任务可能会重复执行,需自行做好幂等。

    主节点参与执行

    说明

    需客户端版本>=1.8.12。

    主节点是否参与子任务执行。在线可运行Worker数量必须不低于2台,在子任务数量特别大时,推荐关闭该参数。

    子任务分发方式

    • 推模型:每台机器平均分配子任务。

    • 拉模型:每台机器主动拉取子任务,没有木桶效应,支持动态扩容拉取子任务。拉取过程中,所有子任务会缓存在Master节点,对内存有压力,建议子任务数不超过10,000。

    其他配置项,请参见任务管理高级配置参数说明

原理&最佳实践

Schedulerx2.0分布式计算原理&最佳实践

Demo

处理单表数据(单表ID连续)

  1. 主任务读取最小ID和最大ID。

    select min(id), max(id) from Tab1;
  2. 根据ID的range进行分页,每个task包含两个字段:startId和endId。

  3. 每个task通过ID的range获取数据。

    select * from Tab1 where id >= startId and id < endId;

以下为示例代码:

class PageTask {
    private long startId;
    private long endId;

    public PageTask(long startId, long endId) {
        this.startId = startId;
        this.endId = endId;
    }

    public long getStartId() {
        return startId;
    }

    public long getEndId() {
        return endId;
    }
}
@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {

    @Autowired
    private XXXService xxxService;

    private final int PAGE_SIZE = 500;

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String tableName = context.getJobParameters(); //多个job后端代码可以一致,通过控制台配置job参数表示表名
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            Pair<Long, Long> idPair = queryMinAndMaxId(tableName);
            long minId = idPair.getFirst();
            long maxId = idPair.getSecond();
            List<PageTask> tasks = Lists.newArrayList();
            int step = (int) ((maxId - minId) / PAGE_SIZE); //计算分页数量
            if (step > 0) {
                for (long i = minId; i < maxId; i+=step) {
                    tasks.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));
                }
            } else {
                tasks.add(new PageTask(minId, maxId));
            }
            return map(tasks, "PageTask");
        } else if (taskName.equals("PageTask")) {
            PageTask pageTask = (PageTask)task;
            long startId = pageTask.getStartId();
            long endId = pageTask.getEndId();
            List<Record> records = queryRecord(tableName, startId, endId);
            //TODO handle records
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    private Pair<Long, Long> queryMinAndMaxId(String tableName) {
        //TODO select min(id),max(id) from [tableName]
        return new Pair<Long, Long>(1L, 10000L);
    }

    private List<Record> queryRecord(String tableName, long startId, long endId) {
        List<Record> records = Lists.newArrayList();
        //TODO select * from [tableName] where id>=[startId] and id<[endId]
        return records;
    }

}

处理单表数据(单表ID不连续)

  1. 数据库采用分桶策略,增加一个bucket字段作为索引。

  2. 例如1024个桶,数据库每添加一行记录时,将订单号或者ID进行hash,例如订单号%1024,落在bucket字段中。

  3. 基本上每个桶是平均的,针对每个桶,都可以通过以下SQL语句全量查询结果。

    select * from Tab1 where bucket=xxx;

以下为示例代码:

@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {

    @Autowired
    private XXXService xxxService;

    private final int PAGE_SIZE = 500;

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String tableName = context.getJobParameters(); //多个job后端代码可以一致,通过控制台配置job参数表示表名。
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            List<Integer> tasks = Lists.newArrayList();
            for (int i = 0; i< 1024; i++) {
                tasks.add(i);
            }    
            return map(tasks, "BucketTask");
        } else if (taskName.equals("BucketTask")) {
            int bucketId = (int)task;
            List<Record> records = queryRecord(tableName, bucketId);
            //TODO handle records
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    private List<Record> queryRecord(String tableName, int bucketId) {
        List<Record> records = Lists.newArrayList();
        //TODO select * from #{tableName} where bucket= #{bucketId}
        return records;
    }

}

处理分库分表数据

class PageTask {
    private String tableName;
    private long startId;
    private long endId;

    public PageTask(String tableName, long startId, long endId) {
        this.tableName = tableName;
        this.startId = startId;
        this.endId = endId;
    }

    public String getTableName() {
        return tableName;
    }

    public long getStartId() {
        return startId;
    }

    public long getEndId() {
        return endId;
    }
}
@Component
public class ScanShardingTableJobProcessor extends MapJobProcessor {

    @Autowired
    private XXXService xxxService;

    private final int PAGE_SIZE = 500;

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            //先分库
            List<String> dbList = getDbList();
            return map(dbList, "DbTask");
        } else if (taskName.equals("DbTask")) {
            //根据分库去分表
            String dbName = (String)task;
            List<String> tableList = getTableList(dbName);
            return map(tableList, "TableTask");
        } else if (taskName.equals("TableTask")) {
            //如果一个分表也很大,再分页
            String tableName = (String)task;
            Pair<Long, Long> idPair = queryMinAndMaxId(tableName);
            long minId = idPair.getFirst();
            long maxId = idPair.getSecond();
            List<PageTask> tasks = Lists.newArrayList();
            int step = (int) ((maxId - minId) / PAGE_SIZE); //计算分页数量
            if (step > 0) {
                for (long i = minId; i < maxId; i+=step) {
                    tasks.add(new PageTask(tableName, i, (i+step > maxId ? maxId : i+step)));
                }
            } else {
                tasks.add(new PageTask(tableName, minId, maxId));
            }
            return map(tasks, "PageTask");
        } else if (taskName.equals("PageTask")) {
            PageTask pageTask = (PageTask)task;
            String tableName = pageTask.getTableName();
            long startId = pageTask.getStartId();
            long endId = pageTask.getEndId();
            List<Record> records = queryRecord(tableName, startId, endId);
            //TODO handle records
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    private List<String> getDbList() {
        List<String> dbList = Lists.newArrayList();
        //TODO 返回分库列表
        return dbList;
    }

    private List<String> getTableList(String dbName) {
        List<String> tableList = Lists.newArrayList();
        //TODO 返回分表列表
        return tableList;
    }

    private Pair<Long, Long> queryMinAndMaxId(String tableName) {
        //TODO select min(id),max(id) from [tableName]
        return new Pair<Long, Long>(1L, 10000L);
    }

    private List<Record> queryRecord(String tableName, long startId, long endId) {
        List<Record> records = Lists.newArrayList();
        //TODO select * from [tableName] where id>=[startId] and id<[endId]
        return records;
    }

}

处理50条消息,reduce返回结果

@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        int dispatchNum=50;
        if (isRootTask(context)) {
            System.out.println("start root task");
            List<String> msgList = Lists.newArrayList();
            for (int i = 0; i <= dispatchNum; i++) {
                msgList.add("msg_" + i);
            }
            return map(msgList, "Level1Dispatch");
        } else if (taskName.equals("Level1Dispatch")) {
            String task = (String)context.getTask();
            System.out.println(task);
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
    }

}

处理50条消息并且返回子任务结果由reduce汇总

@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        int dispatchNum = 50;
        if (context.getJobParameters() != null) {
            dispatchNum = Integer.valueOf(context.getJobParameters());
        }
        if (isRootTask(context)) {
            System.out.println("start root task");
            List<String> msgList = Lists.newArrayList();
            for (int i = 0; i <= dispatchNum; i++) {
                msgList.add("msg_" + i);
            }
            return map(msgList, "Level1Dispatch");
        } else if (taskName.equals("Level1Dispatch")) {
            String task = (String)context.getTask();
            Thread.sleep(2000);
            return new ProcessResult(true, task);
        }

        return new ProcessResult(false);
    }

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        for (Entry<Long, String> result : context.getTaskResults().entrySet()) {
            System.out.println("taskId:" + result.getKey() + ", result:" + result.getValue());
        }
        return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
    }
}