自定义DataWorks Spec导入DataWorks

本文介绍了如何基于LHM调度迁移工具将用户自行构造的DataWorks Spec导入DataWorks。

一、DataWorks Spec的构造与解析

LHM迁移工具允许用户按照DataWorks任务流标准结构(DataWorks Spec)自行构造描述文件,并通过LHM导入。作为导入的前置动作,LHM将对DataWorks Spec进行解析,并生成迁移标准包。

生成迁移标准包后,无需进行进一步转换,直接运行导入工具向DataWorks写入任务流。

1 DataWorks Spec定义

https://github.com/aliyun/dataworks-spec/tree/master

2 输入包构建

  1. 按照DataWorks Spec定义,自行构造任务流定义并保存为JSON文件。

  2. 将所有任务流JSON文件平铺于文件夹中,压缩文件夹为zip格式。

{
  "version": "1.1.0",
  "kind": "CycleWorkflow",
  "spec": {
    "nodes": [],
    "workflows": [
      {
        "id": "3451387436863448",
        "outputs": {
          "nodeOutputs": [
            {
              "artifactType": "NodeOutput",
              "data": "3451387436863448",
              "refTableName": "example_hive_operator"
            }
          ]
        },
        "nodes": [
          {
            "id": "2105761738722077",
            "name": "run_first",
            "instanceMode": "T+1",
            "rerunMode": "FailureAllowed",
            "rerunTimes": 0,
            "rerunInterval": 300000,
            "trigger": {
              "type": "Scheduler",
              "startTime": "2025-03-30 00:00:00",
              "timezone": "UTC"
            },
            "runtimeResource": {
              "resourceGroup": "Serverless_res_group_651147510078336_710919704558240"
            },
            "script": {
              "path": "run_first",
              "runtime": {
                "command": "EMR_HIVE"
              },
              "parameters": [],
              "content": "\n    create database if not exists airflow;\n    use airflow;\n    drop table if exists test_hive;\n    create table test_hive(name string);\n    insert into test_hive values('studio');\n    "
            },
            "outputs": {
              "nodeOutputs": [
                {
                  "data": "2105761738722077"
                },
                {
                  "artifactType": "NodeOutput",
                  "data": "example_hive_operator.run_first.run_second",
                  "refTableName": "run_first"
                }
              ]
            },
            "type": "EMR_HIVE"
          },
          {
            "id": "3326558787158921",
            "name": "run_second",
            "instanceMode": "T+1",
            "rerunMode": "FailureAllowed",
            "rerunTimes": 0,
            "rerunInterval": 300000,
            "trigger": {
              "type": "Scheduler",
              "startTime": "2025-03-30 00:00:00",
              "timezone": "UTC"
            },
            "runtimeResource": {
              "resourceGroup": "Serverless_res_group_651147510078336_710919704558240"
            },
            "script": {
              "path": "run_second",
              "runtime": {
                "command": "EMR_HIVE"
              },
              "parameters": [],
              "content": "\n    use airflow;\n    add jar oss://emr-studio-example/hive-udf-1.0-SNAPSHOT.jar;\n    create temporary function simpleudf AS 'com.aliyun.emr.hive.udf.SimpleUDFExample';\n    show functions like '*udf';\n    select simpleudf(name) from test_hive;\n    "
            },
            "outputs": {
              "nodeOutputs": [
                {
                  "data": "3326558787158921"
                }
              ]
            },
            "type": "EMR_HIVE"
          }
        ],
        "dependencies": [
          {
            "nodeId": "3326558787158921",
            "depends": [
              {
                "type": "Normal",
                "output": "2105761738722077",
                "refTableName": "run_first"
              }
            ]
          }
        ],
        "script": {
          "path": "Airflow导入_V3/example_hive_operator",
          "runtime": {
            "command": "WORKFLOW"
          },
          "parameters": []
        },
        "name": "example_hive_operator",
        "trigger": {
          "type": "Scheduler",
          "cron": "0 0 0 * * ?",
          "timezone": "UTC",
          "delaySeconds": 0
        },
        "type": "CycleWorkflow",
        "strategy": {
          "timeout": 0,
          "instanceMode": "T+1",
          "rerunMode": "Allowed",
          "rerunTimes": 0,
          "rerunInterval": 0,
          "failureStrategy": "Continue"
        }
      }
    ],
    "flow": [
      {
        "nodeId": "3451387436863448",
        "depends": []
      }
    ]
  }
}

