流程管理

更新时间:
复制为 MD 格式

工作流将多个 AI 任务编排为有依赖关系的 DAG 图,支持条件分支、跨流程依赖和变量传递等复杂调度场景。本文介绍如何创建工作流、配置 DAG 节点和调度规则、使用系统变量与自定义变量,以及导入导出工作流。

工作流基本配置

创建工作流时,需要配置基本信息和调度规则。配置项说明如下。

配置项分类

配置项

描述

默认值

基本配置

名称

工作流的名称,同一个应用下需保持唯一。

-

描述

工作流的描述。

-

关联应用

工作流所属的应用。

-

调度类型

  • cron:使用 cron 表达式周期性调度。

  • api:通过 OpenAPI 调度。

cron

cron 表达式

Quartz cron 表达式,用于定义周期性调度规则。

0 0 12 * * ?

高级配置

时区

工作流调度使用的时区,不配置时使用当地时区。

当地时区

日历

工作流按照指定日历调度,不配置时每天调度。

-

流程并发数

同一个工作流同一时间允许运行的最大实例个数。设为 1 表示不允许重复执行,超过并发数时跳过当前调度。

1

DAG 编排

通过可视化白屏画布,以拖拽方式编排节点依赖关系,支持逻辑节点处理复杂工作流。

编排节点和边

  1. 从左侧菜单栏拖动任务到画布,完成创建节点操作。

  2. 鼠标移到节点右侧或下侧,浮现"+"图标,拖拽到另一个节点左侧或上侧,完成新增依赖操作。

  3. 鼠标左键选中节点或边,按 DELETE 键,完成删除操作。

  4. 鼠标左键单击节点,或右键节点选择编辑,完成节点基本信息修改操作。

  5. 鼠标右键节点,选择编辑脚本,完成节点脚本修改操作。

任务节点

任务节点的配置项说明如下。

配置项分类

配置项

描述

默认值

基本配置

名称

任务的名称,同一个应用下需保持唯一。

-

描述

任务的描述。

-

任务类型

  • OpenClaw:OpenClaw Agent。

  • DifyWorkflow:Dify 工作流。

  • DifyAgent:Dify Agent。

  • 百炼:百炼应用。

  • Shell:Unix Shell 脚本。

  • Python:Python 脚本。

  • PHP:PHP 脚本。

  • NodeJs:Node.js 脚本。

  • Cmd:Windows CMD 脚本。

-

路由策略

轮询、一致性哈希、权重最小、分片广播、随机、第一个、最后一个、最不经常使用、最近最久未使用。

轮询

依赖策略

  • all_success:上游全部成功或跳过。

  • all_failed:上游全部失败。

  • all_done:上游全部完成(成功、失败、跳过)。

  • all_done_min_one_success:全部完成且至少一个成功。

  • one_failed:上游至少一个失败。

  • one_success:上游至少一个成功。

  • one_done:上游至少一个完成。

all_success

优先级

任务的优先级,需要配合任务队列使用。

任务权重

任务的权重,需要配合权重最小路由策略使用。

1

失败重试次数

失败重试的最大次数。

0

失败重试间隔

失败重试的间隔,单位:秒。

30

最大并发数

同一个任务同一时间允许运行的最大实例个数。设为 1 表示不允许重复执行,超过并发数时跳过当前调度。

1

自定义变量

为任务配置自定义变量,详情请参见本文变量章节。

-

定时配置

日历

任务调度时如果不在日历范围内则跳过。

开始时间

  • 未来某时刻:该任务直到未来某一天才能开始运行。

  • 每天:每天到达指定时间才能开始运行(即使上游依赖已全部满足)。

关闭

通知配置

超时报警

任务运行时间超过设定阈值时报警。

开启

超时自动停止

运行时间超过指定时长后自动终止任务。

关闭

成功通知

任务执行成功后向联系人推送通知。

关闭

失败报警

任务执行失败时报警。

开启

无可用机器报警

任务的执行器不可用时报警。

开启

提前结束报警

任务运行时间小于设定阈值时报警。

关闭

通知方式

支持短信、Webhook、邮件、电话。

-

通知对象

使用云监控联系人。

-

逻辑节点

逻辑节点用于处理复杂的工作流控制逻辑,支持以下类型:状态分支、条件分支、外部依赖、人工节点。

状态分支

通过上游节点的执行状态决定下游走哪个分支,未选择的分支自动变为跳过状态。支持多个上游节点和多个下游节点。

  1. 在工作流画布中添加状态分支节点。

  2. 配置状态分支规则,配置完成后在工作流画布上保存新版本。

