Airflow GUI 是 DMS 在社区版 Airflow 基础上提供的可视化编排工具,帮助你通过拖拉拽的方式快速创建和管理 Airflow DAG 任务,无需手写 Python 代码。本文介绍 Airflow GUI 的使用入口、全局配置、节点配置及任务发布流程。
前提条件
已完成搭建Airflow环境。
如需使用邮件告警功能,需提前在 Airflow 中完成邮件配置,使用自定义邮件服务。
快速入门
功能入口
登录数据管理DMS 5.0。
在顶部菜单栏中,选择Data + AI > 工作空间,或在极简模式的控制台,点击控制台左上角的
图标,选择全部功能 > Data + AI > 工作空间。在空间列表页,点击目标空间ID,进入目标空间。
在左侧导航栏,点击资源管理器,在CODE模块空白部分,点击鼠标右键,新建Airflow文件。

工作流画布概览
进入 GUI 编辑器后,您会看到一个直观的可视化画布。
左侧:是节点面板,点击
按钮您可以在此选择并添加不同类型的任务节点。中间:是主画布,您可以在此拖拽节点、连接依赖关系,构建您的工作流。
右侧:是配置面板,当您选中某个节点或画布空白处时,会显示对应的配置项。

核心操作流程
全局配置:在右侧面板中,完成 DAG 的全局配置,如名称、调度策略、告警等。
添加与配置节点:从左侧面板拖拽所需节点至画布,并在右侧面板中完成其基础和高级配置。
设置依赖关系:通过拖拽节点之间的锚点,建立任务的上下游依赖关系。
保存与发布:完成工作流设计后,保存并将其发布至 Airflow 环境。
核心概念与配置详解
全局配置
单击顶部导航栏的全局配置按钮,将显示 DAG 的全局配置
基础信息:
名称:即 Airflow 中的
DAG ID,必须在您的 Airflow 环境中唯一。
调度配置:
Cron 表达式:定义 DAG 的周期性调度策略。

开始/结束日期:定义 DAG 调度的有效时间范围。
告警设置:
配置任务失败、成功或重试时的邮件告警。多个邮箱地址请用逗号隔开。

前提条件:使用邮件告警前,需在 Airflow 中完成邮件服务的配置。
模板支持:告警标题和内容支持 Jinja 模板,可动态插入
dag、ti等内置变量。邮件内容支持 HTML 格式。
变量:
定义可在 DAG 内所有任务中使用的全局变量。变量值同样支持 Jinja 模板。
重试设置:
配置任务失败后的全局默认重试次数和重试间隔。
节点配置
从左侧面板添加的每个节点都包含基础配置和高级配置两部分。高级配置为所有节点通用,基础配置则因节点类型而异。
高级配置(通用)
告警配置:可覆盖全局告警设置,为当前节点定义独立的告警策略。
重试:可覆盖全局重试设置,为当前节点定义独立的重试策略。
资源池配置:指定任务在 Airflow 的哪个资源池(Pool)中运行,用于控制任务的并发度。

触发规则(Trigger Rule):定义当前任务的执行条件,如“所有上游任务成功”、“至少一个上游任务成功”等。

