可视化MapReduce模型

更新时间: 2023-08-22 16:07:06

可视化MapReduce模型在MapReduce模型基础上,新增了可视化可运维的能力。您无需修改后端代码,只需在SchedulerX控制台将分布式模型改为可视化MapReduce,即可新增一个子任务列表页面,并且可以查看每个子任务的详情、结果和日志及支持每个子任务级别的重跑。

注意事项

  • 仅专业版支持。

  • 子任务个数不能超过1000个。

  • 单个子任务的大小不能超过64 KB。

  • ProcessResult的result返回值不能超过1000 Byte。

  • 如果使用reduce,所有子任务结果会缓存在Master节点,该情况对Master节点内存压力较大,建议子任务个数和result返回值不要太大。如果没有reduce需求,使用MapJobProcessor接口即可。

  • SchedulerX不保证子任务绝对执行一次。在特殊条件下会failover,可能导致子任务重复执行,需要业务方自行实现幂等。

接口

  • 继承MapReduce模型所有接口。具体信息,请参见MapReduce模型

  • 在MapReduce模型接口基础上,支持设置每个子任务的标签(子任务对象需要实现com.alibaba.schedulerx.worker.processor.BizSubTask接口)。

    接口

    解释

    是否必选

    public Map<String, String> labelMap()

    实现输出子任务标签信息。

任务开发演示

创建子任务对象并实现接口com.alibaba.schedulerx.worker.processor.BizSubTask,展开查看代码:

public class ParallelAccountInfo implements BizSubTask {

    /**
     * 主键
     */
    private long id;

    private String name;

    private String accountId;

    public ParallelAccountInfo(long id, String name, String accountId) {
        this.id = id;
        this.name = name;
        this.accountId = accountId;
    }

    /**
     * 实现labelMap方法,用于设置对应子任务的标签信息
     * @return
     */
    @Override
    public Map<String, String> labelMap() {
        Map<String, String> labelMap = new HashMap();
        labelMap.put("户名", name);
        return labelMap;
    }
}

开发对应任务处理Processor,继承com.alibaba.schedulerx.worker.processor.MapReduceJobProcessor,展开查看代码:

public class ParallelJob extends MapReduceJobProcessor {

    private static final Logger logger = LoggerFactory.getLogger("schedulerx");

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        return new ProcessResult(true);
    }

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        if(isRootTask(context)){
            logger.info("构建并行计算的子任务列表...");
            List<ParallelAccountInfo> list = new LinkedList();
            /**
             *  判断如果是rootTask的情况下,构建并行计算子任务对象列表
             *  在实际业务场景中,用户可自行根据业务场景加载子任务对象且该业务对象实现BizSubTask接口
             *  场景案例:
             *  1、从数据库中加载未被处理的客户账户信息
             *  2、构建省份城市地区信息列表,按区域分发任务处理
             *  3、根据业务标签作为子任务分类,如:电器、日用品、食品等
             *  4、可根据时间作为子任务分类,如:按月(1月、2月...)
             */
            for(int i=0; i < 20; i++){
                list.add(new ParallelAccountInfo(i, "CUS"+StringUtils.leftPad(i+"", 4, "0"),
                        "AC"+StringUtils.leftPad(i+"", 12, "0")));
            }
            return map(list, "transfer");
        }else {
            /**
             * 非rootTask,用户可以获取对应的子任务信息进行相应的业务处理
             */
            ParallelAccountInfo obj = (ParallelAccountInfo)context.getTask();
            // 针对获取的 obj子任务信息,进行业务逻辑处理
            // do something
            logger.info("处理子任务信息:{}", JSON.toJSONString(obj));
            return new ProcessResult(true);
        }
    }
}

操作步骤

任务配置

  1. 登录分布式任务调度平台,在左侧导航栏,单击任务管理

  2. 任务管理页面,单击创建任务

  3. 创建任务面板,执行模式下拉列表选择可视化MapReduce

    image.png

可视化能力

任务执行后,您可以在执行列表页面,单击详情查看对应子任务的详细执行信息。

  • 子任务列表页签查看每个业务对象处理的状态。

    image.png
  • 子任务列表页签,单击子任务操作列的日志,查看每个子任务运行的业务日志信息,分析执行状态结果。

    image.png
  • 任务执行记录在运行中时,在当前执行详情页签,单击查看堆栈,查看对应机器处理线程运行中的情况,分析当前任务运行异常情况。

    image.png

相关文档

可视化MapReduce模型运用解析

MapReduce模型

阿里云首页 分布式任务调度 SchedulerX 相关技术圈