Airflow GUI 使用指南

更新时间:
复制为 MD 格式

Airflow GUI 是 DMS 在社区版 Airflow 基础上提供的可视化编排工具,帮助你通过拖拉拽的方式快速创建和管理 Airflow DAG 任务,无需手写 Python 代码。本文介绍 Airflow GUI 的使用入口、全局配置、节点配置及任务发布流程。

前提条件

快速入门

功能入口

  1. 登录数据管理DMS 5.0

  2. 在顶部菜单栏中,选择Data + AI > 工作空间,或在极简模式的控制台,点击控制台左上角的2023-01-28_15-57-17图标,选择全部功能 > Data + AI > 工作空间

  3. 在空间列表页,点击目标空间ID,进入目标空间。

  4. 在左侧导航栏,点击资源管理器,在CODE模块空白部分,点击鼠标右键,新建Airflow文件

    image

工作流画布概览

进入 GUI 编辑器后,您会看到一个直观的可视化画布。

  • 左侧:是节点面板,点击image按钮您可以在此选择并添加不同类型的任务节点。

  • 中间:是主画布,您可以在此拖拽节点、连接依赖关系,构建您的工作流。

  • 右侧:是配置面板,当您选中某个节点或画布空白处时,会显示对应的配置项。

https://alidocs.oss-cn-zhangjiakou.aliyuncs.com/res/Yvenve55peLxaloy/img/ae1091fe-bcec-42bb-9cad-1362ef1609a8.png

核心操作流程

  • 全局配置:在右侧面板中,完成 DAG 的全局配置,如名称、调度策略、告警等。

  • 添加与配置节点:从左侧面板拖拽所需节点至画布,并在右侧面板中完成其基础和高级配置。

  • 设置依赖关系:通过拖拽节点之间的锚点,建立任务的上下游依赖关系。

  • 保存与发布:完成工作流设计后,保存并将其发布至 Airflow 环境。

核心概念与配置详解

全局配置

单击顶部导航栏的全局配置按钮,将显示 DAG 的全局配置

  • 基础信息

    • 名称:即 Airflow 中的 DAG ID,必须在您的 Airflow 环境中唯一。

  • 调度配置

    • Cron 表达式:定义 DAG 的周期性调度策略。

      image.png

    • 开始/结束日期:定义 DAG 调度的有效时间范围。

  • 告警设置

    • 配置任务失败、成功或重试时的邮件告警。多个邮箱地址请用逗号隔开。

      image.png

    • 前提条件:使用邮件告警前,需在 Airflow 中完成邮件服务的配置。

    • 模板支持:告警标题和内容支持 Jinja 模板,可动态插入 dagti 等内置变量。邮件内容支持 HTML 格式。

  • 变量

    • 定义可在 DAG 内所有任务中使用的全局变量。变量值同样支持 Jinja 模板

  • 重试设置

    • 配置任务失败后的全局默认重试次数重试间隔

节点配置

从左侧面板添加的每个节点都包含基础配置和高级配置两部分。高级配置为所有节点通用,基础配置则因节点类型而异。

高级配置(通用)

  • 告警配置:可覆盖全局告警设置,为当前节点定义独立的告警策略。

  • 重试:可覆盖全局重试设置,为当前节点定义独立的重试策略。

  • 资源池配置:指定任务在 Airflow 的哪个资源池(Pool)中运行,用于控制任务的并发度。

    image

  • 触发规则(Trigger Rule):定义当前任务的执行条件,如“所有上游任务成功”、“至少一个上游任务成功”等。

    image

基础配置(按节点类型)

