云原生数据仓库AnalyticDB MySQL版新增了Airflow集群,本文介绍了如何购买及管理Airflow集群。

Airflow介绍

注意 功能正在灰度测试中,如需要体验,请提工单联系技术支持人员。
Airflow是Airbnb开源的一个用Python编写的调度工具。于2014年启动,2015年春季开源,2016年加入Apache软件基金会的孵化计划。本身作为一款开源的,分布式任务调度框架,它将一个具有上下级依赖关系的工作流,组装成一个有向无环图(即:DAG图),进行复杂的任务工作流调度。基本概念涉及:
  • DAG

    DAG意为有向无循环图,在Airflow中则定义了整个完整的作业。同一个DAG中的所有Task拥有相同的调度时间。

  • Task

    Task为DAG中具体的作业任务,它必须存在于某一个DAG之中。Task在DAG中配置依赖关系,跨DAG的依赖是可行的,但是并不推荐。跨DAG依赖会导致DAG图的直观性降低,并给依赖管理带来麻烦。

  • DAG Run

    当一个DAG满足它的调度时间,或者被外部触发时,就会产生一个DAG Run。可以理解为由DAG实例化的实例。

  • Task Instance

    当一个Task被调度启动时,就会产生一个Task Instance。可以理解为由Task实例化的实例。

目前AnalyticDB MySQL版 3.0采用的是Airflow 1.10.12版本,详细内容请参见https://airflow.apache.org/docs/apache-airflow/1.10.12/index.html

开通Airflow集群

  1. 使用阿里云账号登录AnalyticDB MySQL版控制台
  2. 单击待开通Airflow集群的集群名称。
  3. 单击Airflow管理 > 数据库账号管理
  4. 单击创建,开始创建Airflow集群。
    说明 集群创建时间大约需要10分钟,在开通后集群还在创建的过程中,Spark和Airflow相关功能都处于禁用状态。

数据库账号管理

绑定数据库账号是运行DAG任务的第一步,配置合法的集群账号密码后才能成功执行DAG任务。可以绑定集群中多个已经创建的账号(包含高权限账号、普通账号),主要用于在Airflow DAG脚本中填写该用户名,然后在Spark任务执行时校验提交该任务的用户合法性。如果填写的账号不存在或者账号密码信息不匹配,则不能成功执行DAG任务。

  1. 在AnalyticDB MySQL版控制台,单击已开通Airflow集群的集群名称。
  2. 单击Airflow管理 > 数据库账号管理
  3. 单击绑定数据库账号,输入待绑定的数据库账号和密码,单击确定
  4. 绑定成功的数据库账号,会展示在界面上。单击测试连接,测试数据库账号是否能连接成功。

    绑定成功的数据库账号还可以进行修改删除

DAG任务管理

创建DAG可以允许用户利用python编写脚本进行任务调度的编排,控制台默认提供了一个基础的脚本模板,方便用户编写脚本。而且基于AnalyticDB MySQL版封装过的 SparkJobOperator,可以无缝衔接AnalyticDB MySQL版 3.0内部的Spark集群,进而快速达到对数据的离线处理。

创建DAG
AnalyticDB MySQL版默认提供了一个基础的脚本模板,用户可以基于该模板进行python编写。鉴于python对代码格式的要求,最好在专业的python IDE软件里编写正确之后再进行任务提交。
  1. AnalyticDB MySQL版控制台,单击已开通Airflow集群的集群名称。
  2. 单击Airflow管理 > DAG任务管理
  3. 单击创建DAG,根据控制台默认提供的基础的脚本模板,您可以编写python脚本,进行任务调度。
    说明 Airflow集群中的dag_id参数需要唯一,不允许多个DAG使用同一个dag_id。
    示例如下:
    from datetime import datetime, timedelta
    from airflow import DAG
    from custom_operator.spark_operator import SparkJobOperator
    
    """
    Airflow Notes:
    1.Airflow 1.10.12 Version Official Doc: https://airflow.apache.org/docs/apache-airflow/1.10.12/index.html
    2.Don't delete the imported SparkJobOperator, and other airflow operators are not allowed;
    3.Due to the python formatter requirements, you'd better input this python code in the professional python IDE, and then paste the code here. Otherwise this code will not be resolved correctly by the python interpreter;
    """
    
    """
    DAG Notes:
    1.The dag_id is required, and should be unique for the airflow cluster;
    2.If you want the tasks to be re-run in regular , schedule_interval is needed;
    """
    default_args = {
        'owner': 'xxxx',
        'start_date': datetime(2021, 1, 21),
        'retries': 1,
        'retry_delay': timedelta(minutes=10),
    }
    dag = DAG(dag_id='dag_id_xxxx', default_args=default_args, schedule_interval=timedelta(minutes=10))
    
    # Task Definition
    spark_submit_task = SparkJobOperator(
        task_id='task_xxx',
        name='',
        file='',
        class_name='',
        num_executor=1,
        args=[''],
        conf={
            'spark.adb.userName': ''
        },
        dag=dag
    )
    
    # Run Task
    spark_submit_task
  4. 单击确定,完成DAG任务的创建。
DAG列表
创建完成的DAG任务,会显示在DAG任务列表中,任务列表包含以下信息:
  • 是否启用
    1. 当开启后,如果是定时调度任务,会按照脚本的间隔时间进行执行。
    2. 当暂停后,则该DAG任务会暂停执行。
  • DAG ID
    该ID来源于脚本中的dag_id参数,在整个Airflow集群需要唯一,否则任务会创建失败。如果不指明dag_id参数名,则来源于DAG的第一个参数,即以下两种都合法:
    1. DAG(dag_id='dag_id_xxxx',
    2. DAG('dag_id_xxxx',
  • DAG任务解析状态

    DAG脚本提交之后,AnalyticDB MySQL版为了方便用户对任务解析过程有感知,二次封装了解析过程,提供以下几个解析状态:

    1. 解析中:Airflow集群默认按照5分钟的进程解析间隔对提交的python文件进行解析,如果在此时间内python为解析中,则此任务不能执行。
    2. 解析失败:当Airflow开始解析DAG python文件时发现错误,则会给出基本的错误信息,方便提交人进行脚本编写确认。
    3. 解析正确:此时DAG文件已经被正常解析,相关脚本的基础参数已经被Airflow入库,进而等待到特定的时间或者操作执行DAG任务。
  • 最近的任务

    最近一次调度DAG时,任务的执行状态和每个状态的结果次数。包含:success 、running、failed、none、upstream_failed、skipped、up_for_retry、up_for_reschedule、queued、scheduled、sensing。

  • 上一次运行时间

    最近一次调度DAG的时间。

  • 所有DAG的运行情况

    汇总该DAG在运行期间的所有执行结果次数:success、running、failed。

  • 详情
    • DAG脚本被提交之后,如果DAG解析正确,则可以查看DAG的任务依赖关系和提交的源码。
    • DAG处于“解析中”或者“解析失败”,则只能查看提交的源码。
  • 修改
    如果DAG解析失败,或者用户想修改已提交的DAG内容,则可以进行修改。由于Airflow把DAG ID作为任务执行的唯一标识,所以在修改的时候要注意:
    • 如果DAG的ID没有修改,而是修改其他内容,则该DAG依旧存在,然后需要等待5分钟以内的时间让Airflow重新解析。
    • 如果DAG的ID被修改,则旧的DAG文件会被删除,进而创建一个以新的DAG ID为唯一标识的python文件,然后再等待5分钟让Airflow重新扫描解析。
  • 删除

    删除以该DAG ID为唯一标识的python文件及其任务meta数据。