广播

广播执行表示一个任务实例会广播到该分组所有Worker上执行,当所有Worker都执行完成,该任务才算完成。任意一台Worker执行失败,都算该任务失败。

应用场景

  • 批量运维
    • 定时广播所有机器运行某个脚本。
    • 定时广播所有机器清理缓存。
    • 动态拉起每台机器的某个服务,最后由一台机器回收结果修改数据库。
  • 数据聚合
    • 使用JavaProcessor,preProcess的时候初始化Redis缓存或者数据库。
    • 每台机器执行process的时候,根据自己业务返回result。
    • postProcess的时候,获取所有机器的执行结果做汇总,更新缓存或者数据库。

任务类型

任务类型可以选择多种,例如脚本或者Java任务。如果选择Java,还支持preProcess和postProcess高级特性。

使用Java任务需要继承JavaProcessor(1.0.7及以上版本),接口如下:

  • 必需:public ProcessResult process(JobContext context) throws Exception
  • 可选:public void preProcess(JobContext context)
  • 可选:public ProcessResult postProcess(JobContext context)

preProcess会在所有机器执行process之前执行,且只会执行一次。

postProcess会在所有机器执行process且都成功执行之后执行一次,可以返回结果,作为工作流数据传输。

Demo示例

@Component
public class TestBroadcastJob extends JavaProcessor {

    /**
     * 只有一台机器会执行
     */
    @Override
    public void preProcess(JobContext context) {
        System.out.println("TestBroadcastJob.preProcess");
    }

    /**
     * 所有机器会执行
     */
    @Override
    public ProcessResult process(JobContext context) throws Exception {
        int value = new Random().nextInt(10);
        System.out.println("分片总数=" + context.getShardingNum() + ", 分片号=" + context.getShardingId() + ", "
                + "taskId=" + context.getTaskId() + ", value=" + value);
        return new ProcessResult(true, String.valueOf(value));
    }

    /**
     * 只有一台机器会执行
     */
    @Override
    public ProcessResult postProcess(JobContext context) {
        System.out.println("TestBroadcastJob.postProcess");
        Map<Long, String> allTaskResults = context.getTaskResults();
        Map<Long, TaskStatus> allTaskStatuses = context.getTaskStatuses();
        int num = 0;
        for (Entry<Long, String> entry : allTaskResults.entrySet()) {
            System.out.println(entry.getKey() + ":" + entry.getValue());
            if (allTaskStatuses.get(entry.getKey()).equals(TaskStatus.SUCCESS)) {
                num += Integer.valueOf(entry.getValue());
            }
        }
        System.out.println("TestBroadcastJob.postProcess(), num=" + num);
        return new ProcessResult(true, String.valueOf(num));
    }

}