全部产品
云市场

分布式 Java 任务

更新时间:2019-07-15 11:14:30

分布式任务调度包含 Map 和 MapReduce 两种编程模型。

Map 模型

Map 模型基于 MapJobProcessor 调用 Map 方法即可实现大数据分布式跑批的能力。

接口

接口 解释 是否必选
ProcessResult process(JobContext context) 每个子任务执行业务的入口,需要从 context 里获取 taskName,自己判断是哪个子任务,进行相应的逻辑处理。执行完成后,需要返回 ProcessResult。ProcessResult 接口请参见 ProcessResult
ProcessResult map(List<? extends Object> taskList, String taskName) 执行 map 方法可以把一批子任务分布式到多台机器上执行,可以 map 多次。如果 taskList 是空,返回失败。执行完成后,需要返回 ProcessResult。ProcessResult 接口请参见 ProcessResult
void kill(JobContext context) 前端 kill 任务会触发该方法,需要用户自己实现如何中断业务。

MapReduce 模型

MapReduce 模型是 Map 模型的扩展,新增 Reduce接口,需要实现 MapReduceJobProcessor。

MapReduce 模型只有一个 Reduce,所有子任务完成后会执行 Reduce 方法,可以在 Reduce 方法中返回该任务示例的执行结果,作为工作流的上下游数据传递。如果有子任务失败,Reduce 不会执行。Reduce 失败,整个任务示例也失败。

接口

接口 解释 是否必选
ProcessResult process(JobContext context) 每个子任务执行业务的入口,需要从 context 里获取 taskName,自己判断是哪个子任务,进行相应的逻辑处理。执行完成后,需要返回 ProcessResult。ProcessResult 接口请参见 ProcessResult
ProcessResult map(List<? extends Object> taskList, String taskName) 执行 map 方法可以把一批子任务分布式到多台机器上执行,可以 map 多次。如果 taskList 是空,返回失败。执行完成后,需要返回 ProcessResult。ProcessResult 接口请参见 ProcessResult
void kill(JobContext context) 前端 kill 任务会触发该方法,需要用户自己实现如何中断业务。

执行方式

基于 Map 或 MapReduce 模型的分布式 Java 调度任务可以使用并行结算、内存网络和网格计算 3 种执行方式。

  • 并行计算:支持子任务 300 以下,有子任务列表。
  • 内存网格:基于内存计算,子任务 50,000 以下,速度快。
  • 网格计算:基于文件计算,子任务 1,000,000 以下。

说明:所有执行方式,task 的 body 不能超过 64KB,即调用 map 分发子任务时,taskList 里的每个元素不能超过 64KB。

高级配置

  • 单机子任务并发数(单机执行线程数):默认 5。可以根据实际业务量进行调整。
  • 子任务失败重试次数:默认 0。子任务失败会自动重试。
  • 子任务失败重试间隔:默认 0,单位秒。
  • 子任务分发方式
    • 推模型:每台机器平均分配子任务。
    • 拉模型:每台机器主动拉取子任务,没有木桶效应,支持动态扩容拉子任务。拉取过程中,所有子任务会缓存在 Master 节点,对内存有压力,建议子任务数不超过 10,000。
  • 子任务单次拉取数:拉模型高级配置,Slave 每次向 Master 拉多少个子任务,默认 5。
  • 子任务队列容量:拉模型高级配置,Slave 节点缓存子任务的队列大小。
  • 子任务全局并发数:分布式拉模型支持全局子任务并发数,可以进行限流,默认 1000。

发送 50 条消息的 Demo 示例(适用于 Map 模型)

  1. @Component
  2. public class TestMapJobProcessor extends MapJobProcessor {
  3. @Override
  4. public ProcessResult process(JobContext context) throws Exception {
  5. String taskName = context.getTaskName();
  6. int dispatchNum = 50;
  7. if (isRootTask(context)) {
  8. System.out.println("start root task");
  9. List<String> msgList = Lists.newArrayList();
  10. for (int i = 0; i <= dispatchNum; i++) {
  11. msgList.add("msg_" + i);
  12. }
  13. return map(msgList, "Level1Dispatch");
  14. } else if (taskName.equals("Level1Dispatch")) {
  15. String task = (String)context.getTask();
  16. System.out.println(task);
  17. return new ProcessResult(true);
  18. }
  19. return new ProcessResult(false);
  20. }
  21. }

