SchedulerX提供的工作流功能可以对多个任务进行编排,同时还支持上下游任务间的数据传递,让您的业务更加的简单易用。本文将以3个调度任务为例介绍如何通过工作流进行上下游任务间的数据传递。
背景信息
当前只有简单Java任务支持数据传递,分布式Java任务请使用MapReduce模型进行数据传递,详情请参见MapReduce模型。
操作步骤
- 在三个应用中分别实现任务调度类JobProcessor A、JobProcessor B和JobProcessor C。
-
JobProcessor A
@Component
public class TestSimpleJobA extends JavaProcessor {
@Override
public ProcessResult process(JobContext context) throws Exception {
System.out.println("TestSimpleJobA " + DateTime.now().toString("yyyy-MM-dd HH:mm:ss"));
return new ProcessResult(true, String.valueOf(1));
}
}
-
JobProcessor B
@Component
public class TestSimpleJobB extends JavaProcessor {
@Override
public ProcessResult process(JobContext context) throws Exception {
System.out.println("TestSimpleJobB " + DateTime.now().toString("yyyy-MM-dd HH:mm:ss"));
return new ProcessResult(true, String.valueOf(2));
}
}
-
JobProcessor C
@Component
public class TestSimpleJobC extends JavaProcessor {
@Override
public ProcessResult process(JobContext context) throws Exception {
List<JobInstanceData> upstreamDatas = context.getUpstreamData();
int sum = 0;
for (JobInstanceData jobInstanceData : upstreamDatas) {
System.out.println("jobName=" + jobInstanceData.getJobName()
+ ", data=" + jobInstanceData.getData());
sum += Integer.valueOf(jobInstanceData.getData());
}
System.out.println("TestSimpleJobC sum=" + sum);
return new ProcessResult(true, String.valueOf(sum));
}
}
- 将这三个应用部署到EDAS。
- 分别为这三个应用创建任务分组和调度任务jobA、jobB和jobC,详情请参见创建应用和创建调度任务。
- 创建工作流,并导入这3个调度任务,详情请参见创建工作流。
创建好的工作流如下图所示:
- 在流程管理页面具体流程的操作列单击更多,然后在下拉列表中选择运行一次。
执行结果
返回工作流详情页面,右键单击jobA、jobB和jobC,在快捷菜单中选择详情,查看任务实例详情。
可以看到jobA的结果或错误信息为1,和JobProcessor A一致。
同样,查看jobB的结果或错误信息为2,也和JobProcessor B一致。而jobC的结果为3(1+2),即上游任务jobA和jobB将数据传递给了jobC,和JobProcessor
C的代码一致。
在控制台能看到同样的打印信息。
jobName=jobB, data=2
jobName=jobA, data=1
TestSimpleJobC sum=3