Airflow调度XIHE SQL

更新时间:

Airflow是比较流行的开源调度工具,提供了丰富的命令行工具和简单易用的操作界面,可以实现各类工作负载的DAG编排与调度。您可以使用Airflow完成云原生数据仓库 AnalyticDB MySQL 版离线数据的ETL智能编排和实时数据处理流程的编排,实现数据处理过程的自动化,提高数据处理的效率。

前提条件

  • 集群的产品系列为企业版或湖仓版

  • 已安装Airflow。具体操作,请参见Airflow社区文档

  • 已将运行Airflow的服务器IP地址添加至AnalyticDB for MySQL集群的白名单中。具体操作,请参见设置白名单

操作步骤

  1. 查看是否存在apache-airflow-providers-mysql

    1. 访问Airflow Web界面,在顶部导航栏单击Admin > Providers

    2. Providers页面,查看是否存在apache-airflow-providers-mysql

    3. (条件必选)若不存在apache-airflow-providers-mysql,需执行以下命令手动安装apache-airflow-providers-mysql

      pip install apache-airflow-providers-mysql
      重要

      执行该语句后,若出现OSError: mysql_config not found报错,需执行yum install mysql-devel命令安装MySQL,再重新执行该命令安装apache-airflow-providers-mysql

  2. 创建连接。

    1. 在顶部导航栏单击Admin > Connections

    2. 单击image按钮,在Add Connections页面配置如下参数:

      参数

      说明

      Connection id

      连接名称。

      Connection Type

      选择MySQL

      Host

      AnalyticDB for MySQL集群的连接地址。可通过控制台集群信息页面,查看连接信息。

      Login

      AnalyticDB for MySQL集群的数据库账号。

      Password

      AnalyticDB for MySQL集群数据库账号的密码。

      Port

      AnalyticDB for MySQL集群的端口号,固定为3306。

      说明

      其他参数为选填参数,按需配置即可

  3. 进入Airflow安装目录,在airflow.cfg文件中查看dags_folder参数。

    1. 进入Airflow安装目录。

      cd /root/airflow
    2. airflow.cfg文件中查看dags_folder参数。

      cat file.cfg
    3. (条件必选)如果dags_folder参数指定路径下无文件夹,您需执行mkdir命令新建文件夹。

      说明

      例如,dags_folder参数指定的路径为/root/airflow/dags,如果/root/airflow路径下无dags文件夹,您可以在该路径下创建dags文件夹。

  4. 编写DAG,本文的DAG文件为mysql_dags.py

    from airflow import DAG
    from airflow.providers.mysql.operators.mysql import MySqlOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
        'owner': 'airflow',
    }
    
    dag = DAG(
        'example_mysql',
        default_args=default_args,
        start_date=days_ago(2),
        tags=['example'],
    )
    
    mysql_test = MySqlOperator(
        task_id='mysql_test',
        mysql_conn_id='test',
        sql='SHOW DATABASES;',
        dag=dag,
    )
    mysql_test_task = MySqlOperator(
        task_id='mysql_test_task',
        mysql_conn_id='test',
        sql='SELECT * FROM test;',
        dag=dag,
    )
    mysql_test >> mysql_test_task
    if __name__ == "__main__":
        dag.cli()
    

    参数说明:

    • mysql_conn_id:步骤2中配置的连接名称。

    • sql:业务具体的SQL语句。

    其他参数请参见Airflow社区文档

  5. Airflow Web界面,单击对应DAG右侧image按钮

    执行成功后,您也可以单击对应DAG右侧的绿色圆圈,查看DAG执行的详细信息。

    image

    image

    重要

    Airflow工具默认使用UTC时区,所以DAG的执行时间显示比北京时间(UTC+8)少8小时。