Airflow是比较流行的开源调度工具,提供了丰富的命令行工具和简单易用的操作界面,可以实现各类工作负载的DAG编排与调度。您可以使用Airflow完成云原生数据仓库 AnalyticDB MySQL 版离线数据的ETL智能编排和实时数据处理流程的编排,实现数据处理过程的自动化,提高数据处理的效率。
前提条件
集群的产品系列为企业版或湖仓版。
已安装Airflow。具体操作,请参见Airflow社区文档。
已将运行Airflow的服务器IP地址添加至AnalyticDB for MySQL集群的白名单中。具体操作,请参见设置白名单。
操作步骤
查看是否存在apache-airflow-providers-mysql。
访问Airflow Web界面,在顶部导航栏单击 。
在Providers页面,查看是否存在apache-airflow-providers-mysql。
(条件必选)若不存在apache-airflow-providers-mysql,需执行以下命令手动安装apache-airflow-providers-mysql。
pip install apache-airflow-providers-mysql
重要执行该语句后,若出现
OSError: mysql_config not found
报错,需执行yum install mysql-devel
命令安装MySQL,再重新执行该命令安装apache-airflow-providers-mysql。
创建连接。
在顶部导航栏单击 。
单击按钮,在Add Connections页面配置如下参数:
参数
说明
Connection id
连接名称。
Connection Type
选择MySQL。
Host
AnalyticDB for MySQL集群的连接地址。可通过控制台集群信息页面,查看连接信息。
Login
AnalyticDB for MySQL集群的数据库账号。
Password
AnalyticDB for MySQL集群数据库账号的密码。
Port
AnalyticDB for MySQL集群的端口号,固定为3306。
说明其他参数为选填参数,按需配置即可。
进入Airflow安装目录,在
airflow.cfg
文件中查看dags_folder参数。进入Airflow安装目录。
cd /root/airflow
在
airflow.cfg
文件中查看dags_folder参数。cat file.cfg
(条件必选)如果dags_folder参数指定路径下无文件夹,您需执行
mkdir
命令新建文件夹。说明例如,dags_folder参数指定的路径为
/root/airflow/dags
,如果/root/airflow
路径下无dags
文件夹,您可以在该路径下创建dags文件夹。
编写DAG,本文的DAG文件为
mysql_dags.py
。from airflow import DAG from airflow.providers.mysql.operators.mysql import MySqlOperator from airflow.utils.dates import days_ago default_args = { 'owner': 'airflow', } dag = DAG( 'example_mysql', default_args=default_args, start_date=days_ago(2), tags=['example'], ) mysql_test = MySqlOperator( task_id='mysql_test', mysql_conn_id='test', sql='SHOW DATABASES;', dag=dag, ) mysql_test_task = MySqlOperator( task_id='mysql_test_task', mysql_conn_id='test', sql='SELECT * FROM test;', dag=dag, ) mysql_test >> mysql_test_task if __name__ == "__main__": dag.cli()
参数说明:
mysql_conn_id
:步骤2中配置的连接名称。sql
:业务具体的SQL语句。
其他参数请参见Airflow社区文档。
在Airflow Web界面,单击对应DAG右侧按钮。
执行成功后,您也可以单击对应DAG右侧的绿色圆圈,查看DAG执行的详细信息。
重要Airflow工具默认使用UTC时区,所以DAG的执行时间显示比北京时间(UTC+8)少8小时。