Airflow -> DataWorks

本文介绍了基于LHM调度迁移工具将Airflow调度任务流迁移到DataWorks Workflow的方案与操作流程,包括Airflow任务导出、调度任务转换、DataWorks任务导入。

一、导出Airflow任务流

导出Airflow调度任务流元信息的基本原理是,利用AirflowPython库加载用户的DAG Folder,获取DAG及其内部任务信息与依赖关系,整理为JSON文件导出。

迁移工具目前支持对Airflow 2.x的任务流导出。

1 运行环境

迁移工具的Airflow导出能力基于MigrationX Airflow Reader(MigrationX)实现,在Python>= 3.9.0的环境中执行。

推荐的导出工具运行环境(二选一):

1、在Airflow调度所在Python环境中运行(需满足Python >= 3.9.0的条件)

2、准备新的Python环境并安装与生产环境同版本的Airflow Python库,需注意Airflow库与Python版本的对照关系。

方案二中提到的新Python环境可参考下列流程配置(以linux ecs为例):

## 安装conda并创建Python3.9环境

# 下载并安装conda
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
sh Miniconda3-latest-Linux-x86_64.sh

# 安装过程中conda会要求指定安装目录,安装完成后执行以下命令进行初始化
cd <conda安装目录>
conda init
conda config --set auto_activate_base false
. ~/.bashrc

# 创建Python3.9.0环境
conda create -n airflow python=3.9.0
conda activate airflow

# 在Conda环境中安装airflow(以2.5.3为例)
pip install apache-airflow=2.5.3

2 操作流程

2.1 下载导出工具包airflow-workflow-parser.zip:

下载链接:airflow-workflow-parser

2.2 解压airflow-workflow-parser.zip:

tar -zxvf airflow-workflow-parser.zip

2.3 设置PTYHONPATH

若在Airflow环境中运行导出工具,需配置PYTHONPATH;若在Conda环境中运行导出工具,则可跳过此步骤。

指定PYTHONPATHairflowpython lib目录,例如:

export PYTHONPATH=/usr/local/lib/python3.6/site-packages

2.4 执行airflow任务导出

Parser会获取指定目录下的所有DAG,每个DAG对应生成一个DataWorks Spec文件(DataWorks任务流描述定义文件,json格式)。

DataWorks任务流描述定义 dataworks-spec
https://github.com/aliyun/dataworks-spec/tree/master
python ./parser.py -d /root/airflow/airflow-workflow/dags -o ./result -m ./flowspec-airflowV2-transformer-config.json

参数解释:

-d 表示Airflow DAG文件的存放目录;

-o 表示导出结果JSON文件的存放路径;

-m 表示配置文件路径,可在其中配置节点映射转换规则。具体转换配置项见2.4.1小节。

若执行失败,通常是由于DAG中有特殊依赖,按报错使用pip命令依次安装缺失的依赖即可。

2.4.1 转换配置模板

导出工具支持对节点类型进行转换,可通过配置项将Airflow OperatorDataWorks节点类型映射,其模板如下。

迁移至DataWorks on MaxCompute的配置模板(参考配置):

{
  "workflowPathPrefix": "Airflow导入_V3/",
  "typeMapping": {
    "EmptyOperator": "VIRTUAL",
    "DummyOperator": "VIRTUAL",
    "ExternalTaskSensor": "VIRTUAL",
    "BashOperator": "DIDE_SHELL",
    "HiveToMySqlTransfer": "DI",
    "PrestoToMySqlTransfer": "DI",
    "PythonOperator": "PYTHON",
    "HiveOperator": "ODPS_SQL",
    "SqoopOperator": "DI",
    "SparkSqlOperator": "ODPS_SQL",
    "SparkSubmitOperator": "ODPS_SPARK",
    "SQLExecuteQueryOperator": "MySQL",
    "PostgresOperator": "Postgresql",
    "MySqlOperator": "MySQL",
    "default": "PYTHON"
  },
  "settings": {
    "workflow.converter.target.schedule.resGroupIdentifier": "Serverless_res_group_651147510078336_710919704558240"
  }
}

DataWorks on EMR的配置模板(参考配置):

{
  "workflowPathPrefix": "Airflow导入_V3/",
  "typeMapping": {
    "EmptyOperator": "VIRTUAL",
    "DummyOperator": "VIRTUAL",
    "ExternalTaskSensor": "VIRTUAL",
    "BashOperator": "EMR_SHELL",
    "HiveToMySqlTransfer": "DI",
    "PrestoToMySqlTransfer": "DI",
    "PythonOperator": "PYTHON",
    "HiveOperator": "EMR_HIVE",
    "SqoopOperator": "EMR_SQOOP",
    "SparkSqlOperator": "EMR_SPARK_SQL",
    "SparkSubmitOperator": "EMR_SPARK",
    "SQLExecuteQueryOperator": "MySQL",
    "PostgresOperator": "Postgresql",
    "MySqlOperator": "MySQL",
    "default": "PYTHON"
  },
  "settings": {
    "workflow.converter.target.schedule.resGroupIdentifier": "Serverless_res_group_651147510078336_710919704558240"
  }
}

参数说明:

参数名

描述

workflowPathPrefix

任务流导入DataWorks后的存储位置。

typeMapping

Airflow OperatorDataWorks节点类型的映射规则。

DataWorks节点类型可参考此枚举类:https://github.com/aliyun/dataworks-spec/blob/b0f4a4fd769215d5f81c0bbe990addd7498df5f4/spec/src/main/java/com/aliyun/dataworks/common/spec/domain/dw/types/CodeProgramType.java#L180

workflow.converter.target.schedule.resGroupIdentifier

DataWorks资源组Id

DataWorks工作空间详情页左侧菜单栏进入资源组页面,绑定资源组,并获取资源组ID。

image.png

2.5 操作示例

下载工具到本地后,解压缩得到如下目录:

.
├── airflow_workflow
│   ├── common
│   ├── connections.py
│   ├── converter
│   ├── dag_parser.py
│   ├── downloader.py
│   ├── get_dags.py
│   ├── __init__.py
│   ├── miscs
│   ├── models
│   ├── patch
│   ├── __pycache__
│   └── test
├── airflow-workflow.tgz
├── dags
│   ├── example.py
│   └── __pycache__
├── flowspec-airflowV2-transformer-config.json
├── parser.py
├── README.MD
└── result

执行命令。

python ./parser.py -d /root/airflow/airflow-workflow/dags -o ./result -m ./flowspec-airflowV2-transformer-config.json

运行日志如下:

image.png

查看转换结果:

image.png

image.png

2.6 常见问题

2.6.1 报错 TIMEZONE = pendulum.tz.timezone("UTC") TypeError: 'module' object is not callable

如果是Airflow 2.x,需确保 pendulum 版本 ≤ 2.1.2。

如果是Airflow 3.x,则无需 pendulum(Airflow 3.x 已移除对 pendulum 的依赖)。

解决办法:

pip uninstall pendulum -y
pip install pendulum==2.1.2
2.6.2 dag文件解析依赖报错

如果解析失败, 一般是dag中有相对应特殊依赖,按照报错使用pip依此安装缺失的依赖即可。

二、导入DataWorks

Airflow导出工具将输出调度任务流的描述文件,JSON格式,其数据结构符合DataWorks Spec规范。

用户可参考如下文档完成DataWorks导入:自定义DataWorks Spec导入DataWorks