流程管理

更新时间:
复制为 MD 格式

本文详细说明工作流的配置方法,包含如何创建工作流以及DAG编排的基本操作、节点介绍以及变量配置等,帮助用户高效完成自动化任务的配置与管理。

前提条件

创建XXL-Job实例

升级XXL-Job引擎至3.0.0及以上版本。

创建工作流

  1. 登录MSE XXL-JOB控制台,并在顶部菜单栏选择地域。

  2. 在左侧导航栏,选择任务调度 > XXL-JOB

  3. 单击进入目标实例,在左侧导航栏选择工作流 > 流程管理,单击创建工作流,完成以下配置然后单击确定完成工作流创建。

    配置项分类

    配置项

    描述

    默认值

    基本配置

    工作流名称

    同一个应用下工作流名称需保持唯一。

    应用ID

    该工作流关联的应用。

    时间类型

    • cron:使用cron表达式周期性调度。

    • api:通过openapi调度。

    cron

    cron表达式

    定时任务时间表达式,支持使用生成工具进行编辑。

    0 0 12 * * ?

    高级配置

    时区

    定时任务执行依赖的时区。

    PRC

    日历

    工作流按照指定日历调度,默认为每天调度。

    最大并发数

    同一个工作流同一时间允许运行的最大实例个数,1表示不允许重复执行。如果超过并发数,将跳过当前调度。

    1

DAG编排

DAG编排提供可视化画布,支持通过拖拽节点配置任务依赖关系,并可利用逻辑节点实现复杂工作流的处理。

基本操作

  1. 工作流创建完成后,单击操作DAG进入工作流编辑画布。

  2. 从左侧菜单栏拖拽任务至画布,完成节点创建。

  3. 将鼠标悬停于节点右侧或下侧,出现“+”图标后,拖拽至另一节点的左侧或上侧,建立依赖关系。

  4. 选中节点或连线后单击鼠标右键,可移除对应元素。

  5. 通过鼠标左键双击节点,或右键点击节点后选择编辑,可修改节点配置信息。

  6. 若节点为任务节点,右键点击节点并选择编辑脚本,可修改该节点的脚本内容。

  7. 单击保存新版本并输入版本名称后可保存当前工作流的变更。

  8. 指定机器:如果应用接入多个执行器,可指定任务节点运行的执行器。

任务节点

任务节点参数说明

配置项分类

配置项

描述

默认值

基本配置

任务名称

同一个应用下任务名称需保持唯一。

任务类型

  • Shell:Unix Shell 脚本

  • Python:Python 脚本

  • PHP:PHP 脚本

  • NodeJs:Node.js 脚本

  • Cmd:Windows CMD 命令

  • BEAN:XXL-JOB BEAN

  • HTTP:发起HTTP请求,无需部署执行器

  • Dify:调用Dify工作流任务

路由策略

  • 轮询

  • 一致性哈希

  • 权重最小

  • 分片广播

  • 随机

  • 第一个

  • 最后一个

  • 最不经常使用

  • 最近最久未使用

轮询

堵塞处理策略

当该任务在该执行器上一次运行尚未完成,如何处理新派发的任务。

  • 单机串行:任务在队列中等待,待前一任务完成后依次执行。

  • 丢弃后续调度:新任务将被丢弃,不予执行。

  • 覆盖之前调度:丢弃前一任务,立即执行新任务。

单机串行

依赖检查策略

  • all_success:上游全部成功或跳过

  • all_failed:上游全部失败

  • all_done:上游全部完成(成功、失败、跳过)

  • all_done_min_one_success:全部完成并且至少一个成功

  • one_failed:上游至少一个失败

  • one_success:上游至少一个成功

  • one_done:上游至少一个完成

all_success

优先级

任务的优先级,需要配合任务队列使用。

任务权重

任务的权重,需要配合权重最小路由策略使用。

1

失败重试次数

失败重试的最大次数。

0

失败重试间隔

失败重试的间隔,单位为秒。

30

任务并发数

同一个任务同一时间允许运行的最大实例个数,1表示不允许重复执行。如果超过并发数,会跳过当前调度。

1

自定义变量

在脚本之外为节点定义的输入/输出变量,更多详情可参考自定义变量

定时配置

时区

该任务节点定时执行时依赖的时区。

日历

当工作流执行至该任务节点时,若当前时间未达到节点配置的日历时间,则该任务节点将不会执行。

开始时间

