使用SAE部署Airflow

Apache Airflow是一个用于编排和调度数据管道的开源平台。使用SAE可以快速部署Apache Airflow,无需管理底层计算资源,而且仅需简单配置即可实现高可用、秒级弹性伸缩。

方案概览

Airflow工作流程架构图如下:

image

组件介绍:

  • Scheduler:调度器,负责调度的工作流,并将任务提交给Executor运行。Executor是调度器的一个配置属性,而不是一个独立的组件,它运行在调度器进程内部。

  • Webserver:Web服务器,它提供了一个方便的用户界面来检查、触发和调试DAGs和任务的行为。

  • 数据库:Airflow组件用数据库来存储工作流和任务的状态。

资源限制说明

  • Afirlow支持数据库引擎版本

    仅支持PostgreSQL、MySQLSQLite。具体支持的版本如下:

    • PostgreSQL:支持12、13、14、15、16版本

    • MySQL:支持8.0和创新版

    • SQLite:仅支持3.15.0+

  • Worker实例资源限制说明

    如果您需要修改以下资源限制,请在钉钉群(群号:32874633)联系相关技术人员进行解决。

    • Worker实例的CPU和内存资源规格默认和airflow-scheduler组件保持一致,暂时不支持修改。

    • Worker实例的运行环境是airflow:2.10.5镜像,只支持Python语言环境,暂时不支持Java 。

  • 资源Quota限制说明

    每个账号默认有300个实例的个数限制,如果Worker实例个数超300,请在钉钉群(群号:32874633)联系相关技术人员为您的账号提升实例个数上限。

  • Airflow DAG实例计费说明

    sae-airflow-schedule应用中,由DAG任务触发创建的实例,会按照Job实例进行秒级计费。具体计费信息,请参见计费概述

1. 创建数据库

本文以MySQL数据库为例进行演示。本文只介绍关键步骤,创建数据库的具体介绍,请参见创建RDS MySQL实例

如果您需要创建PostgreSQL数据库,请参见创建PostgreSQL实例

1.1 创建MySQL实例

登录云数据库 RDS 控制台,在左侧导航栏选择实例列表,单击创建实例,然后配置以下参数。

  1. 选择创建方式,支持标准创建快捷创建,本文以标准创建为例进行演示。

  2. 选择付费方式,选择与SAE侧部署Airflow应用相同的地域,选择引擎MySQL并选择符合要求的版本。

    hsgSg9jRvN

  3. 选择VPC,使用与部署Airflow应用所在命名空间相同的VPC。

  4. 选择主可用区及网络备用可用区及网络

  5. 选择实例规格

  6. 单击确定

1.2 修改参数

说明

本步骤只适用于MySQL数据库,如果您选择的是PostgreSQL数据库,请跳过此步骤。

  1. 实例列表页面,单击已创建的实例ID,进入实例的基本信息页面。

  2. 在左侧导航栏单击参数设置,在可修改参数页签,找到sql_mode参数。

  3. 单击sql_mode参数运行参数值列的image图标,删除NO_ZERO_DATE参数值,单击确定,然后单击提交参数

    说明

    在严格模式下,MySQL数据库不允许0000-00-00作为有效时间。在某些情况下,会有类似“Invaliddefaultvaluefor‘end_date'”的报错,该报错表示某些Airflow表使用0000-00-00 00:00:00作为时间戳字段的默认值,在MySQL实例上禁用NO_ZERO_DATE参数值后,可避免此错误。

    1lrDBVhDai

1.3 创建账号及数据库

本文通过控制台的方式为例进行演示说明。

通过控制台创建

创建账号

说明

支持创建以下两种账号,创建账号的具体信息,请参见创建账号

  • 高权限账号:一个实例中只能创建一个高权限账号,此账号可以管理所有普通账号和数据库。本文以创建该账号为例进行演示。

  • 普通账号:一个实例可以创建多个,该账号默认仅拥有登录数据库的权限,如果您创建的该类型的账号,需要为账号赋予管理数据库的读写权限。更多信息,请参见修改账号权限

  1. 进入实例后,在左侧导航栏单击账号管理

  2. 用户账号页面单击创建账号,在弹出的创建账号面板中配置以下参数,然后单击确定

    1. 自定义数据库账号,本示例中为airflow_user

    2. 选择账号类型。

      高权限账号:一个实例只能创建一个,该账号具体管理普通账号和所有数据库的权限。
      普通账号:需要选择需要授权的数据库并设置权限(读写权限:DDL+DML、只读权限:仅DDL或仅DML)。
    3. 设置账号密码。

    allBXUfTK5

创建数据库

  1. 进入实例后,在左侧导航栏单击数据库管理

  2. 单击创建数据库,在弹出的创建数据库面板,自定义数据库(DB)名称(如:airflow_db),选择支持字符集(如:utf8),然后单击创建

    image

通过SQL语句创建

CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
# 创建airflow_db数据库

CREATE USER 'airflow_user' IDENTIFIED BY 'airflow_pass';
# 创建登录数据库的账号

GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';
# 授予airflow_user用户管理airflow_db数据库的权限

1.4 设置数据库白名单

  1. 进入实例后,在左侧导航栏单击白名单与安全组

  2. 在名单设置页签,单击添加白名单分组

  3. 在弹出的添加白名单分组面板,设置分组名称组内白名单,然后单击确定

    说明
    • 如果需要添加多个网段,需要使用半角逗号(,)进行分隔。

    • 新白名单将于1分钟后生效。

    • 分组名称可自定义。

    • 组内白名单为SAE侧部署Airflow应用对应命名空间的VPC可交换机的网段。

      • 查看VPC网段:SAE侧,进入部署Airflow应用的目标命名空间,单击专有网络VPC名称,然后在目标VPC的基本信息页签复制IPv4网段。

        g0Qmg5Dpbk

      • 查看交换机网段:专有网络控制台,单击左侧导航栏的交换机,在交换机页面找到对应的交换机并单击实例ID,然后复制该交换机的IPv4网段。

        bA7mkN341c

1.5 获取数据库连接地址

进入实例后,在左侧导航栏单击数据库连接,然后复制该页面的内网地址内网端口

创建Airflow应用时,需要填写数据库连接地址。

fULtxrtyFE

2. 创建存储空间(Bucket)

2.1 创建Bucket

创建存储空间的方式有多种,本文以通过控制台创建为例进行演示。具体信息,请参见创建存储空间

  1. 登录对象存储OSS控制台,在左侧导航栏单击Bucket列表

  2. 单击创建Bucket,在弹出的创建Bucket面板配置以下参数,然后单击完成创建

    1. 选择创建模式。

    2. 自定义Bucket名称,并选择目标地域。

      选择的地域与创建Airflow应用的地域一致。
    3. 选择所需资源组。

2.2 创建挂载目录

  1. Bucket列表页面,单击上一步骤创建的Bucket名称。

  2. 在目标Bucket文件列表页面,单击新建目录

  3. 在弹出的新建目录页签,输入目录名,然后单击确定

    说明

    目录命名规范:

    • 不允许使用表情符,请使用符合要求的 UTF-8 字符。

    • /用于分割路径,可快速创建子目录,但不要以/\开头,不要出现连续的/

    • 不允许出现名为..的子目录。

    • 总长度控制在1~254 个字符。

3. 创建Airflow应用

3.1 配置网络

  1. 登录SAE控制台,在左侧导航栏单击应用中心,选择目标地域。

    部署Airflow应用的地域和数据库所在的地域需要一致。
  2. 然后将鼠标悬停到Airflow区域,单击部署

  3. Airflow页面,选择命名空间交换机选择安全组

    为了保证服务的高可用,建议至少选择两个交换机。需要将选择的交换机的网段都添加到数据库的白名单中。

    4EXlPPihTV

3.2 配置MySQL数据库

Airflow页面,单击数据库区块,在数据库配置面板配置以下信息,然后单击确定dSg1zWz8G0

  1. 数据库名称:填写已创建的数据库名称

  2. 数据库地址:填写已复制的内网连接地址

  3. 端口:填写数据库的端口,默认为3306,如果您自定义了数据库端口,请填写自定义的端口。

  4. 用户名:填写用于登录数据库的账号名称

  5. 密码:填写用于登录数据库的账号密码。

3.3 配置文件存储

Airflow页面,单击文件存储区块,在文件存储配置面板配置以下信息,然后单击确定

  1. 填写已创建的AccessKey IDAccessKey Secret

    建议您遵循阿里云安全最佳实践,使用RAM子用户的AccessKey,并设置RAM最小访问权限。具体操作,请参见创建AccessKey
  2. 挂载OSS。

    1. Bucket:在下拉列表中选择已创建的Bucket

    2. 挂载路径:填写已创建的挂载目录,如test/

      如果OSS挂载目录不存在,会触发异常。
    3. 容器路径:填写/opt/airflow/dags

3.4 执行流水线部署

数据库和文件存储配置完成后,在Airflow页面单击部署

  • 执行流水线部署后,会创建sae-airflow-schedulersae-airflow-webserver两个应用,此两个应用可在应用列表页面查看和管理。

    iOzMLvywIm

  • 流水线部署过程中,会实时打印部署流水线的日志,包括Secret数据库连接Deployment应用发布Service公网访问发布过程,如果流水线发布失败,您可以借助发布日志排查问题。

  • 流水线部署完成后,会在Service公网访问发布页签打印sae-airflow-webserver应用的CLB IP和端口,该地址可直接在浏览器访问。

    JdGDi9k2tI

4. 初步体验运行DAG任务

4.1 创建Airflow用户名和密码

