Migrating Airflow to DataWorks

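This article describes how to use the LHM scheduling migration tool to migrate Airflow scheduled workflows to DataWorks Workflow. It covers the overall approach and the procedure: exporting Airflow tasks, converting the scheduling tasks, and importing them into DataWorks.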

I. Export Airflow workflows

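The export works as follows: the tool uses the Airflow Python library to load the user's DAG folder, collects each DAG together with its tasks and the dependencies between them, and writes this metadata out as JSON files.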
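The loading step described above can be sketched in Python with Airflow's own `DagBag` class. This is a minimal illustration, not the MigrationX implementation: the helper names `dag_to_dict` and `export_dag_folder` are hypothetical, and the simplified JSON they produce is not the DataWorks Spec format the real tool emits. The sketch relies only on attributes that Airflow 2.x DAG and operator objects expose (`dag_id`, `tasks`, `task_id`, `task_type`, `downstream_task_ids`).

```python
import json
from typing import Any, Dict, List


def dag_to_dict(dag: Any) -> Dict[str, Any]:
    """Collect a DAG's id, its tasks, and task-level dependencies.

    Works on any object exposing the Airflow 2.x attributes dag.dag_id,
    dag.tasks, task.task_id, task.task_type and task.downstream_task_ids.
    """
    tasks: List[Dict[str, Any]] = []
    for task in dag.tasks:
        tasks.append({
            "task_id": task.task_id,
            "operator": task.task_type,  # operator class name, e.g. "BashOperator"
            "downstream": sorted(task.downstream_task_ids),
        })
    return {"dag_id": dag.dag_id, "tasks": tasks}


def export_dag_folder(dag_folder: str) -> str:
    """Load every DAG under dag_folder with Airflow and dump them as JSON."""
    from airflow.models import DagBag  # imported lazily; requires Airflow 2.x

    bag = DagBag(dag_folder=dag_folder, include_examples=False)
    if bag.import_errors:
        # import_errors maps a DAG file path to the error text for files that failed to load
        raise RuntimeError(f"DAG import errors: {bag.import_errors}")
    return json.dumps([dag_to_dict(d) for d in bag.dags.values()], indent=2)
```

Because `dag_to_dict` only reads attributes, it can be exercised with stub objects when Airflow is not installed; `export_dag_folder` needs the real Airflow library.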

The migration tool currently supports exporting workflows from Airflow 2.x.

1 Runtime environment

The Airflow export capability of the migration tool is built on MigrationX Airflow Reader (MigrationX) and runs in a Python >= 3.9.0 environment.

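Recommended runtime environments for the export tool (choose one):

Option 1: Run the tool in the Python environment that runs the Airflow scheduler (this environment must have Python >= 3.9.0).

Option 2: Prepare a new Python environment and install the same version of the Airflow Python library as in production. Make sure the Airflow version you install is compatible with the Python version.

The new Python environment in Option 2 can be set up as follows (using a Linux ECS instance as an example):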

## Install conda and create a Python 3.9 environment
# Download and install Miniconda
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh
# The installer asks for an installation directory; after it finishes, initialize conda
<conda_install_dir>/bin/conda init
. ~/.bashrc
conda config --set auto_activate_base false
# Create a Python 3.9.0 environment
conda create -n airflow python=3.9.0
conda activate airflow
# Install Airflow in the conda environment (2.5.3 as an example; match the production version)
pip install "apache-airflow==2.5.3"
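Before running the export tool, you can confirm that the active interpreter meets the requirements above (Python >= 3.9.0 and an Airflow 2.x library). The sketch below is a hypothetical helper, not part of the tool package; it reads the installed `apache-airflow` version through the standard `importlib.metadata` module.

```python
import sys
from importlib import metadata
from typing import List, Optional, Tuple


def check_environment(python_version: Tuple[int, ...],
                      airflow_version: Optional[str]) -> List[str]:
    """Return a list of problems; an empty list means both checks passed."""
    problems = []
    if tuple(python_version[:3]) < (3, 9, 0):
        problems.append(f"Python {'.'.join(map(str, python_version[:3]))} < 3.9.0")
    if airflow_version is None:
        problems.append("apache-airflow is not installed")
    elif airflow_version.split(".")[0] != "2":
        problems.append(f"Airflow {airflow_version} is not 2.x")
    return problems


def installed_airflow_version() -> Optional[str]:
    """Return the installed apache-airflow version, or None if it is missing."""
    try:
        return metadata.version("apache-airflow")
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    issues = check_environment(sys.version_info, installed_airflow_version())
    print("OK" if not issues else "\n".join(issues))
```

Run it with `python` inside the `airflow` environment; it prints `OK` or one line per problem.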

2 Procedure

2.1 Download the export tool package airflow-workflow-parser.zip:

Download link:

2.2 Extract airflow-workflow-parser.zip:

unzip airflow-workflow-parser.zip

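2.3 Set PYTHONPATH

If you run the export tool in the Airflow environment, configure PYTHONPATH; if you run it in the conda environment, skip this step.

Set PYTHONPATH to the Python library directory where Airflow is installed, for example:

export PYTHONPATH=/usr/local/lib/python3.9/site-packages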
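If you are unsure which directory to use, the Python interpreter of the Airflow deployment can report it. The helper below is a hypothetical sketch: it locates the `airflow` package with `importlib.util.find_spec` and returns the directory that contains it (typically a `site-packages` directory), which is the value to assign to PYTHONPATH.

```python
import importlib.util
import os
from typing import Optional


def package_parent_dir(package: str = "airflow") -> Optional[str]:
    """Return the directory that contains `package`, or None if it is not importable."""
    spec = importlib.util.find_spec(package)
    if spec is None or spec.origin is None:
        return None
    # For a regular package, spec.origin is .../site-packages/<package>/__init__.py
    return os.path.dirname(os.path.dirname(spec.origin))


if __name__ == "__main__":
    print(package_parent_dir() or "airflow not found on this interpreter")
```

Run it with the same interpreter that runs Airflow, then set PYTHONPATH to the printed directory.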

2.4 Export the Airflow workflows

The parser collects all DAGs under the specified directory and generates one DataWorks Spec file (the DataWorks workflow description file, in JSON format) for each DAG.

DataWorks workflow description definition (dataworks-spec): https://github.com/aliyun/dataworks-spec/tree/master

python ./parser.py -d /root/airflow/airflow-workflow/dags -o ./result -m ./flowspec-airflowV2-transformer-config.json

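Parameters:

-d: the directory that contains the Airflow DAG files.

-o: the output path for the exported JSON files.

-m: the path of the configuration file, in which node type mapping rules can be configured. For the available configuration items, see section 2.4.1.

If the export fails, the cause is usually a special dependency used by a DAG. Install the missing dependencies one at a time with pip, as indicated by the error messages.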
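If you script the export (for example, to rerun it after installing missing dependencies), the documented flags map directly onto a command line. The wrapper below is a hypothetical sketch that assumes it is run from the unzipped tool directory containing `parser.py`. If the parser exits with a non-zero status, `subprocess.run(..., check=True)` raises `CalledProcessError`; otherwise the function lists the JSON files in the output directory, where one file per DAG is expected.

```python
import glob
import os
import subprocess
import sys
from typing import List


def build_parser_command(dag_dir: str, out_dir: str, mapping_file: str) -> List[str]:
    """Assemble the parser.py call with the -d / -o / -m flags."""
    return [sys.executable, "./parser.py",
            "-d", dag_dir, "-o", out_dir, "-m", mapping_file]


def list_spec_files(out_dir: str) -> List[str]:
    """Return the exported JSON files, sorted by path."""
    return sorted(glob.glob(os.path.join(out_dir, "*.json")))


def run_export(dag_dir: str, out_dir: str, mapping_file: str) -> List[str]:
    """Run parser.py and return the JSON files found in out_dir afterwards."""
    os.makedirs(out_dir, exist_ok=True)
    # check=True raises subprocess.CalledProcessError on a non-zero exit status
    subprocess.run(build_parser_command(dag_dir, out_dir, mapping_file), check=True)
    return list_spec_files(out_dir)
```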

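2.4.1 Conversion configuration templates

The export tool can convert node types: the configuration maps Airflow Operators to DataWorks node types. The templates are shown below.

Configuration template for migrating to DataWorks on MaxCompute (for reference):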

{
  "workflowPathPrefix": "Airflow导入_V3/",
  "typeMapping": {
    "EmptyOperator": "VIRTUAL",
    "DummyOperator": "VIRTUAL",
    "ExternalTaskSensor": "VIRTUAL",
    "BashOperator": "DIDE_SHELL",
    "HiveToMySqlTransfer": "DI",
    "PrestoToMySqlTransfer": "DI",
    "PythonOperator": "PYTHON",
    "HiveOperator": "ODPS_SQL",
    "SqoopOperator": "DI",
    "SparkSqlOperator": "ODPS_SQL",
    "SparkSubmitOperator": "ODPS_SPARK",
    "SQLExecuteQueryOperator": "MySQL",
    "PostgresOperator": "Postgresql",
    "MySqlOperator": "MySQL",
    "default": "PYTHON"
  },
  "settings": {
    "workflow.converter.target.schedule.resGroupIdentifier": "Serverless_res_group_651147510078336_710919704558240"
  }
}

Configuration template for migrating to DataWorks on EMR (for reference):

{
  "workflowPathPrefix": "Airflow导入_V3/",
  "typeMapping": {
    "EmptyOperator": "VIRTUAL",
    "DummyOperator": "VIRTUAL",
    "ExternalTaskSensor": "VIRTUAL",
    "BashOperator": "EMR_SHELL",
    "HiveToMySqlTransfer": "DI",
    "PrestoToMySqlTransfer": "DI",
    "PythonOperator": "PYTHON",
    "HiveOperator": "EMR_HIVE",
    "SqoopOperator": "EMR_SQOOP",
    "SparkSqlOperator": "EMR_SPARK_SQL",
    "SparkSubmitOperator": "EMR_SPARK",
    "SQLExecuteQueryOperator": "MySQL",
    "PostgresOperator": "Postgresql",
    "MySqlOperator": "MySQL",
    "default": "PYTHON"
  },
  "settings": {
    "workflow.converter.target.schedule.resGroupIdentifier": "Serverless_res_group_651147510078336_710919704558240"
  }
}
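The typeMapping section is a lookup table keyed by the Airflow Operator class name. The sketch below shows how such a table can be applied; it assumes, based on the `default` key present in both templates, that operators without an explicit entry fall back to the `default` node type. The function names are hypothetical and do not come from the tool.

```python
import json
from typing import Dict


def load_type_mapping(config_path: str) -> Dict[str, str]:
    """Read the typeMapping section from a transformer config file."""
    with open(config_path, encoding="utf-8") as f:
        return json.load(f)["typeMapping"]


def resolve_node_type(operator: str, type_mapping: Dict[str, str]) -> str:
    """Map an Airflow Operator class name to a DataWorks node type.

    Assumption: operators without an explicit entry use the "default" value.
    """
    if operator in type_mapping:
        return type_mapping[operator]
    return type_mapping["default"]
```

With the MaxCompute template, for example, BashOperator resolves to DIDE_SHELL, while the EMR template resolves it to EMR_SHELL.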

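Parameter descriptions:

workflowPathPrefix: The location where the workflows are stored after they are imported into DataWorks.

typeMapping: The rules that map Airflow Operators to DataWorks node types. For the available DataWorks node types, see this enum class: https://github.com/aliyun/dataworks-spec/blob/b0f4a4fd769215d5f81c0bbe990addd7498df5f4/spec/src/main/java/com/aliyun/dataworks/common/spec/domain/dw/types/CodeProgramType.java#L180

workflow.converter.target.schedule.resGroupIdentifier: The DataWorks resource group ID. On the DataWorks workspace details page, open the Resource Groups page from the left-hand menu and bind a resource group, then find the target resource group in the list and copy the value in its Resource Group ID column.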

2.5 Example

After downloading the tool and extracting it locally, you get the following directory structure:

.
├── airflow_workflow
│   ├── common
│   ├── connections.py
│   ├── converter
│   ├── dag_parser.py
│   ├── downloader.py
│   ├── get_dags.py
│   ├── __init__.py
│   ├── miscs
│   ├── models
│   ├── patch
│   ├── __pycache__
│   └── test
├── airflow-workflow.tgz
├── dags
│   ├── example.py
│   └── __pycache__
├── flowspec-airflowV2-transformer-config.json
├── parser.py
├── README.MD
└── result

Run the command:

python ./parser.py -d /root/airflow/airflow-workflow/dags -o ./result -m ./flowspec-airflowV2-transformer-config.json

The run log is as follows:

[2025-05-13 15:27:05,740] {parser.py:148} INFO - Overwriting type mapping with file: ./flowspec-airflowV2-transformer-config.json
[2025-05-13 15:27:05,741] {parser.py:157} INFO - Effective type mappings are:
        EmptyOperator: VIRTUAL
        DummyOperator: VIRTUAL
        ExternalTaskSensor: VIRTUAL
        BashOperator: ODPS_SQL
        HiveToMySqlTransfer: DI
        PrestoToMySqlTransfer: DI
        PythonOperator: PYTHON
        BranchPythonOperator: CONTROLLER_BRANCH
        HiveOperator: EMR_HIVE
        SqoopOperator: EMR_SQOOP
        SparkSqlOperator: EMR_SPARK_SQL
        SparkSubmitOperator: EMR_SPARK
        SQLExecuteQueryOperator: MySQL
        PostgresOperator: Postgresql
        MySqlOperator: MySQL
        default: PYTHON
[2025-05-13 15:27:05,741] {parser.py:160} INFO - Also setting workflow path prefix to Airflow导入_V3/
[2025-05-13 15:27:05,741] {parser.py:164} INFO - Also overwriting settings with {'workflow.converter.target.schedule.resGroupIdentifier': 'Serverless_res_group_651147510078336_710919704558240'}
[2025-05-13 15:27:05,741] {parser.py:169} INFO - Effective workflow path prefix is: Airflow导入_V3/
[2025-05-13 15:27:05,742] {parser.py:48} INFO - conf: /root/airflow
[2025-05-13 15:27:05,742] {dagbag.py:500} INFO - Filling up the DagBag from /root/airflow/airflow-workflow/dags
[2025-05-13 15:27:05,751] {dag_parser.py:13} INFO - Init DagBag success
[2025-05-13 15:27:05,753] {dag_converter.py:27} INFO - Converting DAG tutorial to DataWorks workflow tutorial
/root/airflow/airflow-workflow/airflow_workflow/converter/task_converter.py:37 DeprecationWarning: DAG.full_filepath is deprecated in favour of fileloc
[2025-05-13 15:27:05,755] {dag_converter.py:62} INFO - Finished converting DAG tutorial
[2025-05-13 15:27:05,756] {parser.py:187} INFO - Finished parsing Airflow DAGs.

Check the conversion result:

(airflow) [root@iZbp13j3jwx9rjos76cb6tZ airflow-workflow]# ls result/
tutorial.json
(airflow) [root@iZbp13j3jwx9rjos76cb6tZ airflow-workflow]# cat result/tutorial.json
{
  "version": "1.1.0",
  "kind": "CycleWorkflow",
  "spec": {
    "nodes": [],
    "workflows": [
      {
        "id": "484353000179523",
        "outputs": {
          "nodeOutputs": [
            {
              "artifactType": "NodeOutput",
              "data": "484353000179523",
              "refTableName": "tutorial"
            }
          ]
        },
        "nodes": [
          {
            "id": "2263957773817806",
            "name": "print_date",
            ...
            ...
          }
        ]
      }
    ]
  }
}
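To review the exported files before importing them, you can walk the Spec JSON. The sketch below relies only on the fields visible in the sample output (`kind`, `spec.workflows[].id`, `nodes[].name`); other parts of the DataWorks Spec format are not covered, and the helper names are hypothetical.

```python
import json
from typing import Any, Dict, List


def load_spec(path: str) -> Dict[str, Any]:
    """Read one exported Spec file (UTF-8 JSON)."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def list_workflow_nodes(spec_doc: Dict[str, Any]) -> Dict[str, List[str]]:
    """Map each workflow id in a Spec document to the names of its nodes."""
    result: Dict[str, List[str]] = {}
    for wf in spec_doc.get("spec", {}).get("workflows", []):
        result[wf["id"]] = [node.get("name", "") for node in wf.get("nodes", [])]
    return result
```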

2.6 FAQ

2.6.1 Error: TIMEZONE = pendulum.tz.timezone("UTC") TypeError: 'module' object is not callable

For Airflow 2.x, make sure the pendulum version is ≤ 2.1.2.

For Airflow 3.x, pendulum is not needed (Airflow 3.x has removed its dependency on pendulum).

Solution:

pip uninstall pendulum -y
pip install pendulum==2.1.2
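To confirm the fix took effect, you can compare the installed pendulum version with the limit stated above (≤ 2.1.2 for Airflow 2.x). This is a hypothetical helper that reads the version through `importlib.metadata`; its parser keeps only the leading, purely numeric version components, so a suffix such as `b1` ends the comparison.

```python
from importlib import metadata
from typing import Optional


def installed_pendulum_version() -> Optional[str]:
    """Return the installed pendulum version, or None if it is not installed."""
    try:
        return metadata.version("pendulum")
    except metadata.PackageNotFoundError:
        return None


def pendulum_ok_for_airflow2(version: Optional[str]) -> bool:
    """True if version <= 2.1.2, the limit given for Airflow 2.x."""
    if version is None:
        return False
    # Keep leading numeric components only, e.g. "3.0.0b1" -> (3, 0)
    parts = []
    for piece in version.split(".")[:3]:
        if not piece.isdigit():
            break
        parts.append(int(piece))
    if not parts:
        return False
    return tuple(parts) <= (2, 1, 2)


if __name__ == "__main__":
    v = installed_pendulum_version()
    print(v, pendulum_ok_for_airflow2(v))
```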

2.6.2 Dependency errors when parsing DAG files

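If parsing fails, the DAG usually has special dependencies of its own. Install the missing dependencies one at a time with pip, as indicated by the error messages.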
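When many DAG files are involved, it can help to list the missing modules in one pass instead of rerunning the export after every failure. The sketch below is hypothetical: it imports each `.py` file under the DAG directory (which executes the file's top-level code, as Airflow's own parsing does) and records the first `ModuleNotFoundError` per file, so rerun it after each round of installs. Note that a module name does not always match its pip package name (for example, the `yaml` module is provided by the `PyYAML` package).

```python
import importlib.util
import os
import sys
from typing import Dict, Optional


def find_missing_modules(dag_dir: str) -> Dict[str, Optional[str]]:
    """Return {dag_file_path: missing_module_name} for DAG files that fail to import."""
    missing: Dict[str, Optional[str]] = {}
    # Airflow adds the DAG folder to sys.path, so helper modules next to the DAGs resolve
    sys.path.insert(0, dag_dir)
    try:
        for root, _dirs, files in os.walk(dag_dir):
            for name in sorted(files):
                if not name.endswith(".py"):
                    continue
                path = os.path.join(root, name)
                spec = importlib.util.spec_from_file_location(
                    "_dagcheck_" + os.path.splitext(name)[0], path)
                module = importlib.util.module_from_spec(spec)
                try:
                    spec.loader.exec_module(module)  # runs the file's top-level code
                except ModuleNotFoundError as exc:
                    missing[path] = exc.name  # only the first missing import is seen
                except Exception:
                    pass  # other failures are outside the scope of this check
    finally:
        sys.path.remove(dag_dir)
    return missing
```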

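II. Import into DataWorks

The Airflow export tool outputs description files for the scheduled workflows in JSON format; their data structure conforms to the DataWorks Spec specification.

To complete the import into DataWorks, refer to the following document: Import a custom DataWorks Spec into DataWorks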