压缩命令(参考)

zip -q -r -m -o <PackageName>.zip PackageName

3 配置项构建

请使用如下配置项,内容无需更改。调度信息由自定义DataWorks Spec描述,因此此处无需配置过多配置项。

{
  "schedule_datasource": {
    "name": "MySpec",
    "type": "DataWorks",
    "operaterType": "MANUAL"
  },
  "conf": {}
}

4 运行调度解析工具

解析工具通过命令行调用,调用命令如下:

sh ./bin/run.sh read \
-c ./conf/<你的配置文件>.json \
-f ./data/0_OriginalPackage/<输入包>.zip \
-o ./data/1_ReaderOutput/<迁移标准包>.zip \
-t dw-newidea-reader

其中-c为配置文件路径,-f为输入包路径,-o为迁移标准包的生成路径,-t为探查插件名称。

例如,当前需要解析项目A:

sh ./bin/run.sh read \
-c ./conf/projectA_read.json \
-f ./data/0_OriginalPackage/projectA_DataworksSpec.zip \
-o ./data/1_ReaderOutput/projectA_ReaderOutput.zip \
-t dw-newidea-reader

探查工具运行中将打印过程信息,请关注运行过程中是否有报错。

5 查看解析结果

打开./data/1_ReaderOutput/下的生成包ReaderOutput.zip,可预览解析结果。

其中,统计报表是对DataWorks Spec中任务流、节点、资源、函数、数据源基本信息的汇总展示。

data/project文件夹下是对DataWorks Spec调度信息数据结构标准化后的结果。

image.png

image.png

image.png

统计报表提供了两项特殊能力:

1、报表中工作流、节点的部分属性被允许更改,允许更改的字段以蓝色字体标识。在下一阶段导入DataWorks时,工具将获取表格中的属性变更并使其生效。

2、报表允许通过删除工作流子表中的行,使得在导入DataWorks时跳过这些工作流(工作流黑名单)。注意!若工作流存在相互依赖关系,相关联的工作流需要同批次导入,不可通过黑名单进行分割。分割会产生异常!

详见:使用调度迁移中的概览报表补充修改调度属性

二、导入DataWorks

导入工具支持多轮刷写,会自动选择创建/更新任务流(OverWrite模式)。

1 前置条件

1.1 DataWorks Spec解析成功

DataWorks Spec解析完成,ReaderOutput.zip被成功生成。

(可选,推荐)打开解析输出包,查看统计报表,核对待迁移范围是否被解析成功。

1.2 DataWorks侧配置

DataWorks侧需进行以下动作:

1、创建工作空间。

2、创建AK、SK且保证AK、SK对工作空间具有管理员权限。(强烈建议建立与账号有绑定关系的AK、SK,以便在写入遇到问题时进行排查)

3、在工作空间中建立数据源、绑定计算资源、创建资源组。

4、在工作空间中上传文件资源、创建UDF。

1.3 网络连通性检查

验证能否连接DataWorks Endpoint。

服务接入点列表:

服务接入点

ping dataworks.aliyuncs.com

2 导入配置项

在工程目录的conf文件夹下创建导出配置文件(JSON格式),如writer.json。

  • 使用前请删除json中的注释。

{
  "schedule_datasource": {
    "name": "YourDataWorks", //给你的DataWorks数据源起个名字!
    "type": "DataWorks",
    "properties": {
      "endpoint": "dataworks.cn-hangzhou.aliyuncs.com", // 服务接入点
      "project_id": "YourProjectId", // 工作空间ID
      "project_name": "YourProject", // 工作空间名称
      "ak": "************", // AK
      "sk": "************", // SK
    },
    "operaterType": "MANUAL"
  },
  "conf": {
    "di.resource.group.identifier": "Serverless_res_group_***_***", // 调度资源组
    "resource.group.identifier": "Serverless_res_group_***_***", // 数据集成资源组
    "dataworks.node.type.xls": "/Software/bwm-client/conf/CodeProgramType.xls", // DataWorks节点类型表的路径
    "qps.limit": 5 // 向DataWorks发送API请求的QPS上限
  }
}

2.1 服务接入点

根据DataWorks所在Region选择服务接入点,参考文档:

服务接入点

2.2 工作空间ID与名称

打开DataWorks控制台,打开工作空间详情页,从右侧基本信息中获取工作空间ID与名称。

image.png

2.3 创建AK、SK并授权

在用户页创建AK、SK,要求对目标DataWorks工作空间拥有管理员读写权限。

