背景
LHM调度迁移工具实现了多种调度引擎间的工作流迁移。为实现灵活的n2n迁移,我们需要将各个调度组件的内容归一化,因此构建了统一描述层。模板参考了DataWorks接口格式和MigrationX的规范,以实现对DataWorks的最大适配、以及对migrationX集成。与DataWorks迁移助手不同的是,迁移助手的描述层主要针对DataWorks,在转换其他调度很多时候显得有些复杂且不够灵活,所以我们做了更加宽泛的描述,以实现任意两个描述之间做转换。
本文档主要面向需要自定义开发Reader-Converter-Writer插件的开发者,提供数据结构参考。同时,也用于帮助用户理解Reader输出文件包、Converter输出文件包的结构,便于用户在输出文件包中自行更改工作流的属性。
定义
标准数据结构存在两种定义:
定义1:Java类,com.aliyun.migration.api.WorkflowProjectPackage。
定义2:文件包,LHM命令行工具 Reader插件的标准输出、Converter插件的标准输入输出、Writer插件的标准输入。
标准数据结构以Java类为准,文件包仅是LHM命令行工具存储标准数据结构的方式,二者可相互转换与映射。
common中提供了标准数据结构转换工具:com.aliyun.migration.workflow.migration.common.file.WorkflowFileOperator
package com.aliyun.migration.workflow.migration.common.file;
public class WorkflowFileOperator {
public static String writeFile(WorkflowProjectPackage workflowProjectPackage, File targetPackageFile);
public static WorkflowProjectPackage readFile(String filePath);
}
com.aliyun.migration.api.WorkflowProjectPackage
由JAR包提供;模板工程目录下也提供了编译前的源码,其中包含注释信息解释各成员的含义。
标准文件包结构
标准文件包结构如下,其中./<PACKAGE_NAME>/data/project
下为核心内容,必不可少;./统计报表
下包含若干统计报表,允许缺失。
.
├── <PACKAGE_NAME>
│ └── data
│ └── project
│ ├── project.json // 工作空间信息
│ ├── datasource.json // 工作空间数据源信息
│ │── workflow // 工作流集合
│ │ ├── <Workflow1> // Workflow
│ │ │ ├── workflow.json // 工作流定义
│ │ │ ├── trigger.json // 定时器定义
│ │ │ ├── nodeSpec.json // 节点定义
│ │ │ ├── nodeRelationSpec.json // 血缘关系定义
│ │ │ ├── script // 节点脚本
│ │ │ │ ├── <ScriptFile>.sh
│ │ │ │ ├── <ScriptFile>.json
│ │ │ │ ├── <ScriptFile>.sql
│ │ │ │ └── <ScriptFile>.py
│ │ │ ├── files.json // 资源文件属性
│ │ │ ├── functions.json // 函数属性
│ │ │ └── resource // 资源文件本体
│ │ │ └── <FileResource>.*
│ │ ├── <Workflow2>
│ │ │ └── ...
│ │ └── ...
│ └── script // 节点脚本汇总(非必要)
│ ├── DATAX // 按节点类型分别汇总
│ │ └── <ScriptFile>.json // 节点脚本(命名规范:<projectName>_<nodeName>_<nodeId>.*)
│ ├── PYTHON
│ │ └── <ScriptFile>.py
│ ├── SHELL
│ │ └── <ScriptFile>.sh
│ ├── SQL
│ │ ├── HIVE // SQL节点会按数据源类型进一步细化分类
│ │ │ └── <ScriptFile>.sql
│ │ ├── MYSQL
│ │ │ └── <ScriptFile>.sql
│ │ └── ...
│ └── ...
└── 统计报表
├── 总体统计.csv
├── workflow_statistic.csv
├── node_statistic.csv
├── file_resource_statistic.csv
└── function_statistic.csv
文件与Class的对应关系
WorkflowProjectPackage中的各元素被分解存储于标准文件包中,对应关系如下表所示。
文件 | Class |
./<PACKAGE_NAME>/data/project/ | WorkflowProjectPackage |
.../project.json | WorkflowProjectPackage.WorkflowProject |
.../datasource.json | WorkflowProjectPackage.List\<Datasource\> |
.../workflow/ | WorkflowProjectPackage.List\<Workflow\> |
.../workflow/\<Workflow\>/workflow.json | ...Workflow |
.../workflow/\<Workflow\>/trigger.json | ...Workflow.List\<WorkflowTrigger\> |
.../workflow/\<Workflow\>/nodeSpec.json | ...Workflow.List\<WorkflowNode\> |
.../workflow/\<Workflow\>/nodeRelationSpec.json | ...Workflow.List\<RelationPackage\> |
.../workflow/\<Workflow\>/script/ | (外置存储的NodeScript) |
.../workflow/\<Workflow\>/files.json | ...Workflow.WorkflowNode.List\<FileResource\> |
.../workflow/\<Workflow\>/functions.json | ...Workflow.WorkflowNode.List\<Function\> |
.../workflow/\<Workflow\>/resource/ | (资源文件本体) |
节点脚本
节点脚本的存储方式较为特殊,我们提供了两种存储方式:
1、存储于nodeSpec.json中,直接写在节点的script字段中。例子:
[
{
"nodeId": "16373034716928",
"nodeName": "SeaTunnelNode1",
"ownerType": "NONE",
"description": "",
"nodeType": "SEATUNNEL",
"script": "{\"localParams\":[ ],\"rawScript\":\"env {\\n execution.parallelism = 2\\n job.mode = \\\"BATCH\\\"\\n checkpoint.interval = 10000\\n}\\n\\nsource {\\n FakeSource {\\n parallelism = 2\\n result_table_name = \\\"fake\\\"\\n row.num = 16\\n schema = {\\n fields {\\n name = \\\"string\\\"\\n age = \\\"int\\\"\\n }\\n }\\n }\\n}\\n\\nsink {\\n Console {\\n }\\n}\",\"resourceList\":[ ],\"startupScript\":\"seatunnel.sh\",\"useCustom\":true,\"deployMode\":\"client\"}",
"state": "NORMAL",
"priority": 2,
"paramMap": {}
}
]
2、将脚本存储为文件,并填写路径到nodeSpec.json节点的scriptPath字段中。可放置的位置有两处,其一是存储于节点所在workflow的script目录中,scriptPath字段填写脚本文件在workflow下的相对路径;其二是存储于project目录下的script目录中(所有节点脚本集中存放),scriptPath字段填写脚本文件名(前面可以带路径,但路径无效)。例子:
[
{
"nodeId": "16373877434496",
"nodeName": "ConditionToNode2",
"ownerType": "NONE",
"description": "Condition分支2",
"nodeType": "SHELL",
"scriptPath": "script/null_ConditionToNode2_16373877434496.sh",
"files": [ ],
"state": "NORMAL",
"priority": 2,
"paramMap": {}
}
]
工具导出功能会自动将脚本文件保存,并分别放置在节点所属工作流的script目录以及项目目录下的script目录中。当这两个位置都有相同脚本文件时,项目目录下的script目录中的文件具有更高的优先级。
设计上,我们建议将脚本存储在workflow目录下以保持目录结构清晰。同时,为了满足用户批量替换脚本的需求,也支持将脚本集中存放在project/script目录中,便于统一管理和替换操作。例如,用户可以批量获取所有SQL文件并提交给公共云 SQL转换服务,完成转换后进行批量替换更新。