发送 500 条消息的 Demo 示例(适用于 MapReduce 模型)

  1. @Component
  2. public class TestMapReduceJobProcessor extends MapReduceJobProcessor {
  3. @Override
  4. public ProcessResult process(JobContext context) throws Exception {
  5. String taskName = context.getTaskName();
  6. int dispatchNum=500;
  7. if (isRootTask(context)) {
  8. System.out.println("start root task");
  9. List<String> msgList = Lists.newArrayList();
  10. for (int i = 0; i <= dispatchNum; i++) {
  11. msgList.add("msg_" + i);
  12. }
  13. return map(msgList, "Level1Dispatch");
  14. } else if (taskName.equals("Level1Dispatch")) {
  15. String task = (String)context.getTask();
  16. System.out.println(task);
  17. return new ProcessResult(true);
  18. }
  19. return new ProcessResult(false);
  20. }
  21. @Override
  22. public ProcessResult reduce(JobContext context) throws Exception {
  23. return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
  24. }
  25. }

处理单表数据的 Demo 示例(适用于 Map 或 MapReduce 模型)

  1. @Component
  2. public class ScanSingleTableJobProcessor extends MapJobProcessor {
  3. @Service
  4. private XXXService xxxService;
  5. private final int PAGE_SIZE = 500;
  6. static class PageTask {
  7. private long startId;
  8. private long endId;
  9. public PageTask(long startId, long endId) {
  10. this.startId = startId;
  11. this.endId = endId;
  12. }
  13. public long getStartId() {
  14. return startId;
  15. }
  16. public long getEndId() {
  17. return endId;
  18. }
  19. }
  20. @Override
  21. public ProcessResult process(JobContext context) throws Exception {
  22. String tableName = context.getJobParameters(); //多个 Job 后端代码可以一致,通过控制台配置 Job 参数表示表名。
  23. String taskName = context.getTaskName();
  24. Object task = context.getTask();
  25. if (isRootTask(context)) {
  26. Pair<Long, Long> idPair = queryMinAndMaxId(tableName);
  27. long minId = idPair.getFirst();
  28. long maxId = idPair.getSecond();
  29. List<PageTask> tasks = Lists.newArrayList();
  30. int step = (int) ((maxId - minId) / PAGE_SIZE); //计算分页数量
  31. for (long i = minId; i < maxId; i+=step) {
  32. tasks.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));
  33. }
  34. return map(tasks, "PageTask");
  35. } else if (taskName.equals("PageTask")) {
  36. PageTask pageTask = (PageTask)task;
  37. long startId = pageTask.getStartId();
  38. long endId = pageTask.getEndId();
  39. List<Record> records = queryRecord(tableName, startId, endId);
  40. //TODO handle records
  41. return new ProcessResult(true);
  42. }
  43. return new ProcessResult(false);
  44. }
  45. private Pair<Long, Long> queryMinAndMaxId(String tableName) {
  46. //TODO select min(id),max(id) from [tableName]
  47. return new Pair<Long, Long>(1L, 10000L);
  48. }
  49. private List<Record> queryRecord(String tableName, long startId, long endId) {
  50. List<Record> records = Lists.newArrayList();
  51. //TODO select * from [tableName] where id>=[startId] and id<[endId]
  52. return records;
  53. }
  54. }

