如何通过工作流进行上下游数据传递
SchedulerX提供的工作流功能可以对多个任务进行编排,同时还支持上下游任务间的数据传递,让您的业务更加的简单易用。本文介绍如何通过工作流进行上下游任务间的数据传递。
背景信息
当前只有简单Java任务支持数据传递,分布式Java任务请使用MapReduce模型进行数据传递。更多信息,请参见MapReduce模型。
操作步骤
在三个应用中分别实现任务调度类JobProcessor A、JobProcessor B和JobProcessor C。
JobProcessor A
@Component public class TestSimpleJobA extends JavaProcessor { @Override public ProcessResult process(JobContext context) throws Exception { System.out.println("TestSimpleJobA " + DateTime.now().toString("yyyy-MM-dd HH:mm:ss")); return new ProcessResult(true, String.valueOf(1)); } }
JobProcessor B
@Component public class TestSimpleJobB extends JavaProcessor { @Override public ProcessResult process(JobContext context) throws Exception { System.out.println("TestSimpleJobB " + DateTime.now().toString("yyyy-MM-dd HH:mm:ss")); return new ProcessResult(true, String.valueOf(2)); } }
JobProcessor C
@Component public class TestSimpleJobC extends JavaProcessor { @Override public ProcessResult process(JobContext context) throws Exception { List<JobInstanceData> upstreamDatas = context.getUpstreamData(); int sum = 0; for (JobInstanceData jobInstanceData : upstreamDatas) { System.out.println("jobName=" + jobInstanceData.getJobName() + ", data=" + jobInstanceData.getData()); sum += Integer.valueOf(jobInstanceData.getData()); } System.out.println("TestSimpleJobC sum=" + sum); return new ProcessResult(true, String.valueOf(sum)); } }
将三个应用部署到EDAS。
创建工作流,并导入这3个调度任务,详情请参见创建工作流。
创建好的工作流如下图所示:
在流程管理页面具体流程的操作列单击更多,然后在下拉列表中选择运行一次。
执行结果
返回工作流详情页面,右键单击jobA、jobB和jobC,在快捷菜单中选择详情,查看任务实例详情。
可以看到jobA的结果或错误信息为1,和JobProcessor A一致。

同样,查看jobB的结果或错误信息为2,也和JobProcessor B一致。而jobC的结果为3(1+2),即上游任务jobA和jobB将数据传递给了jobC,和JobProcessor C的代码一致。
在控制台能看到同样的打印信息。
jobName=jobB, data=2
jobName=jobA, data=1
TestSimpleJobC sum=3