本文介绍了如何开发Converter插件,完成调度引擎间任务属性的转换。
写在前面
Convertor的目的是将源端调度任务转换为目标端调度任务,转换在LHM标准数据结构上进行。
工程中提供了一个Convertor插件的样例custom-example/custom-example-converter,建议结合样例使用本文档。
开发原则
转换在LHM标准数据结构上进行,输入输出都是com.aliyun.migration.api.WorkflowProjectPackage。转换结果应符合目标端调度引擎特性,如节点类型应符合目标端节点类型枚举值、调度属性应符合目标端规范。
开发转换插件前,应对源端、目标端调度引擎特性及其差异进行详尽的了解,设计完备的转换方案。
Overview
LHM标准输入TaskContent: WorkflowProjectPackage
----> Converter + ConverterFactory 依次转换各元素,转换器由ConverterFactory获取
----> PackageConverter
----> WorkflowConverter
----> NodeConverter
----> ShellNodeConverter
----> SqlNodeConverter
----> DataxConverter
----> ...
----> TriggerConverter
----> ResourceConverter
----> UDFConverter
----> ParameterConverter
----> TargetWorkflowProjectPackage
--------> LHM标准输出能力将TargetWorkflowProjectPackage存储至文件/元数据
标准工程结构
main/java
.
└── com.aliyun.migration.bwm.plugin.<YourPluginName>.converter
│
├── register // 插件注册(必要)
│ └ CustomExampleConverterRegisterSupervisor.java // 插件注册
│
├── module // 插件模块(必要,Converter的核心方法)
│ └── CustomExampleConverter.java
│
├── convert // 原子转换器(推荐!)
│ ├── CustomExampleConvertFactory.java // 转换工厂类(推荐!)
│ └── node // 各类节点的转换方法
│ ├── CustomExampleDefaultNodeConverter.java
│ ├── DataExportNodeConverter.java
│ ├── DataImportNodeConverter.java
│ ├── DependNodeConverter.java
│ ├── IdeaScriptNodeConverter.java
│ └── SqlNodeConverter.java
│
└── domain // 对象(可选)
├── BusinessNode.java
├── CustomExampleNode.java
├── CustomExampleNodeType.java
├── CustomExampleSqoopConstant.java
├── CustomExampleSqoopScript.java
├── DataExportNode.java
├── DatasourceInfo.java
├── DependNode.java
└── IdeScriptNode.java
main/resources
.
└── META-INF
└── services // 插件注册(必要)
└── com.aliyun.migration.workflow.migration.common.spi.BwmPluginRegisterSupervisor
test/java
.
└── com.aliyun.migration.bwm.plugin.<YourPluginName>.converter.test
└── ExampleTest.java // 测试类(可以写在每个插件module下面,也可以写在custom-test中)
test/resources
.
└── data
└── converter
├── conf // 测试Converter配置文件
│ └── ConverterConf.json
└── data // 测试Converter数据文件
└── ReaderOutput.zip
开发流程参考
你有一个待迁移的调度引擎HelloEngine,在上一教程中,我们演示了如何开发Reader对源端进行解析并转换成LHM标准数据结构(生成了一个标准包),接下来我们来开发Convert插件在LHM标准数据结构上完成调度任务转换。
1. 起个名字!(流程基本和Reader一样)
首先,创建一个Module,给你的插件起个名字,比如hello-engine-converter。
然后,在src/main/java下建立com.aliyun.migration.bwm.plugin.hello2dw(你的插件名).converter。
参考标准目录结构,建好下层目录register(必要)、module(必要)、convert、domain。
在module下建立你的Converter核心方法,比如Hello2DwConverter.java。
参考本Example,在register下编写你的注册类,并把注册类的Reference写到resources/META-INF/services/com.aliyun.migration.workflow.migration.common.spi.BwmPluginRegisterSupervisor中,这是注册插件的关键。
至此,你就创建好了一个标准的Converter插件,接下来我们开始开发Converter。
2. Converter开发
打开module/Hello2DwConverter.java,我们强烈建议你继承com.aliyun.migration.workflow.migration.common.AbstractConverter。AbstractConverter约束了Converter的标准入参、提供标准框架、提供标准输出能力、以及提供统计能力,你只需要关注一个点:
转换:怎么从源端调度转换为目标端引擎。
框架中提供了两种实现: 一种是开发自定义转换convertSelf(),一种是调用二方包来实现转换convertUseMx()。对于插件开发者而言,开发convertSelf()即可,migrationX提供的转换能力已被LHM完全集成。
// 自定义转换实现
@Override
public WorkflowProjectPackage convertSelf() {
// ...
}
// 转化能力也可以通过调用二方包来实现,当前最好的开源调度转换工程是MigrationX
// convertSelf和convertUseMx两个方法受converter配置项"if.use.migrationx.before"控制,二选一执行,默认使用convertSelf
// 如果使用convertSelf,则convertUseMx方法可以忽略,不用开发
@Override
public WorkflowProjectPackage convertUseMx() {
// ...
}
2.1 Converter.convertSelf()
convertSelf()是核心转换方法,其中完成了WorkflowProject的转换和Workflow的转换。
WorkflowProject的转换在convertFactory.getProjectConvert(taskContext).convert()中实现。
Workflow的转换在convertSingleWorkflow()中实现。
// 转换方法的核心
@Override
public WorkflowProjectPackage convertSelf() {
// 获取工作流包 -> 可以直接复用,无需自己实现
WorkflowProjectPackage workflowProjectPackage = this.getWorkflowPackage();
if (Objects.isNull(workflowProjectPackage)) {
log.warn("not found flow package");
return null;
}
// 缓存
taskContext.setWorkflowProjectPackage(workflowProjectPackage);
// 建立目标工作流包
WorkflowProjectPackage targetWorkflowProjectPackage = new WorkflowProjectPackage();
// 转换WorkflowProject -> 【核心实现】:需要在convertFactory中实现WorkflowProject的转换
WorkflowProject tgtWorkflowProject = convertFactory.getProjectConvert(taskContext).convert(workflowProjectPackage.getWorkflowProject());
targetWorkflowProjectPackage.setWorkflowProject(tgtWorkflowProject);
// 逐个转换Workflow -> 【核心实现】:需要实现convertSingleWorkflow
List<Workflow> tgtWorkflows = new ArrayList<>();
Optional.ofNullable(workflowProjectPackage.getWorkflowList()).orElse(new ArrayList<>()).forEach(workflow -> {
// todo: 转换Workflow(含WorkflowNode、trigger、resource、udf等)
Workflow tgtWorkflow = convertSingleWorkflow(workflow);
tgtWorkflows.add(tgtWorkflow);
});
targetWorkflowProjectPackage.setWorkflowList(tgtWorkflows);
// 存储结果
taskContext.setWorkflowProjectPackage(targetWorkflowProjectPackage);
taskContext.setTargetWorkflowProjectPackage(targetWorkflowProjectPackage);
return targetWorkflowProjectPackage;
}
2.2 ConverterFactory 转换工厂类
ConverterFactory是转换工厂类,负责获取各个元素的转换器。我们建议用工厂类组织各个元素的转换器,并在convertSelf()、convertSingleWorkflow()中调用。
public class AbstractConvertFactory {
/**
* 获取WorkflowProjectPackage默认转换器
* @param taskContext
* @return
*/
public AbstractConvert<WorkflowProjectPackage> getDefaultConvert(TaskContext taskContext) {
return new DefaultConvert(taskContext);
}
/**
* 获取WorkflowProject转换器
* @param taskContext
* @return
*/
public AbstractConvert<WorkflowProject> getProjectConvert(TaskContext taskContext);
/**
* 获取Workflow转换器
* @param taskContext
* @return
*/
public AbstractConvert<Workflow> getWorkflowConvert(TaskContext taskContext);
/**
* 获取WorkflowNode转换器
* @param taskContext
* @return
*/
public AbstractConvert<List<WorkflowNode>> getNodeListConvert(String taskTypeStr, TaskContext taskContext, Workflow workflow);
public AbstractConvert<List<WorkflowNode>> getDefaultNodeConvert(TaskContext taskContext, Workflow workflow);
public AbstractConvert<List<WorkflowNode>> getCustomNodeConvert(TaskContext taskContext, WorkflowNode workflowNode, Workflow workflow);
/**
* 获取WorkflowFunctionResource转换器
* @param taskContext
* @return
*/
public AbstractConvert<WorkflowFunctionResource> getFunctionConvert(TaskContext taskContext);
/**
* 获取WorkflowFileResource转换器
* @param taskContext
* @return
*/
public AbstractConvert<WorkflowFileResource> getFileConvert(TaskContext taskContext);
}
最主要的是节点转换器的组织getNodeListConvert(),以下提供了一个例子,根据节点Type选择节点转换器。
// 调度转换主要的开发工作是编写各类节点的转换器,AbstractConvertFactory中提供了选择转换器的方法
// 在每个转换器中实现了一个List<WorkflowNode> convert(List<WorkflowNode> workflowNodes)方法
// 输入输出都是list是因为转换可能出现一转一、一转多(源端的一个节点需要用目标端多个节点实现,即一转多)、多转一、多转多的情况
// 大部分情况都是一转一,即输入输出的list中均只有一个节点
@Override
public AbstractConvert<List<WorkflowNode>> getNodeListConvert(
String taskType, TaskContext taskContext, Workflow workflow) {
// taskType:任务类型,是选择转换器的条件
// taskContext:转换信息,含转换配置项,在初始化转换器时传入,供转换中使用
// workflow:工作流,在初始化转换器时传入,供转换中使用(转换节点有时候需要用到workflow的信息,如果用不上也可以不传)
// 识别taskType,根据taskType选择转换器
CustomExampleNodeType type = CustomExampleNodeType.getNodeType(taskType);
if (type == null) {
log.warn("not supported thie tasktype {}, use default convert",taskType);
return new DefaultNodeConvert(taskContext,workflow);
}
switch (type) {
// 作为一个教程,我们准备了若干数据开发、数据集成、逻辑控制的case
case BusinessScript, CallActivity, ParallelGateway:
// 一个常规转换的case
return new CustomExampleDefaultNodeConverter(taskContext,workflow);
case Depend:
// 一个涉及到任务血缘(依赖)转换的case(dependent节点)
return new DependNodeConverter(taskContext,workflow);
case Sql:
// 一个涉及到SQL转换的case,支持将不同数据源类型的SQL节点转换成指定目标端SQL节点类型
return new SqlNodeConverter(taskContext,workflow);
case IDEScript:
// 一个非常简单的脚本case
return new IdeaScriptNodeConverter(taskContext,workflow);
case DataExport:
// 一个数据集成的case
return new DataExportNodeConverter(taskContext,workflow);
case DataImport:
// 一个数据集成的case
return new DataImportNodeConverter(taskContext,workflow);
default:
return new DefaultNodeConvert(taskContext,workflow);
}
}
2.3 自定义converter配置项的设计
由于转换规则具有多样性,设置一些可配置项是有利于转换能力的复用,也可以利用可配置项为转换提供一些额外的信息。convert.json文件设计的目的就是存储这些配置项,该配置文件在convert初始化时会被加载到taskContext.BwmConfigration中,并传递到各个原子转换器中供取用。
常见的可配置设计包括:1、节点的映射规则。比如连接了不同种类数据源的SQL节点,可以配置到不同的目标端SQL节点类型。比如符合一些特征的Shell节点可以转换为DI。将节点映射规则放入可配置项中,相比直接写到代码里更有利于使用。当然,对于简单的项目而言,直接写到代码中固定转换规则也是可以的。2、目标端独有的调度参数。由于源端和目标端的特性差异,目标端独有的调度参数也可以放入可配置项中,并在convert的过程中完成设置。
自定义配置信息可以放置到self.conf中,以下提供一个例子。配置项的内容及结构可以自行设计,在取用时相应取用。
{
"self": {
"conf": [{
"nodes": "all",
"rule": {
"settings": {
"workflow.converter.sql.submitAs": {
"HIVE": "ODPS_SQL",
"PRESTO": "ODPS_SQL",
"STARROCKS": "ODPS_SQL"
},
"workflow.converter.impalaShell.submitAs": "ODPS_SQL",
"workflow.converter.remoteShell.submitAs": "SSH",
"workflow.converter.shellNodeType": "DIDE_SHELL",
"workflow.converter.spark.submitAs": {
"JAVA": "ODPS_SPARK",
"PYTHON": "ODPS_SPARK",
"SQL": "ODPS_SQL"
},
"workflow.converter.target.engine.type": "MaxCompute",
"workflow.converter.target.unknownNodeTypeAs": "DIDE_SHELL",
"workflow.converter.mrNodeType": "ODPS_MR",
"workflow.converter.commandSqlAs": "DIDE_SHELL",
"workflow.converter.connection.mapping": {
"HiveSource": "hesheng_maxcompute_test4",
"PrestoSource": "hesheng_maxcompute_test4",
"SrSource": "hesheng_maxcompute_test4",
"SshSource": "sshecs1"
}
}
}
}]
},
"schedule_datasource": {
"name": "EmrWorkflowTest",
"type": "DolphinScheduler",
"version": "3.0.0",
"properties": {
"endpoint": "http://123.57.X.XX:12345",
"project": "demo",
"token": ""
},
"operaterType": "MANUAL"
},
"target_schedule_datasource": {
"name": "NewIdeaTest",
"type": "DataWorks",
"properties": {
"endpoint": "dataworks.cn-hangzhou.aliyuncs.com",
"project_id": "249298",
"project_name": "jiman_newidea_test",
"ak": "xxx",
"sk": "xxx"
},
"operaterType": "MANUAL"
}
}
2.4 Converter.convertSingleWorkflow()
convertSingleWorkflow()是核心转换方法,其中完成了Workflow的转换。
需要依次处理Workflow中的节点、节点血缘、函数、文件资源、参数、定时配置等,建议调用ConvertFactory获取各元素的转换器。以下提供了一个convertSingleWorkflow的例子。
@Override
public Workflow convertSingleWorkflow(Workflow workflow) {
log.info("Convert In process: {}", workflow.getWorkflowName());
// 你可以在convertFactory中定义WorkflowConvert,使用这种方式调用
Workflow tgtWorkflow = convertAll(
convertFactory.getWorkflowConvert(taskContext)
, workflow
, workflow.getWorkflowId()
, workflow.getWorkflowName()
, taskContext.getStatisticOp().getWorkflowTotalStatisticOp()
, taskContext.getStatisticOp().getWorkflowSuccessStatisticOp()
, taskContext.getStatisticOp().getWorkflowFailureStatisticOp());
// 也可以直接写在此方法中
if(tgtWorkflow != null){
// 节点血缘转换
List<RelationPackage> relationPackages;
if (tgtWorkflow.getNodeRelations() != null) {
relationPackages = Optional.ofNullable(
tgtWorkflow.getNodeRelations()).orElseGet(ArrayList::new);
} else {
relationPackages = Optional.ofNullable(
workflow.getNodeRelations()).orElseGet(ArrayList::new);
tgtWorkflow.setNodeRelations(relationPackages);
}
// 节点转换(节点转换可能对血缘产生影响,特别是将一个节点转换为多个节点、或多对多、或多对一对情况)
Optional.ofNullable(workflow.getWorkflowNodes()).orElse(new ArrayList<>()).forEach(workflowNode -> {
List<WorkflowNode> workflowNodes = tgtWorkflow.getWorkflowNodes();
List<WorkflowNode> tgtResWorkflowNodes = new ArrayList<>();
workflowNodes.stream().filter(x -> !x.getNodeId().equals(workflowNode.getNodeId())).forEach(x-> tgtResWorkflowNodes.add(x));
tgtWorkflow.setWorkflowNodes(tgtResWorkflowNodes);
List<RelationPackage> orgRelationPackages = new ArrayList<>();
workflow.getNodeRelations().stream().forEach(x -> orgRelationPackages.add(x));
List<WorkflowNode> tgtNodes = null;
tgtNodes = this.convertSingleNode(tgtWorkflow, workflowNode);
compactNodeMap(workflowNode, tgtNodes);
rebuildRelation(relationPackages, workflowNode, tgtNodes, orgRelationPackages);
Set<String> nodeNameAndIdSets = Optional.ofNullable(tgtWorkflow.getWorkflowNodes())
.orElse(new ArrayList<>())
.stream()
.map(x-> x.getNodeName() + x.getNodeId())
.filter(StringUtils::isNoneBlank)
.collect(Collectors.toSet());
Optional.ofNullable(tgtNodes).orElse(new ArrayList<>()).forEach(tgtNode -> {
if (Objects.nonNull(tgtNode.getNodeName()) && (!nodeNameAndIdSets.contains(tgtNode.getNodeName() + tgtNode.getNodeId()))) {
tgtWorkflow.getWorkflowNodes().add(tgtNode);
}
});
});
// 函数转换
List<WorkflowFunctionResource> tgtFunctions = new ArrayList<>();
Optional.ofNullable(tgtWorkflow.getFunctions()).orElse(new ArrayList<>()).forEach(workflowFunctionResource -> {
WorkflowFunctionResource tgtFunction = convertAll(
convertFactory.getFunctionConvert(taskContext)
, workflowFunctionResource
, workflowFunctionResource.getFunctionId()
, workflowFunctionResource.getName()
, taskContext.getStatisticOp().getWorkflowFunTotalStatisticOp()
, taskContext.getStatisticOp().getWorkflowFunSuccessStatisticOp()
,taskContext.getStatisticOp().getWorkflowFunFailureStatisticOp());
tgtFunctions.add(tgtFunction);
});
tgtWorkflow.setFunctions(tgtFunctions);
// 文件资源转换
List<WorkflowFileResource> tgtFiles = new ArrayList<>();
Optional.ofNullable(tgtWorkflow.getFiles()).orElse(new ArrayList<>()).forEach(workflowFileResource -> {
WorkflowFileResource tgtFile = convertAll(
convertFactory.getFileConvert(taskContext)
, workflowFileResource
, workflowFileResource.getFileId()
, workflowFileResource.getName()
, taskContext.getStatisticOp().getWorkflowResourceTotalStatisticOp()
, taskContext.getStatisticOp().getWorkflowResourceSuccessStatisticOp()
,taskContext.getStatisticOp().getWorkflowResourceFailureStatisticOp());
tgtFiles.add(tgtFile);
});
tgtWorkflow.setFiles(tgtFiles);
// 处理全局参数
Map<String, Object> customProperties = workflow.getCustomProperties();
convertCustomProperties(customProperties,tgtWorkflow);
// 处理定时配置
List<WorkflowTrigger> triggers = workflow.getTriggers();
convertTrigger(triggers,tgtWorkflow);
tgtWorkflow.setTriggers(triggers);
// 参数转换
convertParam(tgtWorkflow);
}
return tgtWorkflow;
}
3. Converter测试
开发者可以在custom-converter中编写测试类,也可以直接在custom-test中编写测试类,我们建议使用后者。custom-test/test/java/com.aliyun.migration.bwm.plugin.test.PluginTester中提供了测试模板。
@Test
public void converterTest() throws Exception {
CustomPluginTestTemplate customPluginTestTemplate = new CustomPluginTestTemplate();
customPluginTestTemplate.customConverterTest(
"data/converter",
"ConverterConfig.json",
"ReadOutput.zip",
"ConverterOutput.zip", new CustomConverter()
);
}
Converter结果的检验包括两部分,首先是否存在运行错误,其次调度信息是否有遗漏。前者在运行中排查,后者需要到Converter输出包中查看。建议开发者在各元素的Converter中增加log.info(),打印尽可能多的过程信息。此外,建议开发者使用try-catch捕获每个原子转换中出现的错误,避免应少量节点的转换失败中断转换流程。
Converter的close方法中自带报表能力,开发者可以在命令行中查看转换成功与失败的情况。
4. 生成Jar包
使用mvn命令打包
mvn clean -U package -DskipTests=true -T 4 --settings '/Users/xxx/IdeaProjects/workflow-migration-plugin-template/settings.xml'
Jar包生成于converter module下的target路径中。