规范:LHM调度迁移标准包数据结构

背景

LHM调度迁移工具实现了多种调度引擎间的工作流迁移。为实现灵活的n2n迁移,我们需要将各个调度组件的内容归一化,因此构建了统一描述层。模板参考了DataWorks接口格式和MigrationX的规范,以实现对DataWorks的最大适配、以及对migrationX集成。与DataWorks迁移助手不同的是,迁移助手的描述层主要针对DataWorks,在转换其他调度很多时候显得有些复杂且不够灵活,所以我们做了更加宽泛的描述,以实现任意两个描述之间做转换。

本文档主要面向需要自定义开发Reader-Converter-Writer插件的开发者,提供数据结构参考。同时,也用于帮助用户理解Reader输出文件包、Converter输出文件包的结构,便于用户在输出文件包中自行更改工作流的属性。

定义

标准数据结构存在两种定义:

  • 定义1:Java类,com.aliyun.migration.api.WorkflowProjectPackage。

  • 定义2:文件包,LHM命令行工具 Reader插件的标准输出、Converter插件的标准输入输出、Writer插件的标准输入。

标准数据结构以Java类为准,文件包仅是LHM命令行工具存储标准数据结构的方式,二者可相互转换与映射。

common中提供了标准数据结构转换工具:com.aliyun.migration.workflow.migration.common.file.WorkflowFileOperator

package com.aliyun.migration.workflow.migration.common.file;

public class WorkflowFileOperator {
    public static String writeFile(WorkflowProjectPackage workflowProjectPackage, File targetPackageFile);
    public static WorkflowProjectPackage readFile(String filePath);
}

com.aliyun.migration.api.WorkflowProjectPackage

JAR包提供;模板工程目录下也提供了编译前的源码,其中包含注释信息解释各成员的含义。

标准文件包结构

标准文件包结构如下,其中./<PACKAGE_NAME>/data/project下为核心内容,必不可少;./统计报表下包含若干统计报表,允许缺失。

.
├── <PACKAGE_NAME>
│   └── data
│       └── project
│           ├── project.json                    // 工作空间信息
│           ├── datasource.json                 // 工作空间数据源信息
│           │── workflow                        // 工作流集合
│           │   ├── <Workflow1>                     // Workflow
│           │   │   ├── workflow.json                   // 工作流定义
│           │   │   ├── trigger.json                    // 定时器定义
│           │   │   ├── nodeSpec.json                   // 节点定义
│           │   │   ├── nodeRelationSpec.json           // 血缘关系定义 
│           │   │   ├── script                          // 节点脚本
│           │   │   │   ├── <ScriptFile>.sh
│           │   │   │   ├── <ScriptFile>.json
│           │   │   │   ├── <ScriptFile>.sql
│           │   │   │   └── <ScriptFile>.py
│           │   │   ├── files.json                      // 资源文件属性
│           │   │   ├── functions.json                  // 函数属性
│           │   │   └── resource                        // 资源文件本体
│           │   │       └── <FileResource>.*
│           │   ├── <Workflow2>
│           │   │   └── ...
│           │   └── ...
│           └── script                          // 节点脚本汇总(非必要)
│               ├── DATAX                           // 按节点类型分别汇总
│               │   └── <ScriptFile>.json               // 节点脚本(命名规范:<projectName>_<nodeName>_<nodeId>.*)
│               ├── PYTHON
│               │   └── <ScriptFile>.py
│               ├── SHELL
│               │   └── <ScriptFile>.sh
│               ├── SQL
│               │   ├── HIVE                            // SQL节点会按数据源类型进一步细化分类
│               │   │   └── <ScriptFile>.sql
│               │   ├── MYSQL
│               │   │   └── <ScriptFile>.sql
│               │   └── ...
│               └── ...
└── 统计报表
    ├── 总体统计.csv
    ├── workflow_statistic.csv
    ├── node_statistic.csv
    ├── file_resource_statistic.csv
    └── function_statistic.csv

文件与Class的对应关系

WorkflowProjectPackage中的各元素被分解存储于标准文件包中,对应关系如下表所示。

文件

Class

./<PACKAGE_NAME>/data/project/

WorkflowProjectPackage

.../project.json

WorkflowProjectPackage.WorkflowProject

.../datasource.json

WorkflowProjectPackage.List\<Datasource\>

.../workflow/

WorkflowProjectPackage.List\<Workflow\>

.../workflow/\<Workflow\>/workflow.json

...Workflow

.../workflow/\<Workflow\>/trigger.json

...Workflow.List\<WorkflowTrigger\>

.../workflow/\<Workflow\>/nodeSpec.json

...Workflow.List\<WorkflowNode\>

.../workflow/\<Workflow\>/nodeRelationSpec.json

...Workflow.List\<RelationPackage\>

.../workflow/\<Workflow\>/script/

(外置存储的NodeScript)

.../workflow/\<Workflow\>/files.json

...Workflow.WorkflowNode.List\<FileResource\>

.../workflow/\<Workflow\>/functions.json

...Workflow.WorkflowNode.List\<Function\>

.../workflow/\<Workflow\>/resource/

(资源文件本体)

节点脚本

节点脚本的存储方式较为特殊,我们提供了两种存储方式:

1、存储于nodeSpec.json中,直接写在节点的script字段中。例子:

[
    {
        "nodeId": "16373034716928",
        "nodeName": "SeaTunnelNode1",
        "ownerType": "NONE",
        "description": "",
        "nodeType": "SEATUNNEL",
        "script": "{\"localParams\":[ ],\"rawScript\":\"env {\\n  execution.parallelism = 2\\n  job.mode = \\\"BATCH\\\"\\n  checkpoint.interval = 10000\\n}\\n\\nsource {\\n  FakeSource {\\n    parallelism = 2\\n    result_table_name = \\\"fake\\\"\\n    row.num = 16\\n    schema = {\\n      fields {\\n        name = \\\"string\\\"\\n        age = \\\"int\\\"\\n      }\\n    }\\n  }\\n}\\n\\nsink {\\n  Console {\\n  }\\n}\",\"resourceList\":[ ],\"startupScript\":\"seatunnel.sh\",\"useCustom\":true,\"deployMode\":\"client\"}",
        "state": "NORMAL",
        "priority": 2,
        "paramMap": {}
    }
]

2、将脚本存储为文件,并填写路径到nodeSpec.json节点的scriptPath字段中。可放置的位置有两处,其一是存储于节点所在workflowscript目录中,scriptPath字段填写脚本文件在workflow下的相对路径;其二是存储于project目录下的script目录中(所有节点脚本集中存放),scriptPath字段填写脚本文件名(前面可以带路径,但路径无效)。例子:

[
    {
        "nodeId": "16373877434496",
        "nodeName": "ConditionToNode2",
        "ownerType": "NONE",
        "description": "Condition分支2",
        "nodeType": "SHELL",
        "scriptPath": "script/null_ConditionToNode2_16373877434496.sh",
        "files": [ ],
        "state": "NORMAL",
        "priority": 2,
        "paramMap": {}
    }
]

工具导出功能会自动将脚本文件保存,并分别放置在节点所属工作流的script目录以及项目目录下的script目录中。当这两个位置都有相同脚本文件时,项目目录下的script目录中的文件具有更高的优先级。

设计上,我们建议将脚本存储在workflow目录下以保持目录结构清晰。同时,为了满足用户批量替换脚本的需求,也支持将脚本集中存放在project/script目录中,便于统一管理和替换操作。例如,用户可以批量获取所有SQL文件并提交给公共云 SQL转换服务,完成转换后进行批量替换更新。