以下是部分常用节点的基础配置说明:

  • 单实例 SQL

    • 功能:在指定的数据库实例上执行单条 SQL 语句。

    • 配置:选择实例和数据库,然后直接编写 SQL 或选择一个已存在于工作空间中的 .sql 文件。

      image

    • 回调代码(可选):一段 Python 代码,在 SQL 执行成功后运行。主要用途:

      1. 传递数据(XCom):通过内置的 result 变量(一个包含 data 键的字典)获取 SQL 查询结果,并将其传递给下游任务。例如,result['data'][0]['co'] 表示获取结果集第一行中名为 co 的列的值。

      2. 结果校验:对 SQL 结果进行检查,不满足条件时可主动抛出异常使任务失败。

        if len(result["data"]) > 10:
            raise Exception("too many data")
  • ADB Spark

    • 功能:在 AnalyticDB for MySQL 的 Spark 引擎上提交作业。

    • 配置:选择 ADB 实例和资源组,然后根据任务类型(SQL 或 BATCH)进行配置。

      • SQL 任务:直接编写 Spark SQL 或选择 .sql 文件。

        image

      • BATCH 任务:提供一个 JSON 格式的 Spark 作业配置。

        {
          "args": [
            "1000"
          ],
          "className": "org.apache.spark.examples.SparkPi",
          "comments": [
            "-- Here is just an example of SparkPi. Modify the content and run your spark program."
          ],
          "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "medium"
          },
          "file": "local:///tmp/spark-examples.jar",
          "name": "SparkPi"
        }
  • Notebook

    • 功能:运行工作空间中的 Notebook 文件。

    • 配置:

      • 选择 Notebook Session。

      • 选择 Notebook 文件路径。文件必须在工作空间内已配置,路径为工作空间的文件路径。

        image

  • Lindorm Spark

    • 功能:提交 Lindorm Spark 作业。

    • 配置:

      • 选择 Lindorm 实例。

      • 选择作业类型并配置作业参数。

        image

  • 无锁数据变更

    • 功能:以无锁方式在指定实例和数据库上执行数据变更 SQL。配置方式与"单实例 SQL"节点一致,同样支持回调代码处理执行结果。

  • 条件分支

    • 功能:根据不同的条件,决定工作流的走向。

    • 配置:可以添加多个分支条件,每个条件都是一个返回 True 或 False 的 Python 表达式。表达式支持 Jinja 模板,可以引用 Airflow 变量。

      image

    • 条件分支有多个下游边,可以拖拉拽设置。

      image

  • SSH 操作

    • 功能:通过 SSH 连接到远程服务器并执行命令。

    • 配置:

      1. 在 Airflow 的 Connections 中预先配置好 SSH 连接信息,并获取其 Connection ID

        image.png

      2. 在节点配置中填入该 Connection ID 和要执行的 Shell 命令。

        image.png

  • Bash 任务

    • 功能:在本地环境中执行 Bash 命令。

    • 配置:

      1. 配置要执行的 Bash 命令。

      2. 配置命令执行时的环境变量。

        image

  • 时间间隔

    • 功能:用于在任务流程中暂停等待一段指定的时间后再继续执行后续节点。

  • 等待指定时间

    • 功能:等待到达指定的目标时间后再继续执行后续节点。目标时间支持使用 Airflow Jinja 模板语法动态设置。

  • External Task Sensor

    • 功能:配置外部任务依赖检查,等待同一 Airflow 实例中的外部 DAG 或 Task 完成后再继续执行。

    • 配置:

      • 上游 DAG 名称:被依赖的 DAG 的 ID。

        image.png

      • 上游节点名称(可选):被依赖的 Task 的 ID。如果留空,则表示等待整个上游 DAG 完成。

  • 空任务

    • 功能:一个不做任何事情的“占位符”节点,常用于聚合多个上游分支,或作为逻辑起点/终点,使工作流结构更清晰。

    • 配置:无。

  • 入湖数据集成

    • 功能:将数据从源库集成到目标数据湖。

    • 配置:

      • 选择源库和目标库。

        image.png

      • 配置数据集成任务的其他参数。

        image

设置依赖关系

  • 添加依赖:将鼠标悬停在一个节点的边缘,会出锚点。按住鼠标左键从一个节点的锚点拖拽到另一个节点的锚点,即可创建一条表示依赖关系的边。

    https://alidocs.oss-cn-zhangjiakou.aliyuncs.com/res/Yvenve55peLxaloy/img/c6d3be23-6d39-44a0-b485-a853e7c3170b.png

  • 删除依赖:单击选中一条边,然后按 Delete 键即可删除。


发布与验证

Git 集成(可选)

如果您的工作空间与 Git 仓库关联,请在发布前先将代码提交至 Git。

  1. Commit:在 Git 面板中提交您的更改。

    https://alidocs.oss-cn-zhangjiakou.aliyuncs.com/res/Yvenve55peLxaloy/img/0c49343f-9634-49a3-b3c1-2e173ad1d95d.png

  2. Push:将提交推送到远程仓库。

部署至 Airflow

单击编辑器右上角的 部署 按钮,系统会将您通过 GUI 设计的工作流转换为 Python 代码,并发布到您的 Airflow 环境中。

https://alidocs.oss-cn-zhangjiakou.aliyuncs.com/res/Yvenve55peLxaloy/img/15349f9d-e9bc-456c-bfd9-6e2d920e7166.png

在 Airflow 中验证

部署成功后,登录您的 Airflow UI。您会看到一个新的 DAG,其 DAG ID 与您在 GUI 中配置的名称一致,并且带有一个 generated 标签,表示它是由 GUI 生成的。您可以在此触发、监控和管理这个 DAG 的运行。