本文介绍了基于LHM调度迁移工具将Airflow调度任务流迁移到DataWorks Workflow的方案与操作流程,包括Airflow任务导出、调度任务转换、DataWorks任务导入。
一、导出Airflow任务流
导出Airflow调度任务流元信息的基本原理是,利用Airflow的Python库加载用户的DAG Folder,获取DAG及其内部任务信息与依赖关系,整理为JSON文件导出。
迁移工具目前支持对Airflow 2.x的任务流导出。
1 运行环境
迁移工具的Airflow导出能力基于MigrationX Airflow Reader(MigrationX)实现,在Python>= 3.9.0的环境中执行。
推荐的导出工具运行环境(二选一):
1、在Airflow调度所在Python环境中运行(需满足Python >= 3.9.0的条件)
2、准备新的Python环境并安装与生产环境同版本的Airflow Python库,需注意Airflow库与Python版本的对照关系。
方案二中提到的新Python环境可参考下列流程配置(以linux ecs为例):
## 安装conda并创建Python3.9环境
# 下载并安装conda
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
sh Miniconda3-latest-Linux-x86_64.sh
# 安装过程中conda会要求指定安装目录,安装完成后执行以下命令进行初始化
cd <conda安装目录>
conda init
conda config --set auto_activate_base false
. ~/.bashrc
# 创建Python3.9.0环境
conda create -n airflow python=3.9.0
conda activate airflow
# 在Conda环境中安装airflow(以2.5.3为例)
pip install apache-airflow=2.5.3
2 操作流程
2.1 下载导出工具包airflow-workflow-parser.zip:
2.2 解压airflow-workflow-parser.zip:
tar -zxvf airflow-workflow-parser.zip
2.3 设置PTYHONPATH
若在Airflow环境中运行导出工具,需配置PYTHONPATH;若在Conda环境中运行导出工具,则可跳过此步骤。
指定PYTHONPATH到airflow的python lib目录,例如:
export PYTHONPATH=/usr/local/lib/python3.6/site-packages
2.4 执行airflow任务导出
Parser会获取指定目录下的所有DAG,每个DAG对应生成一个DataWorks Spec文件(DataWorks任务流描述定义文件,json格式)。
DataWorks任务流描述定义 dataworks-spec
https://github.com/aliyun/dataworks-spec/tree/master
python ./parser.py -d /root/airflow/airflow-workflow/dags -o ./result -m ./flowspec-airflowV2-transformer-config.json
参数解释:
-d 表示Airflow DAG文件的存放目录;
-o 表示导出结果JSON文件的存放路径;
-m 表示配置文件路径,可在其中配置节点映射转换规则。具体转换配置项见2.4.1小节。
若执行失败,通常是由于DAG中有特殊依赖,按报错使用pip命令依次安装缺失的依赖即可。
2.4.1 转换配置模板
导出工具支持对节点类型进行转换,可通过配置项将Airflow Operator与DataWorks节点类型映射,其模板如下。
迁移至DataWorks on MaxCompute的配置模板(参考配置):
{
"workflowPathPrefix": "Airflow导入_V3/",
"typeMapping": {
"EmptyOperator": "VIRTUAL",
"DummyOperator": "VIRTUAL",
"ExternalTaskSensor": "VIRTUAL",
"BashOperator": "DIDE_SHELL",
"HiveToMySqlTransfer": "DI",
"PrestoToMySqlTransfer": "DI",
"PythonOperator": "PYTHON",
"HiveOperator": "ODPS_SQL",
"SqoopOperator": "DI",
"SparkSqlOperator": "ODPS_SQL",
"SparkSubmitOperator": "ODPS_SPARK",
"SQLExecuteQueryOperator": "MySQL",
"PostgresOperator": "Postgresql",
"MySqlOperator": "MySQL",
"default": "PYTHON"
},
"settings": {
"workflow.converter.target.schedule.resGroupIdentifier": "Serverless_res_group_651147510078336_710919704558240"
}
}
至DataWorks on EMR的配置模板(参考配置):
{
"workflowPathPrefix": "Airflow导入_V3/",
"typeMapping": {
"EmptyOperator": "VIRTUAL",
"DummyOperator": "VIRTUAL",
"ExternalTaskSensor": "VIRTUAL",
"BashOperator": "EMR_SHELL",
"HiveToMySqlTransfer": "DI",
"PrestoToMySqlTransfer": "DI",
"PythonOperator": "PYTHON",
"HiveOperator": "EMR_HIVE",
"SqoopOperator": "EMR_SQOOP",
"SparkSqlOperator": "EMR_SPARK_SQL",
"SparkSubmitOperator": "EMR_SPARK",
"SQLExecuteQueryOperator": "MySQL",
"PostgresOperator": "Postgresql",
"MySqlOperator": "MySQL",
"default": "PYTHON"
},
"settings": {
"workflow.converter.target.schedule.resGroupIdentifier": "Serverless_res_group_651147510078336_710919704558240"
}
}
参数说明:
参数名 | 描述 |
workflowPathPrefix | 任务流导入DataWorks后的存储位置。 |
typeMapping | Airflow Operator与DataWorks节点类型的映射规则。 DataWorks节点类型可参考此枚举类:https://github.com/aliyun/dataworks-spec/blob/b0f4a4fd769215d5f81c0bbe990addd7498df5f4/spec/src/main/java/com/aliyun/dataworks/common/spec/domain/dw/types/CodeProgramType.java#L180 |
workflow.converter.target.schedule.resGroupIdentifier | DataWorks资源组Id |
由DataWorks工作空间详情页左侧菜单栏进入资源组页面,绑定资源组,并获取资源组ID。
2.5 操作示例
下载工具到本地后,解压缩得到如下目录:
.
├── airflow_workflow
│ ├── common
│ ├── connections.py
│ ├── converter
│ ├── dag_parser.py
│ ├── downloader.py
│ ├── get_dags.py
│ ├── __init__.py
│ ├── miscs
│ ├── models
│ ├── patch
│ ├── __pycache__
│ └── test
├── airflow-workflow.tgz
├── dags
│ ├── example.py
│ └── __pycache__
├── flowspec-airflowV2-transformer-config.json
├── parser.py
├── README.MD
└── result
执行命令。
python ./parser.py -d /root/airflow/airflow-workflow/dags -o ./result -m ./flowspec-airflowV2-transformer-config.json
运行日志如下:
查看转换结果:
2.6 常见问题
2.6.1 报错 TIMEZONE = pendulum.tz.timezone("UTC") TypeError: 'module' object is not callable
如果是Airflow 2.x,需确保 pendulum 版本 ≤ 2.1.2。
如果是Airflow 3.x,则无需 pendulum(Airflow 3.x 已移除对 pendulum 的依赖)。
解决办法:
pip uninstall pendulum -y
pip install pendulum==2.1.2
2.6.2 dag文件解析依赖报错
如果解析失败, 一般是dag中有相对应特殊依赖,按照报错使用pip依此安装缺失的依赖即可。
二、导入DataWorks
Airflow导出工具将输出调度任务流的描述文件,JSON格式,其数据结构符合DataWorks Spec规范。
用户可参考如下文档完成DataWorks导入:自定义DataWorks Spec导入DataWorks