本文详细说明工作流的配置方法,包含如何创建工作流以及DAG编排的基本操作、节点介绍以及变量配置等,帮助用户高效完成自动化任务的配置与管理。
前提条件
升级XXL-Job引擎至3.0.0及以上版本。
创建工作流
登录MSE XXL-JOB控制台,并在顶部菜单栏选择地域。
在左侧导航栏,选择。
单击进入目标实例,在左侧导航栏选择,单击创建工作流,完成以下配置然后单击确定完成工作流创建。
配置项分类
配置项
描述
默认值
基本配置
工作流名称
同一个应用下工作流名称需保持唯一。
应用ID
该工作流关联的应用。
时间类型
cron:使用cron表达式周期性调度。
api:通过openapi调度。
cron
cron表达式
定时任务时间表达式,支持使用生成工具进行编辑。
0 0 12 * * ?
高级配置
时区
定时任务执行依赖的时区。
PRC
日历
工作流按照指定日历调度,默认为每天调度。
最大并发数
同一个工作流同一时间允许运行的最大实例个数,1表示不允许重复执行。如果超过并发数,将跳过当前调度。
1
DAG编排
DAG编排提供可视化画布,支持通过拖拽节点配置任务依赖关系,并可利用逻辑节点实现复杂工作流的处理。
基本操作
工作流创建完成后,单击操作列DAG进入工作流编辑画布。
从左侧菜单栏拖拽任务至画布,完成节点创建。
将鼠标悬停于节点右侧或下侧,出现“+”图标后,拖拽至另一节点的左侧或上侧,建立依赖关系。
选中节点或连线后单击鼠标右键,可移除对应元素。
通过鼠标左键双击节点,或右键点击节点后选择编辑,可修改节点配置信息。
若节点为任务节点,右键点击节点并选择编辑脚本,可修改该节点的脚本内容。
单击保存新版本并输入版本名称后可保存当前工作流的变更。
指定机器:如果应用接入多个执行器,可指定任务节点运行的执行器。
任务节点
任务节点参数说明
逻辑节点
状态分支
对标DolphinScheduler的Conditions节点。根据上游节点的状态动态选择下游执行分支,未被选中的分支将被标记为已跳过,支持多上游节点与多下游节点的复杂依赖关系。
在工作流画布中增加一个状态分支节点。

配置状态分支。

如上图所示的配置,节点将执行以下逻辑。
if ((armon-job31==success || armon-job32==success) && armon-job33==success ) switch to armon-job34 else switch to armon-job35
条件分支
对标DolphinScheduler的Switch节点。使用javax.script.ScriptEngine.eval执行表达式,下游分支的执行路径由上游节点返回的自定义变量或输出结果决定,未被选中的分支将被标记为跳过状态,支持多个上游节点与多个下游节点的动态分支配置。
在工作流画布中增加一个条件分支节点

为armon-job41任务增加一个自定义输出变量
var1。
编辑armon-job41的脚本,输出
var1变量为world。
配置armon-condition-1逻辑节点如下

工作流执行结果
上游任务节点输出为world并通过自定义变量
var1传递至后续节点。虽然armon-condition-1节点默认分支流转至armon-job44节点,但因接收到变量var1后执行ELIF逻辑,将执行armon-job43任务节点。
外部依赖
对标DolphinScheduler的Dependent节点。当一个流程太大时,可以拆成多个小流程,提取出一个公共的流程C,A流程和B流程均可依赖C流程。
以如下配置为例,后续任务节点执行需要依赖DependentTest应用下DependentTestFlow工作流的DependentTestFlowJob任务节点在当天19:00:00至20:00:00时间内的最后一次执行结果成功。
如果依赖的工作流对应任务在规定时间内未执行完成或任务执行失败,即使其他前置依赖已经满足,后续节点仍不可执行。

时间周期说明
时间周期 | 取值 | 时间范围 |
日 |
| 若被依赖任务在一天内调度多次,可指定依赖当天特定时间段的数据。 若该时间段内存在多个任务实例,则默认依赖该时间段内最后一次执行的实例。 |
周 |
| 如果被依赖任务在周内某天调度多次,可指定依赖当天哪个时间段的数据;该时间段内若有多个任务实例,则依赖最后一次。 |
月 |
| 如果被依赖任务在月内某天调度多次,可指定依赖当天哪个时间段的数据;该时间段内若有多个任务实例,则依赖最后一次。 |
人工节点
人工节点一旦开始运行就将进入挂起(Held) 状态,需要人工确认通过才能变成成功状态,继续后续的任务执行。
以下图为例,data-process任务前置节点包含人工节点,当data-import节点执行完毕后仍需等待manual-check节点的确认结果才可执行。

