将任务编排迁移到DMS Airflow

更新时间:
复制为 MD 格式

DMS旧任务编排功能即将下线,您可以通过Data Agent将已有的任务编排迁移到DMS Airflow。本文介绍迁移的准备工作、费用说明和完整操作步骤。

准备工作

  • 如果您未购买Data Agent,需要提交工单申请加入Data Agent的白名单。加白后支持一个席位,迁移完成后该席位将释放。加白席位默认一个月后自动清理。如在该期间内未完成迁移,可以提交工单延长使用时间。

  • 在您要创建DMS Airflow实例的地域内,创建VPC、交换机(vSwitch)和OSS Bucket,其中OSS Bucket用于存储AirflowDAG代码和日志信息。

    说明

    Airflow实例与要迁移的调度任务可以位于不同地域内。例如,您可以将北京地域的调度任务迁移至杭州地域的Airflow实例。

费用说明

计费项

说明

Data Agent

Data Agent为付费产品,免费用户每天有30分钟的使用额度。如果额度不足,您可以提交工单申请免费使用,直到迁移完成。

DMS Airflow

Airflow实例的费用与该实例的工作流规格(使用的CU数)相关,单价请以实例所在工作空间的资源配置页面为准。

OSS Bucket

OSS Bucket存储DAG代码和日志数据会产生存储费用,详情参见计费概述

AnalyticDB(可选)

如果您的任务中包含离线数据集成或跨库Spark任务,需要使用AnalyticDB MySQL版来支持。AnalyticDB费用请参见AnalyticDB MySQL版计费项

步骤一:创建Airflow实例

(推荐)方式一:使用Data Agent自动创建

  1. 打开Data Agent页面,单击左侧导航栏中的开发切换到开发模式。

    说明

    免费用户每天可使用30分钟。如果额度不足,请参见准备工作中的说明申请额度。

  2. Data Agent发送以下消息:

    使用skills airflow-operation 创建一个Airflow实例,实例名称:xxx, 地域为:xxx vpc: xxx 交换机:xxx 安全组:xxx oss bucket:xxx, logs目录为airflow-logs, 实例规格选择SMALL
    说明

    请将消息中的xxx替换为您实际的VPC、交换机、安全组和OSS Bucket信息。

  3. 等待Airflow实例创建完成。

方式二:手动创建

推荐您使用Data Agent自动创建Airflow实例。如需手动创建Airflow实例,请按照顺序执行以下操作:

(可选)创建工作空间

说明

如果您已在DMS内创建了工作空间,可跳过此步骤,直接配置工作空间资源

  1. 登录数据管理DMS 5.0

  2. 单击控制台左上角的2023-01-28_15-57-17.png图标,在Data+AI区域单击工作空间

    说明

    如果之前没有进行过RAM授权,会自动弹出授权提醒,按提示操作授权即可。如未弹出提醒,表示已进行过RAM授权。

  3. 在显示的页面中,单击新建工作空间,配置以下参数,然后单击确认

    1. 空间名称输入工作空间名称。

    2. 空间说明(可选):为工作空间添加描述信息。

    3. 地域:选择与准备好的VPC,vSwitchOSS Bucket相同的地域。该地域即为Airflow实例的地域。

    4. 负责人:选择工作空间的管理员,您也可以修改其他成员的角色类型。

    5. VPC ID:选择已准备好的VPC。

配置工作空间资源

  1. 登录数据管理DMS 5.0

  2. 单击控制台左上角的2023-01-28_15-57-17.png图标,在Data+AI区域单击工作空间

    说明

    如果之前没有进行过RAM授权,会自动弹出授权提醒,按提示操作授权即可。如未弹出提醒,表示已进行过RAM授权。

  3. 在工作空间列表中,单击已有工作空间或新建完成的工作空间名称。

  4. 在工作空间页面左下角,选择image > 空间管理

  5. 在打开的页面中,单击资源配置页签。

  6. 工作空间资源配置区域单击资源配置

  7. 资源配置面板,指定CU配置。推荐将值设为200以上,以防创建Airflow时出现资源不足问题。

  8. 单击创建,在支付页面单击支付完成购买。配置资源时的支付金额为0,后续实际费用以使用的CU资源为准。

创建Airflow实例

  1. 在配置好的工作空间页面左侧菜单中,选择image > Airflow实例,然后单击创建实例

  2. 配置以下实例参数,未列出的参数保持默认值即可。

    参数

    说明

    实例名称

    指定Airflow实例的名称。

    工作流规格

    请根据工作流规模和复杂程度合理选择规格。对于一般工作流,可以选择默认规格。更多信息,请参见Airflow规格信息

    Worker节点扩展

    Airflow会根据任务负载情况自动调整使用节点数。对于一般工作流,可以选择默认规格。Worker节点最小为1,最大为10。

    VPC ID

    无需调整,默认与工作空间的VPC一致。

    交换机

    选择在准备工作中创建的交换机。

    安全组

    选择普通安全组。更多信息,请参考普通安全组与企业级安全组

    日志存储

    选择在准备工作阶段创建的OSS Bucket,并指定存储Airflow日志的文件夹。

    Dag目录

    保持默认目录即可。

  3. 单击创建。Airflow实例首次创建预计需要20分钟左右。

  4. 创建完成后,在Airflow实例页面找到创建的实例,单击操作列的打开即可打开实例页面。

    说明

    如果遇到浏览器无法打开页面的问题,可以点击image图标在新页面打开。