任务节点开始生效执行的时间。

  • 立即生效:该任务节点在上游节点完成后立即执行。

  • 指定开始时间:该任务节点在指定时间开始运行,即使上游节点已经完成。

    • 未来某时刻:在未来指定的日期时间满足后启动执行。

    • 每天:每天到达指定时间才开始运行。

    • 每小时:每小时内到达指定时间才开始运行。

    • 延时:当上游节点完成后延迟一段时间后执行,单位为秒。

立即生效

通知配置

超时报警

任务运行超过设定的阈值则报警。

开启

超时时间

开启超时报警时需配置超时时间,单位为秒。

300

超时停止

开启超时报警时可配置超时停止时间,任务运行超过指定时间自动终止。

关闭

成功通知

任务执行成功会通过配置的渠道推送给联系人,10分钟内最多推送一次。

关闭

失败报警

开启后当任务节点失败将进行报警。

开启

连续失败次数

失败次数达到连续失败次数后才会报警。

无可用机器报警

开启后如果任务没有可用机器,不会生成调度记录,只会进行报警。

关闭后如果任务没有可用机器,将生成一条失败的调度记录。

开启

提前结束报警

任务运行时间小于设定的阈值则报警。

关闭

通知方式

支持短信、webhook、邮件、电话。支持多选。

通知对象

可添加使用云监控联系人或联系人组。

逻辑节点

状态分支

对标DolphinSchedulerConditions节点。根据上游节点的状态动态选择下游执行分支,未被选中的分支将被标记为已跳过,支持多上游节点与多下游节点的复杂依赖关系。

  1. 在工作流画布中增加一个状态分支节点。

    image.png

  2. 配置状态分支。

    image

  3. 如上图所示的配置,节点将执行以下逻辑。

    if ((armon-job31==success || armon-job32==success) && armon-job33==success )
         switch to armon-job34
    else 
         switch to armon-job35

条件分支

对标DolphinSchedulerSwitch节点。使用javax.script.ScriptEngine.eval执行表达式,下游分支的执行路径由上游节点返回的自定义变量或输出结果决定,未被选中的分支将被标记为跳过状态,支持多个上游节点与多个下游节点的动态分支配置。

  1. 在工作流画布中增加一个条件分支节点

    image.png

  2. armon-job41任务增加一个自定义输出变量var1

    image

  3. 编辑armon-job41的脚本,输出var1变量为world

    image

  4. 配置armon-condition-1逻辑节点如下

    image

  5. 工作流执行结果

    上游任务节点输出为world并通过自定义变量var1传递至后续节点。虽然armon-condition-1节点默认分支流转至armon-job44节点,但因接收到变量var1后执行ELIF逻辑,将执行armon-job43任务节点。

外部依赖