变量
系统变量
变量 | 描述 | 示例 |
system.schedule.time | 调度时间, yyyyMMddHHmmss格式 | 20251212023000 |
system.data.time | 数据时间, yyyyMMddHHmmss格式 | 20251212023000 |
system.bizdate | 数据时间精确到天,yyyyMMdd | 20251212 |
system.task.definition.name | 任务名称 | myTask1 |
system.task.instance.id | 任务实例id | 1448366263490740225 |
system.workflow.definition.name | 工作流名称 | myWorkflow1 |
system.workflow.instance.id | 工作流实例id | 100 |
自定义变量
工作流中所有任务节点均可自定义变量,分为IN和OUT两种类型。
IN:作为任务节点的输入变量,当前节点脚本可直接使用该变量的值。
OUT:如果下游节点需要依赖上游节点输出的结果,可通过OUT类型变量进行参数传递。
变量值的大小不能超过64KB。
OUT类型变量将对节点整个下游生效,不局限于下一节点。
举例说明:
在任务定义中配置如下脚本。
#!/bin/bash echo "xxl-job: hello shell" # system variables echo "System schedule time (yyyyMMddHHmmss): ${system.schedule.time}" echo "System data time (yyyyMMddHHmmss): ${system.data.time}" echo "System data date(yyyyMMdd): ${system.biz.curdate}" echo "Task Definition Name: ${system.task.definition.name}" echo "Task Instance ID: ${system.task.instance.id}" echo "Workflow Definition Name: ${system.workflow.definition.name}" echo "Workflow Instance ID: ${system.workflow.instance.id}" # custom variables echo "#{setValue(var1=hello)}" echo "#{setValue(var2=world)}" # custom output sum=0 for ((i=1; i<=100; i++)) do echo "hello world $i" sum=$((sum + i)) done echo "#{setResult($sum)}" exit 0在自定义参数中添加两个OUT类型参数。

此时脚本最终任务结果为5050,并且在执行过程中为
var1和var2分别赋值为hello和world,通过步骤2的自定义参数配置,可将var1和var2作为变量向下游传递,如果未设置自定义参数,var1和var2的赋值仅在此任务节点中生效。
可参考条件分支,其中条件节点的输入参数是上一任务节点通过自定义变量传递的结果。
Demo
Shell Demo
#!/bin/bash
echo "xxl-job: hello shell"
# system variables
echo "System schedule time (yyyyMMddHHmmss): ${system.schedule.time}"
echo "System data time (yyyyMMddHHmmss): ${system.data.time}"
echo "System data date(yyyyMMdd): ${system.biz.curdate}"
echo "Task Definition Name: ${system.task.definition.name}"
echo "Task Instance ID: ${system.task.instance.id}"
echo "Workflow Definition Name: ${system.workflow.definition.name}"
echo "Workflow Instance ID: ${system.workflow.instance.id}"
# custom variables
echo "#{setValue(var1=hello)}"
echo "#{setValue(var2=world)}"
# custom output
sum=0
for ((i=1; i<=100; i++))
do
echo "hello world $i"
sum=$((sum + i))
done
echo "#{setResult($sum)}"
exit 0导出工作流
在流程管理页面,选中目标工作流,单击导出工作流。可以同时导出多个工作流。
工作流导出格式说明
导出格式为JSON格式,总体结构包括工作流类型、导出格式、功能版本,以及导出的工作流列表。
其中工作流列表中的每一项对应一个工作流对象,工作流对象包括工作流的定义信息、节点列表、边列表。
[
"kind": "SchedulerXWorkflows", // 类型
"type": "JSON", // 导出格式
"version": "2.0", // 功能版本
"content": [ // 工作流列表
{
"workflowInfo": {}, // 定义信息
"nodes": [], // 节点列表
"edges": [] // 边列表
},
{...}
],
]字段 | 含义 |
kind | 工作流类型 |
type | 导出格式 |
version | 功能版本 |
content | 工作流列表 |
(content) workflowInfo | 定义信息 |
(content) nodes | 节点列表 |
(content) edges | 边列表 |
工作流导出结果示例
以下图工作流为例,导出具体字段展示如下:
下图中的Schedulerx-Root 是系统内置的唯一根节点,节点ID为0(用户无需手动创建)