步骤二:转换任务流代码

使用Data Agent将任务编排的任务流转换为Airflow DAG代码。AirflowDAG代码支持PythonYAML格式,目前仅提供Python代码的支持。

说明

Data Agent会话6小时自动过期。如果已过期,单击左侧新任务创建新的会话继续操作。

发起转换任务

  1. 打开Data Agent页面,单击左侧导航栏中的开发切换到开发模式。

  2. Data Agent发送消息发起转换任务,以下为示例。您也可以根据实际需求调整消息内容,例如先指定几个任务流进行测试。

    使用 skills airflow-migration-from-dms 迁移所有任务流,地域为cn-hangzhou
    说明

    消息中的地域统一为cn-hangzhou。

  3. 等待Data Agent完成迁移。

检查转换结果

  1. 迁移完成后,Data Agent会生成迁移报告migration_report.md,请查阅报告确认迁移结果。

  2. Data Agent中输入查看迁移后注意事项迁移后我需要做哪些事情,Data Agent会根据您的实际情况进行分析和说明。

上传Airflow任务流

  1. Data Agent中发送以下消息,将DAG文件同步到OSS:

    使用skills airflow-operation 把dags目录上传并发布到工作空间{工作空间id/名字}里
    说明

    请将{工作空间id/名字}替换为您实际的工作空间ID或名称。

  2. 上传完成后,Airflow大约5分钟后会加载新的DAG。

步骤三:配置并运行Airflow任务流

迁移完成后,建议查看migration_report.md文件中的注意事项。您也可以在Data Agent中发送为了把dags运行起来,我还需要做什么查看待办事项。

配置Airflow变量

迁移完成后,部分离线数据集成和跨库Spark SQL依赖项(如AnalyticDB集群和数据库的账号密码)需要通过Airflow变量进行配置。

  1. 使用迁移生成的airflow_variables.json文件,在文件中填入需要配置的变量值。文件中需要指定以下参数:

    • spark_adb_cluster_idspark_adb_resource_group:AnalyticDB集群的ID和资源组。离线数据集成和跨库Spark SQL节点处理的数据库实例如果在同一个地域,可以使用同一个AnalyticDB集群。迁移脚本假设使用同一个AnalyticDB集群,如果不相同,需要自行调整。

    • spark_eni_vswitch_idspark_eni_security_group_id:数据库实例所在VPC里的vSwitch ID和安全组 ID。同一个AnalyticDB Spark任务中,数据库实例需要在同一个地域的同一个VPC内,否则需要通过VPC Peer等方式进行VPC打通才能使用。迁移脚本假设数据库实例在同一个VPC内,如果不相同,需要自行调整。

    • db_xxx_userdb_xxx_password:AnalyticDB Spark任务中使用的数据库的账号和密码。由于安全原因,ADB Spark任务无法使用DMS托管的账号密码,所以需要在Airflow里进行,迁移脚本会自动把需要配置的实例写入airflow_variables.json文件。

    airflow_variables.json内容示例:image

  2. Airflow页面的左侧菜单中,选择管理 > 变量,然后单击导入变量

    image

  3. 导入变量页面中,选择上传airflow_variables.json文件,然后在选择变量冲突解决方式区域选择覆盖模式,单击导入

    image

说明

导入后,password类型的变量会自动加密保存。

(可选)配置ECS连接

如果您的任务流中包含ECS相关节点,需要配置SSH连接。

  1. ECSSSH Private Key上传到OSS的挂载目录。如果您的ECS未配置SSH访问,需要先完成配置。

  2. Airflow页面的左侧菜单中,选择管理 > 连接,然后单击添加连接

    image

  3. 添加连接页面中配置以下参数,然后单击保存。

    • 标准字段区域,配置主机地址,即ECS实例的IP地址。如果ECSAirflow在同一VPC中,可使用VPC内网IP。否则需要使用公网地址或先打通VPC网络。

      image

    • 额外字段JSON区域,指定key_file的目录,即OSS挂载目录在Airflow中的路径。

      image

运行Airflow任务流

Airflow页面的左侧菜单中选择Dags,选择已配置完成的任务流,打开任务流调度开关image启用定时调度,或单击运行image按钮手动运行测试。

image

其他说明

  • 查询支持的节点类型:在Data Agent中发送skills airflow-migration-from-dms 都支持哪些DMS任务编排节点转换为Airflow查看支持列表。

  • 处理未覆盖的场景:由于部分任务类型不再支持或方案差异较大,转换工具未覆盖所有场景。您可以使用Data Agent修复遇到的问题,也可以反馈给DMS团队进行改进。

  • 实例登录过期:如果任务运行时出现"sql执行失败,原因:实例登录状态过期"的错误,说明您的数据库实例账号密码未托管在DMS上。请重新编辑实例,配置账号密码托管。

  • 变量功能转换:迁移过程中,针对任务编排的变量功能可能会出现问题,请根据Agent的提示信息进行调整。

  • 任务流状态要求:仅支持发布态的任务流进行迁移。

  • 中文支持:Airflow在迁移过程中会自动将中文任务名翻译为英文,您也可以根据需要调整翻译逻辑。