对标DolphinSchedulerDependent节点。当一个流程太大时,可以拆成多个小流程,提取出一个公共的流程C,A流程和B流程均可依赖C流程。

    以如下配置为例,后续任务节点执行需要依赖DependentTest应用下DependentTestFlow工作流的DependentTestFlowJob任务节点在当天19:00:0020:00:00时间内的最后一次执行结果成功

    如果依赖的工作流对应任务在规定时间内未执行完成或任务执行失败,即使其他前置依赖已经满足,后续节点仍不可执行。

    image

    时间周期说明

    时间周期

    取值

    时间范围

    • 今天:依赖另一个任务当天的数据。

    • 昨天:依赖另一个任务昨天的数据。

    • 前天(T-2):依赖另一个任务T-2天的数据。

    • 前三天(T-3):依赖另一个任务T-3天的数据。

    • 前七天(T-7):依赖另一个任务T-7天的数据。

    若被依赖任务在一天内调度多次,可指定依赖当天特定时间段的数据。

    若该时间段内存在多个任务实例,则默认依赖该时间段内最后一次执行的实例。

    • 本周:依赖另一个任务本周的数据(本周一至今天)

    • 上周:依赖另一个任务上周的数据(上周一至上周日)

    • 上周一:依赖另一个任务上周一的数据

    • 上周二:依赖另一个任务上周二的数据

    • 上周三:依赖另一个任务上周三的数据

    • 上周四:依赖另一个任务上周四的数据

    • 上周五:依赖另一个任务上周五的数据

    • 上周六:依赖另一个任务上周六的数据

    • 上周日:依赖另一个任务上周日的数据

    如果被依赖任务在周内某天调度多次,可指定依赖当天哪个时间段的数据;该时间段内若有多个任务实例,则依赖最后一次。

    • 本月:依赖另一个任务本月的数据(本月 1 号至今天)

    • 上月:依赖另一个任务上月的数据(上月 1 号至上月最后一天)

    如果被依赖任务在月内某天调度多次,可指定依赖当天哪个时间段的数据;该时间段内若有多个任务实例,则依赖最后一次。

    人工节点

    人工节点一旦开始运行就将进入挂起(Held) 状态,需要人工确认通过才能变成成功状态,继续后续的任务执行。

    以下图为例,data-process任务前置节点包含人工节点,当data-import节点执行完毕后仍需等待manual-check节点的确认结果才可执行。

    image.png

    变量

    系统变量

    变量

    描述

    示例

    system.schedule.time

    调度时间, yyyyMMddHHmmss格式

    20251212023000

    system.data.time

    数据时间, yyyyMMddHHmmss格式

    20251212023000

    system.bizdate

    数据时间精确到天,yyyyMMdd

    20251212

    system.task.definition.name

    任务名称

    myTask1

    system.task.instance.id

    任务实例id

    1448366263490740225

    system.workflow.definition.name

    工作流名称

    myWorkflow1

    system.workflow.instance.id

    工作流实例id

    100

    自定义变量

    工作流中所有任务节点均可自定义变量,分为INOUT两种类型。

    • IN:作为任务节点的输入变量,当前节点脚本可直接使用该变量的值。

    • OUT:如果下游节点需要依赖上游节点输出的结果,可通过OUT类型变量进行参数传递。

    说明

    变量值的大小不能超过64KB。

    OUT类型变量将对节点整个下游生效,不局限于下一节点。

    举例说明:

    1. 在任务定义中配置如下脚本。

      #!/bin/bash
      echo "xxl-job: hello shell"
      
      # system variables
      echo "System schedule time (yyyyMMddHHmmss): ${system.schedule.time}"
      echo "System data time (yyyyMMddHHmmss): ${system.data.time}"
      echo "System data date(yyyyMMdd): ${system.biz.curdate}"
      echo "Task Definition Name: ${system.task.definition.name}"
      echo "Task Instance ID: ${system.task.instance.id}"
      echo "Workflow Definition Name: ${system.workflow.definition.name}"
      echo "Workflow Instance ID: ${system.workflow.instance.id}"
      
      # custom variables
      echo "#{setValue(var1=hello)}"
      echo "#{setValue(var2=world)}"
      
      # custom output
      sum=0
      for ((i=1; i<=100; i++))
      do
          echo "hello world $i"
          sum=$((sum + i))
      done
      echo "#{setResult($sum)}"
      
      exit 0
    2. 在自定义参数中添加两个OUT类型参数。

      image

    3. 此时脚本最终任务结果为5050,并且在执行过程中为var1var2分别赋值为helloworld,通过步骤2的自定义参数配置,可将var1var2作为变量向下游传递,如果未设置自定义参数,var1var2的赋值仅在此任务节点中生效。

    说明

    可参考条件分支,其中条件节点的输入参数是上一任务节点通过自定义变量传递的结果。

    Demo

    Shell Demo

    #!/bin/bash
    echo "xxl-job: hello shell"
    
    # system variables
    echo "System schedule time (yyyyMMddHHmmss): ${system.schedule.time}"
    echo "System data time (yyyyMMddHHmmss): ${system.data.time}"
    echo "System data date(yyyyMMdd): ${system.biz.curdate}"
    echo "Task Definition Name: ${system.task.definition.name}"
    echo "Task Instance ID: ${system.task.instance.id}"
    echo "Workflow Definition Name: ${system.workflow.definition.name}"
    echo "Workflow Instance ID: ${system.workflow.instance.id}"
    
    # custom variables
    echo "#{setValue(var1=hello)}"
    echo "#{setValue(var2=world)}"
    
    # custom output
    sum=0
    for ((i=1; i<=100; i++))
    do
        echo "hello world $i"
        sum=$((sum + i))
    done
    echo "#{setResult($sum)}"
    
    exit 0

    导出工作流

    流程管理页面,选中目标工作流,单击导出工作流。可以同时导出多个工作流。

    工作流导出格式说明

    导出格式为JSON格式,总体结构包括工作流类型、导出格式、功能版本,以及导出的工作流列表。

    其中工作流列表中的每一项对应一个工作流对象,工作流对象包括工作流的定义信息、节点列表、边列表。

    [
      "kind": "SchedulerXWorkflows", // 类型
      "type": "JSON", // 导出格式
      "version": "2.0", // 功能版本
      "content": [ // 工作流列表
        {
          "workflowInfo": {}, // 定义信息
          "nodes": [], // 节点列表
          "edges": [] // 边列表
        },
        {...}
      ],
    ]

    字段

    含义

    kind

    工作流类型

    type

    导出格式

    version

    功能版本

    content

    工作流列表

    (content) workflowInfo

    定义信息

    (content) nodes

    节点列表

    (content) edges

    边列表

    工作流导出结果示例

    以下图工作流为例,导出具体字段展示如下:

    下图中的Schedulerx-Root 是系统内置的唯一根节点,节点ID0(用户无需手动创建)

    image

    {
      "kind": "SchedulerXWorkflows",
      "type": "JSON",
      "version": "2.0",
      "content": [
        {
          "workflowInfo": {
            "appName": "qiacheng-import-test",
            "appType": 1,
            "name": "qiacheng-success",
            "description": "qiacheng-success",
            "currentExecuteStatus": 0,
            "timeConfig": {
              "calendar": "workday",
              "dataOffset": 0,
              "timezone": "Hongkong",
              "timeType": 1,
              "paramMap": {},
              "timeExpression": "0 0 12 * * ?"
            },
            "version": "v1.0",
            "maxConcurrency": 1,
            "status": 0,
            "creator": "xxxxx",
            "updater": "xxxxx"
          },
          "nodes": [
            {
              "name": "script-job1",
              "description": "script-job1",
              "jobType": "script_shell",
              "executeMode": "standalone",
              "attemptInterval": 30,
              "startTimeType": 1,
              "startTime": -1,
              "weight": 1,
              "routeStrategy": 1,
              "priority": 5,
              "contentType": 2,
              "content": "#!/bin/bash\n# system variables\necho \"System schedule time (yyyyMMddHHmmss): ${system.schedule.time}\"\necho \"System data time (yyyyMMddHHmmss): ${system.data.time}\"\necho \"System data date(yyyyMMdd): ${system.biz.curdate}\"\necho \"Task Definition Name: ${system.task.definition.name}\"\necho \"Task Instance ID: ${system.task.instance.id}\"\necho \"Workflow Definition Name: ${system.workflow.definition.name}\"\necho \"Workflow Instance ID: ${system.workflow.instance.id}\"\n\n# custom variables\necho \"#{setValue(var1=hello)}\"\necho \"#{setValue(var2=world)}\"\n\n# task process\nsum=0\nfor ((i=1; i<=100; i++))\ndo\n    echo \"hello world $i\"\n    sum=$((sum + i))\ndone\n\n# custom output result\necho \"#{setResult($sum)}\"\n\n# Returns 0 on success, otherwise returns an error code\nexit 0",
              "maxConcurrency": 1,
              "maxAttempt": 0,
              "xattrs": "{\"executorBlockStrategy\":1}",
              "parameters": "",
              "timeConfig": {
                "calendar": "",
                "dataOffset": 0,
                "paramMap": {},
                "timeType": -1,
                "timeExpression": ""
              },
              "monitorConfigInfo": {
                "endEarlyEnable": false,
                "failLimitTimes": 1,
                "failEnable": true,
                "failRate": 100,
                "timeoutKillEnable": false,
                "endEarly": 30,
                "sendChannel": "",
                "timeout": 300,
                "daysOfDeadline": 0,
                "alarmType": "CustomContacts",
                "missWorkerEnable": true,
                "timeoutEnable": true,
                "successNotice": false
              },
              "contactInfoList": [],
              "coordinate": {
                "x": -176,
                "y": 357
              },
              "dependentStrategy": 1,
              "status": 1
            },
            {
              "name": "script-job2",
              // 类似
            },
            {
              "name": "script-job3",
              // 类似
            },
            {
              "name": "script-job4",
              // 类似
            },
            {
              "name": "condition_branch",
              // 类似
            }
          ],
          "edges": [
            {
              "from": "Schedulerx-Root",
              "to": "script-job2"
            },
            {
              "from": "Schedulerx-Root",
              "to": "script-job1"
            },
            {
              "from": "script-job1",
              "to": "condition_branch"
            },
            {
              "from": "script-job2",
              "to": "condition_branch"
            },
            {
              "from": "condition_branch",
              "to": "script-job4"
            },
            {
              "from": "condition_branch",
              "to": "script-job3"
            }
          ],
          "kind": "SchedulerXWorkflows",
        }
      ]
    }

    导入工作流

    流程管理页面,单击导入工作流。在选择文件区域,上传工作流JSON文件。如果已经存在相同工作流,导入时可以选择覆盖跳过

    说明
    • 导入时若不存在指定的应用,则会自动创建应用。

    • 系统按照实例规格,检测工作流和任务数(例如:专业版小规格x1,支持最大任务数100,最大工作流数20),若超过则提示导入失败。

    • 导入格式与导出格式一致,具体可见导出部分说明。