如上图所示,配置示例的逻辑等效为:

if ((armon-job31==success || armon-job32==success) && armon-job33==success )
     switch to armon-job34
else
     switch to armon-job35

条件分支

使用 javax.script.ScriptEngine.eval 执行表达式,通过上游节点返回的自定义变量或结果决定下游走哪个分支,未选择的分支自动变为跳过状态。支持多个上游节点和多个下游节点。

  1. 在工作流画布中添加条件分支节点。

  2. 为上游任务(如 armon-job41)添加 OUT 类型的自定义输出变量(如 var1)。

  3. 在脚本中输出变量值(如将 var1 赋值为 "world")。

  4. 配置条件分支逻辑节点的判断规则。

外部依赖

外部依赖节点用于跨工作流设置依赖关系,适用以下场景:

  • 将过大的流程拆分为多个小流程,提取公共流程 C,允许流程 A 和流程 B 同时依赖流程 C。

  • 流程 A 为周报任务(每周执行一次),流程 B 为天任务(每天执行一次),流程 A 中某个任务需要依赖流程 B 中某个任务上一周每天都成功。

外部依赖节点配置如下。

时间周期

取值

时间范围

  • 今天:依赖另一个任务当天的数据。

  • 昨天:依赖另一个任务昨天的数据。

  • 前天(T-2):依赖另一个任务 T-2 天的数据。

  • 前三天(T-3):依赖另一个任务 T-3 天的数据。

  • 前七天(T-7):依赖另一个任务 T-7 天的数据。

如果被依赖的任务一天调度多次,可指定依赖当天哪个时间段的数据;该时间段内若有多个任务实例,则依赖最后一次。

  • 本周:依赖另一个任务本周的数据(本周一至今天)。

  • 上周:依赖另一个任务上周的数据(上周一至上周日)。

  • 上周一:依赖另一个任务上周一的数据。

  • 上周二:依赖另一个任务上周二的数据。

  • 上周三:依赖另一个任务上周三的数据。

  • 上周四:依赖另一个任务上周四的数据。

  • 上周五:依赖另一个任务上周五的数据。

  • 上周六:依赖另一个任务上周六的数据。

  • 上周日:依赖另一个任务上周日的数据。

如果被依赖任务在周内某天调度多次,可指定依赖当天哪个时间段的数据;该时间段内若有多个任务实例,则依赖最后一次。

  • 本月:依赖另一个任务本月的数据(本月 1 号至今天)。

  • 上月:依赖另一个任务上月的数据(上月 1 号至上月最后一天)。

如果被依赖任务在月内某天调度多次,可指定依赖当天哪个时间段的数据;该时间段内若有多个任务实例,则依赖最后一次。

人工节点

人工节点一旦开始运行,会进入挂起(Held)状态,需要人工确认通过后才能变为成功状态,并继续执行后续节点。

变量

系统变量

工作流提供以下系统内置变量,可在任务脚本中直接引用。

变量

描述

示例

system.schedule.time

调度时间,格式:yyyyMMddHHmmss。

20251212023000

system.data.time

数据时间,格式:yyyyMMddHHmmss。

20251212023000

system.bizdate

数据时间精确到天,格式:yyyyMMdd。

20251212

system.task.definition.name

任务名称。

myTask1

system.task.instance.id

任务实例 ID。

1448366263490740225

system.workflow.definition.name

工作流名称。

myWorkflow1

system.workflow.instance.id

工作流实例 ID。

100

自定义变量

工作流中所有任务都可以配置自定义变量。OUT 类型的变量会放入工作流的变量池,供下游节点使用,实现上下游数据传递。

重要

变量值的大小不能超过 64 KB。

  1. 在任务定义中,添加 OUT 类型的自定义变量。

  2. 在脚本中使用赋值语句输出变量。不同任务类型的赋值语句如下。

    任务类型

    赋值语句

    描述

    Shell

    echo "#{setValue(val1=hello)}"

    输出变量 val1,值为 hello。

    Python

    print("#{setValue(var1=hello)}")

    输出变量 var1,值为 hello。

    PHP

    echo "#{setValue(var1=hello)}\n"

    输出变量 var1,值为 hello。

    NodeJs

    console.log("#{setValue(var1=hello)}");

    输出变量 var1,值为 hello。

    PowerShell

    Write-Host "#{setValue(var1=hello)}"

    输出变量 var1,值为 hello。

    WinCmd

    echo #{setValue(var1=hello)}

    输出变量 var1,值为 hello。

