全部产品
云市场

如何通过工作流进行上下游数据传递

更新时间:2019-06-14 15:52:16

SchedulerX 2.0 提供工作流进行任务编排,同时还支持任务上下游的数据传递,可以让您的业务更加的简单易用。

说明:当前只有简单 Java 任务支持数据传输,分布式 Java 任务请使用 MapReduce 模型进行数据传输。

本文将以 3 个调度任务为例介绍如何通过工作流进行上下游数据传递。

  1. 为应用接入任务调度并部署到 EDAS。

    为这三个应用使用上面的 3 个 JobProcessor 接入任务调度,并部署到 EDAS,具体步骤请参见快速入门——在 EDAS 部署的应用接入任务调度

    • JobProcessor A

      1. @Component
      2. public class TestSimpleJobA extends JavaProcessor {
      3. @Override
      4. public ProcessResult process(JobContext context) throws Exception {
      5. System.out.println("TestSimpleJobA " + DateTime.now().toString("yyyy-MM-dd HH:mm:ss"));
      6. return new ProcessResult(true, String.valueOf(1));
      7. }
      8. }
    • JobProcessor B

      1. @Component
      2. public class TestSimpleJobB extends JavaProcessor {
      3. @Override
      4. public ProcessResult process(JobContext context) throws Exception {
      5. System.out.println("TestSimpleJobB " + DateTime.now().toString("yyyy-MM-dd HH:mm:ss"));
      6. return new ProcessResult(true, String.valueOf(2));
      7. }
      8. }
    • JobProcessor C

      1. @Component
      2. public class TestSimpleJobC extends JavaProcessor {
      3. @Override
      4. public ProcessResult process(JobContext context) throws Exception {
      5. List<JobInstanceData> upstreamDatas = context.getUpstreamData();
      6. int sum = 0;
      7. for (JobInstanceData jobInstanceData : upstreamDatas) {
      8. System.out.println("jobName=" + jobInstanceData.getJobName()
      9. + ", data=" + jobInstanceData.getData());
      10. sum += Integer.valueOf(jobInstanceData.getData());
      11. }
      12. System.out.println("TestSimpleJobC sum=" + sum);
      13. return new ProcessResult(true, String.valueOf(sum));
      14. }
      15. }
  2. 创建调度任务。

    基于该应用创建任务分组并创建调度任务 JobA、JobB 和 JobC,详情请参见创建任务分组创建调度任务

  3. 创建工作流。

    创建工作流,并导入这 3 个调度任务,详情请参见创建工作流

    创建好的工作流如下图所示:

    工作流示例

  4. 流程管理页面单击更多,然后在菜单中单击运行一次

  5. 返回工作流详情页面,右键分别单击 JobA、JobB 和 JobC,在快捷菜单中单击详情查看任务实例详情。

    可以看到 JobA 的结果或错误信息为 1,和 JobProcessor A 一致。同样,查看 JobB 的结果或错误信息为 2,也和 JobProcessor B 一致。而 JobC 的结果为 3(1+2),即上游任务任务 JobA 和 JobB 将数据传递给了 JobC,和 JobProcessor C 的代码一致。