SchedulerX 2.0 提供的工作流功能可以对多个任务进行编排,同时还支持上下游任务间的数据传递,让您的业务更加的简单易用。本文将以 3 个调度任务为例介绍如何通过工作流进行上下游任务间的数据传递。

背景信息

当前只有简单 Java 任务支持数据传递,分布式 Java 任务请使用 MapReduce 模型进行数据传递,详情请参见MapReduce 模型

操作步骤

  1. 在三个应用中分别实现任务调度类 JobProcessor A、JobProcessor B 和 JobProcessor C。
    • JobProcessor A

      @Component
      public class TestSimpleJobA extends JavaProcessor {
      @Override
      public ProcessResult process(JobContext context) throws Exception {
      System.out.println("TestSimpleJobA " + DateTime.now().toString("yyyy-MM-dd HH:mm:ss"));
      return new ProcessResult(true, String.valueOf(1));
      }
      }                            
    • JobProcessor B

      @Component
      public class TestSimpleJobB extends JavaProcessor {
      @Override
      public ProcessResult process(JobContext context) throws Exception {
      System.out.println("TestSimpleJobB " + DateTime.now().toString("yyyy-MM-dd HH:mm:ss"));
      return new ProcessResult(true, String.valueOf(2));
      }
      }                            
    • JobProcessor C

      @Component
      public class TestSimpleJobC extends JavaProcessor {
      @Override
      public ProcessResult process(JobContext context) throws Exception {
      List<JobInstanceData> upstreamDatas = context.getUpstreamData();
      int sum = 0;
      for (JobInstanceData jobInstanceData : upstreamDatas) {
            System.out.println("jobName=" + jobInstanceData.getJobName() 
                + ", data=" + jobInstanceData.getData());
            sum += Integer.valueOf(jobInstanceData.getData());
      }
      System.out.println("TestSimpleJobC sum=" + sum);
      return new ProcessResult(true, String.valueOf(sum));
      }
      }                            
  2. 将这三个应用部署到 EDAS,详情请参见为应用实现任务调度(EDAS 部署)
  3. 分别为这三个应用创建任务分组和调度任务 jobA、jobB 和 jobC,详情请参见创建任务分组创建调度任务
  4. 创建工作流,并导入这 3 个调度任务,详情请参见创建工作流

    创建好的工作流如下图所示:

    工作流示意
  5. 流程管理页面单击更多,然后在下拉列表中选择运行一次

执行结果

返回工作流详情页面,右键单击 jobA、jobB 和 jobC,在快捷菜单中选择详情,查看任务实例详情。

可以看到 jobA 的结果或错误信息为 1,和 JobProcessor A 一致。

任务实例详情

同样,查看 jobB 的结果或错误信息为 2,也和 JobProcessor B 一致。而 jobC 的结果为 3(1+2),即上游任务 jobA 和 jobB 将数据传递给了 jobC,和 JobProcessor C 的代码一致。

在控制台能看到同样的打印信息。

jobName=jobB, data=2
jobName=jobA, data=1
TestSimpleJobC sum=3