自定义输出结果

任务脚本可以输出自定义结果,不同任务类型的输出语句如下。

任务类型

输出语句

描述

Shell

echo "#{setResult(123456)}"

返回结果 123456。

Python

print(f"#{{setResult(123456)}}")

返回结果 123456。

PHP

echo "#{setResult(123456)}\n"

返回结果 123456。

NodeJs

console.log(`#{setResult(123456)}`);

返回结果 123456。

PowerShell

Write-Host "#{setResult(123456)}"

返回结果 123456。

WinCmd

echo #{setResult(123456)}

返回结果 123456。

工作流导入导出

导出

支持不同应用下多个工作流的批量导出。

导出格式为 JSON,总体结构包括工作流类型、导出格式、功能版本及工作流列表。工作流列表中的每一项包含工作流的定义信息(workflowInfo)、节点列表(nodes)和边列表(edges)。

{
  "kind": "SchedulerXWorkflows",
  "type": "JSON",
  "version": "2.0",
  "content": [
    {
      "workflowInfo": {},
      "nodes": [],
      "edges": []
    }
  ]
}
          

导出字段说明如下。

字段

含义

示例值

kind

工作流类型。

SchedulerXWorkflows

type

导出格式。

JSON

version

功能版本。

2.0

content

工作流列表。

-

content[].workflowInfo

工作流定义信息。

见完整导出示例。

content[].nodes

节点列表。

见完整导出示例。

content[].edges

边列表,定义节点间的依赖关系。

见完整导出示例。

说明

Schedulerx-Root 是系统内置的唯一根节点,节点 ID 为 0,无需手动创建。

以下为完整导出示例,以包含条件分支的工作流为例。

image

image

{
  "kind": "SchedulerXWorkflows",
  "type": "JSON",
  "version": "2.0",
  "content": [
    {
      "workflowInfo": {
        "appName": "qiacheng-import-test",
        "appType": 1,
        "name": "qiacheng-success",
        "description": "qiacheng-success",
        "currentExecuteStatus": 0,
        "timeConfig": {
          "calendar": "workday",
          "dataOffset": 0,
          "timezone": "Hongkong",
          "timeType": 1,
          "paramMap": {},
          "timeExpression": "0 0 12 * * ?"
        },
        "version": "v1.0",
        "maxConcurrency": 1,
        "status": 0,
        "creator": "xxxxx",
        "updater": "xxxxx"
      },
      "nodes": [
        {
          "name": "script-job1",
          "description": "script-job1",
          "jobType": "script_shell",
          "executeMode": "standalone",
          "attemptInterval": 30,
          "startTimeType": 1,
          "startTime": -1,
          "weight": 1,
          "routeStrategy": 1,
          "priority": 5,
          "contentType": 2,
          "content": "...",
          "maxConcurrency": 1,
          "maxAttempt": 0,
          "xattrs": "{\"executorBlockStrategy\":1}",
          "parameters": "",
          "timeConfig": {
            "calendar": "",
            "dataOffset": 0,
            "paramMap": {},
            "timeType": -1,
            "timeExpression": ""
          },
          "monitorConfigInfo": {
            "endEarlyEnable": false,
            "failLimitTimes": 1,
            "failEnable": true,
            "failRate": 100,
            "timeoutKillEnable": false,
            "endEarly": 30,
            "sendChannel": "",
            "timeout": 300,
            "daysOfDeadline": 0,
            "alarmType": "CustomContacts",
            "missWorkerEnable": true,
            "timeoutEnable": true,
            "successNotice": false
          },
          "contactInfoList": [],
          "coordinate": { "x": -176, "y": 357 },
          "dependentStrategy": 1,
          "status": 1
        }
      ],
      "edges": [
        { "from": "Schedulerx-Root", "to": "script-job1" },
        { "from": "Schedulerx-Root", "to": "script-job2" },
        { "from": "script-job1", "to": "condition_branch" },
        { "from": "script-job2", "to": "condition_branch" },
        { "from": "condition_branch", "to": "script-job3" },
        { "from": "condition_branch", "to": "script-job4" }
      ]
    }
  ]
}
          

导入

通过上传 JSON 格式文件导入工作流。

导入时注意以下事项:

  1. 若不存在指定的应用,系统自动创建。

  2. 系统按照实例规格检测工作流和任务数量。例如,专业版小规格 x1 支持最大任务数 100、最大工作流数 20,超过限制时提示导入失败。

  3. 导入格式与导出格式一致,详情请参见本文导出章节。