image.png

权限管理包括两处,如果账号是RAM账号,则需先对RAM账号进行DataWorks操作授权。

权限策略页面:https://ram.console.aliyun.com/policies

image.png

image.png

然后在DataWorks工作空间中,将工作空间权限赋给账号。

image.png

注意!AccessKey可设置网络访问限制策略,请务必保证迁移工具所在机器的IP被允许访问。

image.png

2.4 资源组

DataWorks工作空间详情页左侧菜单栏进入资源组页面,绑定资源组,并获取资源组ID。

通用资源组可用于节点调度,也可用于数据集成。配置项中调度资源组resource.group.identifier和数据集成资源组di.resource.group.identifier可以配置为同一通用资源组。

image.png

2.5 QPS设置

工具通过调用DataWorksAPI进行导入操作。不同DataWorks版本中的读、写OpenAPI分别有相应的QPS限制和每日调用次数限制,详见链接:使用限制

DataWorks基础版、标准版、专业版建议填写"qps.limit": 5,企业版建议填写"qps.limit": 20。

注意,请尽可能避免多个导入工具同时运行。

2.6 DataWorks节点类型ID设置

DataWorks中,部分节点类型在不同Region中被分配了不同的TypeId。具体TypeIDDataWorks数据开发实际界面为准。存在此特性的节点类型以数据库节点为主:数据库节点

如:MySQL节点在杭州RegionNodeTypeId1000039、在深圳RegionNodeTypeId1000041。

为适应上述DataWorks不同Region的差异特性,工具提供了一种可配置的方式,允许用户配置工具所使用的节点TypeId表。

image

表格通过导入工具的配置项引入:

"conf": {
    "dataworks.node.type.xls": "/Software/bwm-client/conf/CodeProgramType.xls" // DataWorks节点类型表的路径
 }

DataWorks数据开发界面上获取节点类型Id的方法:在界面上新建一个工作流,并在工作流中新建一个节点,在点击保存后查看工作流的Spec。

image

若节点类型配置错误,在任务流发布时将提示以下错误。

image

3 运行DataWorks导入工具

转换工具通过命令行调用,调用命令如下:

sh ./bin/run.sh write \
-c ./conf/<你的配置文件>.json \
-f ./data/1_ReaderOutput/<解析结果输出包>.zip \
-o ./data/4_WriterOutput/<导入结果存储包>.zip \
-t dw-newide-writer

其中-c为配置文件路径,-fReaderOutput包存储路径,-oWriterOutput包存储路径,-t为提交插件名称。

例如,当前需要导入DataWorks的项目A:

sh ./bin/run.sh write \
-c ./conf/projectA_write.json \
-f ./data/1_ReaderOutput/projectA_ReaderOutput.zip \
-o ./data/4_WriterOutput/projectA_WriterOutput.zip \
-t dw-newide-writer

导入工具运行中将打印过程信息,请关注运行过程中是否有报错。导入完成后将在命令行中打印导入成功与失败的统计信息。注意,部分节点的导入失败不会影响整体导入流程,如遇少量节点导入失败,可在DataWorks中进行手动修改。

4 查看导入结果

导入完成后,可在DataWorks中查看导入结果。导入过程中亦可查看工作流逐个导入的过程,如发现问题需要终止导入,可运行jps命令找到BwmClientApp,并使用kill -9终止导入。

5 Q&A

5.1 源端持续在进行开发,这些增量与变更如何提交到DataWorks?

迁移工具为OverWrite模式,重新运行导出、转换、导入可实现将源端增量提交到DataWorks的能力。请注意,工具将根据全路径匹配任务流以选择创建任务流/更新任务流。如需进行变更迁移,请勿移动任务流。

5.2 源端持续在进行开发,同时进行DataWorks上任务流改造与治理,增量迁移时是否会覆盖DataWorks上的变更?

是的,迁移工具为OverWrite模式,建议您在完成迁移后再在DataWorks上进行后续改造。或者采用分批迁移的方式,已迁移等任务流再确认不再刷写后开始DataWorks改造,不同批次之间互相不会影响。

5.3 整个包导入耗时太长,能否只导入一部分

可以,可手动裁剪待导入包来实现部分导入:将data/project/workflow文件夹下需要导入的任务流保留、其他任务流删除,重新压缩回压缩包,再运行导入工具。注意,存在相互依赖的任务流需要捆绑导入,否则任务流间的节点血缘将会丢失。