{
"kind": "SchedulerXWorkflows",
"type": "JSON",
"version": "2.0",
"content": [
{
"workflowInfo": {
"appName": "qiacheng-import-test",
"appType": 1,
"name": "qiacheng-success",
"description": "qiacheng-success",
"currentExecuteStatus": 0,
"timeConfig": {
"calendar": "workday",
"dataOffset": 0,
"timezone": "Hongkong",
"timeType": 1,
"paramMap": {},
"timeExpression": "0 0 12 * * ?"
},
"version": "v1.0",
"maxConcurrency": 1,
"status": 0,
"creator": "xxxxx",
"updater": "xxxxx"
},
"nodes": [
{
"name": "script-job1",
"description": "script-job1",
"jobType": "script_shell",
"executeMode": "standalone",
"attemptInterval": 30,
"startTimeType": 1,
"startTime": -1,
"weight": 1,
"routeStrategy": 1,
"priority": 5,
"contentType": 2,
"content": "#!/bin/bash\n# system variables\necho \"System schedule time (yyyyMMddHHmmss): ${system.schedule.time}\"\necho \"System data time (yyyyMMddHHmmss): ${system.data.time}\"\necho \"System data date(yyyyMMdd): ${system.biz.curdate}\"\necho \"Task Definition Name: ${system.task.definition.name}\"\necho \"Task Instance ID: ${system.task.instance.id}\"\necho \"Workflow Definition Name: ${system.workflow.definition.name}\"\necho \"Workflow Instance ID: ${system.workflow.instance.id}\"\n\n# custom variables\necho \"#{setValue(var1=hello)}\"\necho \"#{setValue(var2=world)}\"\n\n# task process\nsum=0\nfor ((i=1; i<=100; i++))\ndo\n echo \"hello world $i\"\n sum=$((sum + i))\ndone\n\n# custom output result\necho \"#{setResult($sum)}\"\n\n# Returns 0 on success, otherwise returns an error code\nexit 0",
"maxConcurrency": 1,
"maxAttempt": 0,
"xattrs": "{\"executorBlockStrategy\":1}",
"parameters": "",
"timeConfig": {
"calendar": "",
"dataOffset": 0,
"paramMap": {},
"timeType": -1,
"timeExpression": ""
},
"monitorConfigInfo": {
"endEarlyEnable": false,
"failLimitTimes": 1,
"failEnable": true,
"failRate": 100,
"timeoutKillEnable": false,
"endEarly": 30,
"sendChannel": "",
"timeout": 300,
"daysOfDeadline": 0,
"alarmType": "CustomContacts",
"missWorkerEnable": true,
"timeoutEnable": true,
"successNotice": false
},
"contactInfoList": [],
"coordinate": {
"x": -176,
"y": 357
},
"dependentStrategy": 1,
"status": 1
},
{
"name": "script-job2",
// 类似
},
{
"name": "script-job3",
// 类似
},
{
"name": "script-job4",
// 类似
},
{
"name": "condition_branch",
// 类似
}
],
"edges": [
{
"from": "Schedulerx-Root",
"to": "script-job2"
},
{
"from": "Schedulerx-Root",
"to": "script-job1"
},
{
"from": "script-job1",
"to": "condition_branch"
},
{
"from": "script-job2",
"to": "condition_branch"
},
{
"from": "condition_branch",
"to": "script-job4"
},
{
"from": "condition_branch",
"to": "script-job3"
}
],
"kind": "SchedulerXWorkflows",
}
]
}导入工作流
在流程管理页面,单击导入工作流。在选择文件区域,上传工作流JSON文件。如果已经存在相同工作流,导入时可以选择覆盖或跳过。
导入时若不存在指定的应用,则会自动创建应用。
系统按照实例规格,检测工作流和任务数(例如:专业版小规格x1,支持最大任务数100,最大工作流数20),若超过则提示导入失败。
导入格式与导出格式一致,具体可见导出部分说明。