基础配置(按节点类型)
以下是部分常用节点的基础配置说明:
单实例 SQL
功能:在指定的数据库实例上执行单条 SQL 语句。
配置:选择实例和数据库,然后直接编写 SQL 或选择一个已存在于工作空间中的
.sql文件。
回调代码(可选):一段 Python 代码,在 SQL 执行成功后运行。主要用途:
传递数据(XCom):通过内置的
result变量(一个包含data键的字典)获取 SQL 查询结果,并将其传递给下游任务。例如,result['data'][0]['co']表示获取结果集第一行中名为co的列的值。结果校验:对 SQL 结果进行检查,不满足条件时可主动抛出异常使任务失败。
if len(result["data"]) > 10: raise Exception("too many data")
ADB Spark
功能:在 AnalyticDB for MySQL 的 Spark 引擎上提交作业。
配置:选择 ADB 实例和资源组,然后根据任务类型(SQL 或 BATCH)进行配置。
SQL 任务:直接编写 Spark SQL 或选择
.sql文件。
BATCH 任务:提供一个 JSON 格式的 Spark 作业配置。
{ "args": [ "1000" ], "className": "org.apache.spark.examples.SparkPi", "comments": [ "-- Here is just an example of SparkPi. Modify the content and run your spark program." ], "conf": { "spark.driver.resourceSpec": "medium", "spark.executor.instances": 2, "spark.executor.resourceSpec": "medium" }, "file": "local:///tmp/spark-examples.jar", "name": "SparkPi" }
Notebook
功能:运行工作空间中的 Notebook 文件。
配置:
选择 Notebook Session。
选择 Notebook 文件路径。文件必须在工作空间内已配置,路径为工作空间的文件路径。

Lindorm Spark
功能:提交 Lindorm Spark 作业。
配置:
选择 Lindorm 实例。
选择作业类型并配置作业参数。

无锁数据变更
功能:以无锁方式在指定实例和数据库上执行数据变更 SQL。配置方式与"单实例 SQL"节点一致,同样支持回调代码处理执行结果。
条件分支
功能:根据不同的条件,决定工作流的走向。
配置:可以添加多个分支条件,每个条件都是一个返回
True或False的 Python 表达式。表达式支持 Jinja 模板,可以引用 Airflow 变量。
条件分支有多个下游边,可以拖拉拽设置。

SSH 操作
功能:通过 SSH 连接到远程服务器并执行命令。
配置:
在 Airflow 的
Connections中预先配置好 SSH 连接信息,并获取其Connection ID。
在节点配置中填入该
Connection ID和要执行的 Shell 命令。
Bash 任务
功能:在本地环境中执行 Bash 命令。
配置:
配置要执行的 Bash 命令。
配置命令执行时的环境变量。

时间间隔
功能:用于在任务流程中暂停等待一段指定的时间后再继续执行后续节点。
等待指定时间
功能:等待到达指定的目标时间后再继续执行后续节点。目标时间支持使用 Airflow Jinja 模板语法动态设置。
External Task Sensor
功能:配置外部任务依赖检查,等待同一 Airflow 实例中的外部 DAG 或 Task 完成后再继续执行。
配置:
上游 DAG 名称:被依赖的 DAG 的 ID。

上游节点名称(可选):被依赖的 Task 的 ID。如果留空,则表示等待整个上游 DAG 完成。
空任务
功能:一个不做任何事情的“占位符”节点,常用于聚合多个上游分支,或作为逻辑起点/终点,使工作流结构更清晰。
配置:无。
入湖数据集成
功能:将数据从源库集成到目标数据湖。
配置:
选择源库和目标库。

配置数据集成任务的其他参数。

设置依赖关系
添加依赖:将鼠标悬停在一个节点的边缘,会出锚点。按住鼠标左键从一个节点的锚点拖拽到另一个节点的锚点,即可创建一条表示依赖关系的边。

删除依赖:单击选中一条边,然后按
Delete键即可删除。
发布与验证
Git 集成(可选)
如果您的工作空间与 Git 仓库关联,请在发布前先将代码提交至 Git。
Commit:在 Git 面板中提交您的更改。

Push:将提交推送到远程仓库。
部署至 Airflow
单击编辑器右上角的 部署 按钮,系统会将您通过 GUI 设计的工作流转换为 Python 代码,并发布到您的 Airflow 环境中。

在 Airflow 中验证
部署成功后,登录您的 Airflow UI。您会看到一个新的 DAG,其 DAG ID 与您在 GUI 中配置的名称一致,并且带有一个 generated 标签,表示它是由 GUI 生成的。您可以在此触发、监控和管理这个 DAG 的运行。
