流水线发布成功后,需要创建登录Airflow Web界面的用户名和密码。

  1. 应用列表页面,进入sae-airflow-schedulersae-airflow-webserver应用的基础信息页面。

  2. 基础信息页面,单击实例列表页签,使用Websell登录应用实例,然后创建用户名和密码。

    airflow users create \
       --username airflow \
       # 自定义用户名,本示例中为airflow
       --password airflow \
       # 自定义密码,本示例中为airflow
       --role Admin \
       # 定义用户角色,Admin为最高权限,可管理所有DAG和用户
       --firstname airflow \
       --lastname airflow \
       --email spider***@superhero.org 
       # 定义用户邮箱,用于接收通知,如任务失败告警

4.2 登录Airflow Web界面

复制在流水线中生成的公网IP,在浏览器在进行访问,然后输入用户名和密码进行登录。

格式为http://<sae-airflow-webserver CLB IP>:8080

D6akwpXu6v

4.3 体验运行DAG任务

  1. 登录OSS管理控制台,在已创建的Bucket中上传示例DAG文件,以便调度器识别并运行任务。

    上传文件的具体操作,请参见上传文件
    示例文件:dag.py

    DAG示例文件内容

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.python_operator import PythonOperator
    from datetime import datetime, timedelta
    
    def my_python_function():
        print("Hello from Python!")
    
    
    def my_python_function1():
        print("Hello from airflow!")
    
    # 创建 DAG 对象
    with DAG(
        "example_dag",
        schedule_interval='*/1 * * * *',  # 每分钟执行一次
        start_date=datetime(2022, 1, 1),
        catchup=False,
    ) as dag:
        task1 = PythonOperator(
            task_id="extract_data",
            python_callable=my_python_function1,
            dag=dag,
        )
        task2 = PythonOperator(
            task_id="my_python_task",
            python_callable=my_python_function,
            dag=dag,
        )
        task1 >> task2  # 定义任务的依赖关系
  2. 进入Airflow Web界面,在DAGs页面,单击目标DAG名称。

    7tSGohJ2El

  3. 在目标DAG详情页面,单击Graph页签。

    下图显示的是两个DAG对象,分别对应的是task1task2。由示例文件可知,task2依赖于task1,即task1运行成功完成后才会执行task2。

    KZlztLq70u

  4. 单击Graph页签中的print_date,运行DAG任务,查看是否能够运行成功。

    运行成功后,DAG对象会显示success。

    3iVkNyjIqd

  5. DAG对象运行过程中,会在sae-airflow-scheduler应用中创建对应的实例,DAG对象运行成功后,对应的实例会自动删除。

    说明
    • Airflow Worker实例归属于airflow-scheduler组件,可以在sae-airflow-scheduler应用的实例列表页面查看。

    • 如果有Pod泄露,您可以直接在sae-airflow-scheduler应用的实例列表页面查看与删除。

    image

部署过程中常见的问题

  • 挂载OSS失败如何解决?

    使用过程中,遇到OSS挂载失败、容器中挂载路径不存在、或缺少操作权限等问题,请参考以下步骤排查故障。

    1. 确认配置的OSS Bucket是否存在。

      1. 如果是通过控制台部署应用,只有同账号同地域下实际存在的OSS Bucket才能在SAE控制台选取。

      2. 如果是通过除控制台外的方式(例如API、SDK、saectl工具、Jenkins插件)部署应用,需要登录OSS控制台并确认同账号同地域下配置的 OSS Bucket 名称是存在的。

    2. 检查挂载时配置的AccessKey ID、AccessKey Secret所对应的RAM用户的操作权限。

      1. 根据配置的AccessKey ID、AccessKey Secret找到关联的 RAM 用户。

      2. 确认该 RAM 用户针对配置的 OSS Bucket 有必要的操作权限

    3. 检查OSS Bucket授权策略。

      1. 登录OSS控制台,进入目标Bucket的详情页面,在左侧导航栏选择权限控制 > Bucket授权策略

      2. 检查Bucket授权策略是否拦截了SAE的访问,具体来说,就是要确认相应地域下SAE的公网地址已被添加到放行的白名单中,如下表所示。

        说明

        如需获取SAE详细公网地址,请在钉钉群(群号:32874633)联系相关技术人员。

        Region

        IP 地址

        cn-hangzhou

        47.99.xx.xx

        cn-shanghai

        47.101.xx.xx

        cn-beijing

        47.94.xx.xx

        cn-zhangjiakou

        121.89.xx.xx

        cn-wulanchabu

        8.130.xx.xx

        cn-shenzhen

        39.108.xx.xx

        cn-heyuan

        47.121.xx.xx

        cn-guangzhou

        8.134.xx.xx

        8.134.xx.xx

        cn-chengdu

        47.108.xx.xx

        cn-hongkong

        47.243.xx.xx

        8.210.xx.xx

        ap-southeast-1

        8.219.xx.xx

        eu-central-1

        8.211.xx.xx

        8.211.xx.xx

        us-west-1

        47.89.xx.xx

        us-east-1

        47.252.xx.xx

  • 连接MySQL失败如何解决?

    根据流水线日志或sae-airflow-scheduler应用中实时日志的报错信息,进行配查,具体解决方案,请参见解决无法连接实例问题