处理分库分表数据的 Demo 示例(适用于 Map 或 MapReduce 模型)

  1. @Component
  2. public class ScanShardingTableJobProcessor extends MapJobProcessor {
  3. @Service
  4. private XXXService xxxService;
  5. private final int PAGE_SIZE = 500;
  6. static class PageTask {
  7. private String tableName;
  8. private long startId;
  9. private long endId;
  10. public PageTask(String tableName, long startId, long endId) {
  11. this.tableName = tableName;
  12. this.startId = startId;
  13. this.endId = endId;
  14. }
  15. public String getTableName() {
  16. return tableName;
  17. }
  18. public long getStartId() {
  19. return startId;
  20. }
  21. public long getEndId() {
  22. return endId;
  23. }
  24. }
  25. @Override
  26. public ProcessResult process(JobContext context) throws Exception {
  27. String taskName = context.getTaskName();
  28. Object task = context.getTask();
  29. if (isRootTask(context)) {
  30. //先分库
  31. List<String> dbList = getDbList();
  32. return map(dbList, "DbTask");
  33. } else if (taskName.equals("DbTask")) {
  34. //根据分库去分表
  35. String dbName = (String)task;
  36. List<String> tableList = getTableList(dbName);
  37. return map(tableList, "TableTask");
  38. } else if (taskName.equals("TableTask")) {
  39. //如果一个分表也很大,再分页
  40. String tableName = (String)task;
  41. Pair<Long, Long> idPair = queryMinAndMaxId(tableName);
  42. long minId = idPair.getFirst();
  43. long maxId = idPair.getSecond();
  44. List<PageTask> tasks = Lists.newArrayList();
  45. int step = (int) ((maxId - minId) / PAGE_SIZE); //计算分页数量
  46. for (long i = minId; i < maxId; i+=step) {
  47. tasks.add(new PageTask(tableName, i, (i+step > maxId ? maxId : i+step)));
  48. }
  49. return map(tasks, "PageTask");
  50. } else if (taskName.equals("PageTask")) {
  51. PageTask pageTask = (PageTask)task;
  52. String tableName = pageTask.getTableName();
  53. long startId = pageTask.getStartId();
  54. long endId = pageTask.getEndId();
  55. List<Record> records = queryRecord(tableName, startId, endId);
  56. //TODO handle records
  57. return new ProcessResult(true);
  58. }
  59. return new ProcessResult(false);
  60. }
  61. private List<String> getDbList() {
  62. List<String> dbList = Lists.newArrayList();
  63. //TODO 返回分库列表
  64. return dbList;
  65. }
  66. private List<String> getTableList(String dbName) {
  67. List<String> tableList = Lists.newArrayList();
  68. //TODO 返回分表列表
  69. return tableList;
  70. }
  71. private Pair<Long, Long> queryMinAndMaxId(String tableName) {
  72. //TODO select min(id),max(id) from [tableName]
  73. return new Pair<Long, Long>(1L, 10000L);
  74. }
  75. private List<Record> queryRecord(String tableName, long startId, long endId) {
  76. List<Record> records = Lists.newArrayList();
  77. //TODO select * from [tableName] where id>=[startId] and id<[endId]
  78. return records;
  79. }
  80. }

处理 50 条消息并且返回子任务结果由 Reduce 汇总的 Demo 示例(适用于 MapReduce 模型)

  1. @Component
  2. public class TestMapReduceJobProcessor extends MapReduceJobProcessor {
  3. @Override
  4. public ProcessResult process(JobContext context) throws Exception {
  5. String taskName = context.getTaskName();
  6. int dispatchNum = 50;
  7. if (context.getJobParameters() != null) {
  8. dispatchNum = Integer.valueOf(context.getJobParameters());
  9. }
  10. if (isRootTask(context)) {
  11. System.out.println("start root task");
  12. List<String> msgList = Lists.newArrayList();
  13. for (int i = 0; i <= dispatchNum; i++) {
  14. msgList.add("msg_" + i);
  15. }
  16. return map(msgList, "Level1Dispatch");
  17. } else if (taskName.equals("Level1Dispatch")) {
  18. String task = (String)context.getTask();
  19. Thread.sleep(2000);
  20. return new ProcessResult(true, task);
  21. }
  22. return new ProcessResult(false);
  23. }
  24. @Override
  25. public ProcessResult reduce(JobContext context) throws Exception {
  26. for (Entry<Long, String> result : context.getTaskResults().entrySet()) {
  27. System.out.println("taskId:" + result.getKey() + ", result:" + result.getValue());
  28. }
  29. return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
  30. }
  31. }