本文介绍了如何基于LHM调度迁移工具将用户自行构造的DataWorks Spec导入DataWorks。
一、DataWorks Spec的构造与解析
LHM迁移工具允许用户按照DataWorks任务流标准结构(DataWorks Spec)自行构造描述文件,并通过LHM导入。作为导入的前置动作,LHM将对DataWorks Spec进行解析,并生成迁移标准包。
生成迁移标准包后,无需进行进一步转换,直接运行导入工具向DataWorks写入任务流。
1 DataWorks Spec定义
https://github.com/aliyun/dataworks-spec/tree/master
2 输入包构建
按照DataWorks Spec定义,自行构造任务流定义并保存为JSON文件。
将所有任务流JSON文件平铺于文件夹中,压缩文件夹为zip格式。
{
"version": "1.1.0",
"kind": "CycleWorkflow",
"spec": {
"nodes": [],
"workflows": [
{
"id": "3451387436863448",
"outputs": {
"nodeOutputs": [
{
"artifactType": "NodeOutput",
"data": "3451387436863448",
"refTableName": "example_hive_operator"
}
]
},
"nodes": [
{
"id": "2105761738722077",
"name": "run_first",
"instanceMode": "T+1",
"rerunMode": "FailureAllowed",
"rerunTimes": 0,
"rerunInterval": 300000,
"trigger": {
"type": "Scheduler",
"startTime": "2025-03-30 00:00:00",
"timezone": "UTC"
},
"runtimeResource": {
"resourceGroup": "Serverless_res_group_651147510078336_710919704558240"
},
"script": {
"path": "run_first",
"runtime": {
"command": "EMR_HIVE"
},
"parameters": [],
"content": "\n create database if not exists airflow;\n use airflow;\n drop table if exists test_hive;\n create table test_hive(name string);\n insert into test_hive values('studio');\n "
},
"outputs": {
"nodeOutputs": [
{
"data": "2105761738722077"
},
{
"artifactType": "NodeOutput",
"data": "example_hive_operator.run_first.run_second",
"refTableName": "run_first"
}
]
},
"type": "EMR_HIVE"
},
{
"id": "3326558787158921",
"name": "run_second",
"instanceMode": "T+1",
"rerunMode": "FailureAllowed",
"rerunTimes": 0,
"rerunInterval": 300000,
"trigger": {
"type": "Scheduler",
"startTime": "2025-03-30 00:00:00",
"timezone": "UTC"
},
"runtimeResource": {
"resourceGroup": "Serverless_res_group_651147510078336_710919704558240"
},
"script": {
"path": "run_second",
"runtime": {
"command": "EMR_HIVE"
},
"parameters": [],
"content": "\n use airflow;\n add jar oss://emr-studio-example/hive-udf-1.0-SNAPSHOT.jar;\n create temporary function simpleudf AS 'com.aliyun.emr.hive.udf.SimpleUDFExample';\n show functions like '*udf';\n select simpleudf(name) from test_hive;\n "
},
"outputs": {
"nodeOutputs": [
{
"data": "3326558787158921"
}
]
},
"type": "EMR_HIVE"
}
],
"dependencies": [
{
"nodeId": "3326558787158921",
"depends": [
{
"type": "Normal",
"output": "2105761738722077",
"refTableName": "run_first"
}
]
}
],
"script": {
"path": "Airflow导入_V3/example_hive_operator",
"runtime": {
"command": "WORKFLOW"
},
"parameters": []
},
"name": "example_hive_operator",
"trigger": {
"type": "Scheduler",
"cron": "0 0 0 * * ?",
"timezone": "UTC",
"delaySeconds": 0
},
"type": "CycleWorkflow",
"strategy": {
"timeout": 0,
"instanceMode": "T+1",
"rerunMode": "Allowed",
"rerunTimes": 0,
"rerunInterval": 0,
"failureStrategy": "Continue"
}
}
],
"flow": [
{
"nodeId": "3451387436863448",
"depends": []
}
]
}
}
压缩命令(参考)
zip -q -r -m -o <PackageName>.zip PackageName
3 配置项构建
请使用如下配置项,内容无需更改。调度信息由自定义DataWorks Spec描述,因此此处无需配置过多配置项。
{
"schedule_datasource": {
"name": "MySpec",
"type": "DataWorks",
"operaterType": "MANUAL"
},
"conf": {}
}
4 运行调度解析工具
解析工具通过命令行调用,调用命令如下:
sh ./bin/run.sh read \
-c ./conf/<你的配置文件>.json \
-f ./data/0_OriginalPackage/<输入包>.zip \
-o ./data/1_ReaderOutput/<迁移标准包>.zip \
-t dw-newidea-reader
其中-c为配置文件路径,-f为输入包路径,-o为迁移标准包的生成路径,-t为探查插件名称。
例如,当前需要解析项目A:
sh ./bin/run.sh read \
-c ./conf/projectA_read.json \
-f ./data/0_OriginalPackage/projectA_DataworksSpec.zip \
-o ./data/1_ReaderOutput/projectA_ReaderOutput.zip \
-t dw-newidea-reader
探查工具运行中将打印过程信息,请关注运行过程中是否有报错。
5 查看解析结果
打开./data/1_ReaderOutput/下的生成包ReaderOutput.zip,可预览解析结果。
其中,统计报表是对DataWorks Spec中任务流、节点、资源、函数、数据源基本信息的汇总展示。
而data/project文件夹下是对DataWorks Spec调度信息数据结构标准化后的结果。
统计报表提供了两项特殊能力:
1、报表中工作流、节点的部分属性被允许更改,允许更改的字段以蓝色字体标识。在下一阶段导入DataWorks时,工具将获取表格中的属性变更并使其生效。
2、报表允许通过删除工作流子表中的行,使得在导入DataWorks时跳过这些工作流(工作流黑名单)。注意!若工作流存在相互依赖关系,相关联的工作流需要同批次导入,不可通过黑名单进行分割。分割会产生异常!
二、导入DataWorks
导入工具支持多轮刷写,会自动选择创建/更新任务流(OverWrite模式)。
1 前置条件
1.1 DataWorks Spec解析成功
DataWorks Spec解析完成,ReaderOutput.zip被成功生成。
(可选,推荐)打开解析输出包,查看统计报表,核对待迁移范围是否被解析成功。
1.2 DataWorks侧配置
DataWorks侧需进行以下动作:
1、创建工作空间。
2、创建AK、SK且保证AK、SK对工作空间具有管理员权限。(强烈建议建立与账号有绑定关系的AK、SK,以便在写入遇到问题时进行排查)
3、在工作空间中建立数据源、绑定计算资源、创建资源组。
4、在工作空间中上传文件资源、创建UDF。
1.3 网络连通性检查
验证能否连接DataWorks Endpoint。
服务接入点列表:
ping dataworks.aliyuncs.com
2 导入配置项
在工程目录的conf文件夹下创建导出配置文件(JSON格式),如writer.json。
使用前请删除json中的注释。
{
"schedule_datasource": {
"name": "YourDataWorks", //给你的DataWorks数据源起个名字!
"type": "DataWorks",
"properties": {
"endpoint": "dataworks.cn-hangzhou.aliyuncs.com", // 服务接入点
"project_id": "YourProjectId", // 工作空间ID
"project_name": "YourProject", // 工作空间名称
"ak": "************", // AK
"sk": "************", // SK
},
"operaterType": "MANUAL"
},
"conf": {
"di.resource.group.identifier": "Serverless_res_group_***_***", // 调度资源组
"resource.group.identifier": "Serverless_res_group_***_***", // 数据集成资源组
"dataworks.node.type.xls": "/Software/bwm-client/conf/CodeProgramType.xls", // DataWorks节点类型表的路径
"qps.limit": 5 // 向DataWorks发送API请求的QPS上限
}
}
2.1 服务接入点
根据DataWorks所在Region选择服务接入点,参考文档:
2.2 工作空间ID与名称
打开DataWorks控制台,打开工作空间详情页,从右侧基本信息中获取工作空间ID与名称。
2.3 创建AK、SK并授权
在用户页创建AK、SK,要求对目标DataWorks工作空间拥有管理员读写权限。
权限管理包括两处,如果账号是RAM账号,则需先对RAM账号进行DataWorks操作授权。
权限策略页面:https://ram.console.aliyun.com/policies
然后在DataWorks工作空间中,将工作空间权限赋给账号。
注意!AccessKey可设置网络访问限制策略,请务必保证迁移工具所在机器的IP被允许访问。
2.4 资源组
由DataWorks工作空间详情页左侧菜单栏进入资源组页面,绑定资源组,并获取资源组ID。
通用资源组可用于节点调度,也可用于数据集成。配置项中调度资源组resource.group.identifier和数据集成资源组di.resource.group.identifier可以配置为同一通用资源组。
2.5 QPS设置
工具通过调用DataWorks的API进行导入操作。不同DataWorks版本中的读、写OpenAPI分别有相应的QPS限制和每日调用次数限制,详见链接:使用限制。
DataWorks基础版、标准版、专业版建议填写"qps.limit": 5,企业版建议填写"qps.limit": 20。
注意,请尽可能避免多个导入工具同时运行。
2.6 DataWorks节点类型ID设置
在DataWorks中,部分节点类型在不同Region中被分配了不同的TypeId。具体TypeID以DataWorks数据开发实际界面为准。存在此特性的节点类型以数据库节点为主:数据库节点。
如:MySQL节点在杭州Region的NodeTypeId为1000039、在深圳Region的NodeTypeId为1000041。
为适应上述DataWorks不同Region的差异特性,工具提供了一种可配置的方式,允许用户配置工具所使用的节点TypeId表。
表格通过导入工具的配置项引入:
"conf": {
"dataworks.node.type.xls": "/Software/bwm-client/conf/CodeProgramType.xls" // DataWorks节点类型表的路径
}
从DataWorks数据开发界面上获取节点类型Id的方法:在界面上新建一个工作流,并在工作流中新建一个节点,在点击保存后查看工作流的Spec。
若节点类型配置错误,在任务流发布时将提示以下错误。
3 运行DataWorks导入工具
转换工具通过命令行调用,调用命令如下:
sh ./bin/run.sh write \
-c ./conf/<你的配置文件>.json \
-f ./data/1_ReaderOutput/<解析结果输出包>.zip \
-o ./data/4_WriterOutput/<导入结果存储包>.zip \
-t dw-newide-writer
其中-c为配置文件路径,-f为ReaderOutput包存储路径,-o为WriterOutput包存储路径,-t为提交插件名称。
例如,当前需要导入DataWorks的项目A:
sh ./bin/run.sh write \
-c ./conf/projectA_write.json \
-f ./data/1_ReaderOutput/projectA_ReaderOutput.zip \
-o ./data/4_WriterOutput/projectA_WriterOutput.zip \
-t dw-newide-writer
导入工具运行中将打印过程信息,请关注运行过程中是否有报错。导入完成后将在命令行中打印导入成功与失败的统计信息。注意,部分节点的导入失败不会影响整体导入流程,如遇少量节点导入失败,可在DataWorks中进行手动修改。
4 查看导入结果
导入完成后,可在DataWorks中查看导入结果。导入过程中亦可查看工作流逐个导入的过程,如发现问题需要终止导入,可运行jps命令找到BwmClientApp,并使用kill -9终止导入。
5 Q&A
5.1 源端持续在进行开发,这些增量与变更如何提交到DataWorks?
迁移工具为OverWrite模式,重新运行导出、转换、导入可实现将源端增量提交到DataWorks的能力。请注意,工具将根据全路径匹配任务流以选择创建任务流/更新任务流。如需进行变更迁移,请勿移动任务流。
5.2 源端持续在进行开发,同时进行DataWorks上任务流改造与治理,增量迁移时是否会覆盖DataWorks上的变更?
是的,迁移工具为OverWrite模式,建议您在完成迁移后再在DataWorks上进行后续改造。或者采用分批迁移的方式,已迁移等任务流再确认不再刷写后开始DataWorks改造,不同批次之间互相不会影响。
5.3 整个包导入耗时太长,能否只导入一部分
可以,可手动裁剪待导入包来实现部分导入:将data/project/workflow文件夹下需要导入的任务流保留、其他任务流删除,重新压缩回压缩包,再运行导入工具。注意,存在相互依赖的任务流需要捆绑导入,否则任务流间的节点血缘将会丢失。