教程2:Converter插件开发

本文介绍了如何开发Converter插件,完成调度引擎间任务属性的转换。

写在前面

Convertor的目的是将源端调度任务转换为目标端调度任务,转换在LHM标准数据结构上进行。

工程中提供了一个Convertor插件的样例custom-example/custom-example-converter,建议结合样例使用本文档。

开发原则

转换在LHM标准数据结构上进行,输入输出都是com.aliyun.migration.api.WorkflowProjectPackage。转换结果应符合目标端调度引擎特性,如节点类型应符合目标端节点类型枚举值、调度属性应符合目标端规范。

开发转换插件前,应对源端、目标端调度引擎特性及其差异进行详尽的了解,设计完备的转换方案。

Overview

LHM标准输入TaskContent: WorkflowProjectPackage
----> Converter + ConverterFactory 依次转换各元素,转换器由ConverterFactory获取
    ----> PackageConverter
        ----> WorkflowConverter
            ----> NodeConverter
                ----> ShellNodeConverter
                ----> SqlNodeConverter
                ----> DataxConverter
                ----> ...
            ----> TriggerConverter
            ----> ResourceConverter
            ----> UDFConverter
            ----> ParameterConverter
----> TargetWorkflowProjectPackage
--------> LHM标准输出能力将TargetWorkflowProjectPackage存储至文件/元数据

标准工程结构

main/java

.
└── com.aliyun.migration.bwm.plugin.<YourPluginName>.converter
    │
    ├── register // 插件注册(必要)
    │   └ CustomExampleConverterRegisterSupervisor.java // 插件注册
    │
    ├── module // 插件模块(必要,Converter的核心方法)
    │   └── CustomExampleConverter.java
    │
    ├── convert // 原子转换器(推荐!)
    │   ├── CustomExampleConvertFactory.java // 转换工厂类(推荐!)
    │   └── node // 各类节点的转换方法
    │       ├── CustomExampleDefaultNodeConverter.java
    │       ├── DataExportNodeConverter.java
    │       ├── DataImportNodeConverter.java
    │       ├── DependNodeConverter.java
    │       ├── IdeaScriptNodeConverter.java
    │       └── SqlNodeConverter.java
    │
    └── domain // 对象(可选)
        ├── BusinessNode.java
        ├── CustomExampleNode.java
        ├── CustomExampleNodeType.java
        ├── CustomExampleSqoopConstant.java
        ├── CustomExampleSqoopScript.java
        ├── DataExportNode.java
        ├── DatasourceInfo.java
        ├── DependNode.java
        └── IdeScriptNode.java

main/resources

.
└── META-INF
    └── services // 插件注册(必要)
        └── com.aliyun.migration.workflow.migration.common.spi.BwmPluginRegisterSupervisor

test/java

.
└── com.aliyun.migration.bwm.plugin.<YourPluginName>.converter.test
    └── ExampleTest.java // 测试类(可以写在每个插件module下面,也可以写在custom-test中)

test/resources

.
└── data
    └── converter
        ├── conf // 测试Converter配置文件
        │   └── ConverterConf.json
        └── data // 测试Converter数据文件
            └── ReaderOutput.zip

开发流程参考

你有一个待迁移的调度引擎HelloEngine,在上一教程中,我们演示了如何开发Reader对源端进行解析并转换成LHM标准数据结构(生成了一个标准包),接下来我们来开发Convert插件在LHM标准数据结构上完成调度任务转换。

1. 起个名字!(流程基本和Reader一样)

首先,创建一个Module,给你的插件起个名字,比如hello-engine-converter。

然后,在src/main/java下建立com.aliyun.migration.bwm.plugin.hello2dw(你的插件名).converter。

参考标准目录结构,建好下层目录register(必要)、module(必要)、convert、domain。

module下建立你的Converter核心方法,比如Hello2DwConverter.java。

参考本Example,在register下编写你的注册类,并把注册类的Reference写到resources/META-INF/services/com.aliyun.migration.workflow.migration.common.spi.BwmPluginRegisterSupervisor中,这是注册插件的关键。

至此,你就创建好了一个标准的Converter插件,接下来我们开始开发Converter。

2. Converter开发

打开module/Hello2DwConverter.java,我们强烈建议你继承com.aliyun.migration.workflow.migration.common.AbstractConverter。AbstractConverter约束了Converter的标准入参、提供标准框架、提供标准输出能力、以及提供统计能力,你只需要关注一个点:

  • 转换:怎么从源端调度转换为目标端引擎。

框架中提供了两种实现: 一种是开发自定义转换convertSelf(),一种是调用二方包来实现转换convertUseMx()。对于插件开发者而言,开发convertSelf()即可,migrationX提供的转换能力已被LHM完全集成。

MigrationX - Github

// 自定义转换实现
@Override
public WorkflowProjectPackage convertSelf() {
    // ...
}

// 转化能力也可以通过调用二方包来实现,当前最好的开源调度转换工程是MigrationX
// convertSelf和convertUseMx两个方法受converter配置项"if.use.migrationx.before"控制,二选一执行,默认使用convertSelf
// 如果使用convertSelf,则convertUseMx方法可以忽略,不用开发
@Override
public WorkflowProjectPackage convertUseMx() {
    // ...
}

2.1 Converter.convertSelf()

convertSelf()是核心转换方法,其中完成了WorkflowProject的转换和Workflow的转换。

WorkflowProject的转换在convertFactory.getProjectConvert(taskContext).convert()中实现。

Workflow的转换在convertSingleWorkflow()中实现。

// 转换方法的核心
@Override
public WorkflowProjectPackage convertSelf() {
    // 获取工作流包 -> 可以直接复用,无需自己实现
    WorkflowProjectPackage workflowProjectPackage = this.getWorkflowPackage();
    if (Objects.isNull(workflowProjectPackage)) {
        log.warn("not found flow package");
        return null;
    }

    // 缓存
    taskContext.setWorkflowProjectPackage(workflowProjectPackage);

    // 建立目标工作流包
    WorkflowProjectPackage targetWorkflowProjectPackage = new WorkflowProjectPackage();

    // 转换WorkflowProject -> 【核心实现】:需要在convertFactory中实现WorkflowProject的转换
    WorkflowProject tgtWorkflowProject = convertFactory.getProjectConvert(taskContext).convert(workflowProjectPackage.getWorkflowProject());
    targetWorkflowProjectPackage.setWorkflowProject(tgtWorkflowProject);

    // 逐个转换Workflow -> 【核心实现】:需要实现convertSingleWorkflow
    List<Workflow> tgtWorkflows = new ArrayList<>();
    Optional.ofNullable(workflowProjectPackage.getWorkflowList()).orElse(new ArrayList<>()).forEach(workflow -> {
        // todo: 转换Workflow(含WorkflowNode、trigger、resource、udf等)
        Workflow tgtWorkflow = convertSingleWorkflow(workflow);
        tgtWorkflows.add(tgtWorkflow);
    });
    targetWorkflowProjectPackage.setWorkflowList(tgtWorkflows);

    // 存储结果
    taskContext.setWorkflowProjectPackage(targetWorkflowProjectPackage);
    taskContext.setTargetWorkflowProjectPackage(targetWorkflowProjectPackage);

    return targetWorkflowProjectPackage;
}

2.2 ConverterFactory 转换工厂类

ConverterFactory是转换工厂类,负责获取各个元素的转换器。我们建议用工厂类组织各个元素的转换器,并在convertSelf()、convertSingleWorkflow()中调用。

public class AbstractConvertFactory {
    /**
     * 获取WorkflowProjectPackage默认转换器
     * @param taskContext
     * @return
     */
    public AbstractConvert<WorkflowProjectPackage> getDefaultConvert(TaskContext taskContext) {
        return new DefaultConvert(taskContext);
    }

    /**
     * 获取WorkflowProject转换器
     * @param taskContext
     * @return
     */
    public AbstractConvert<WorkflowProject> getProjectConvert(TaskContext taskContext);

    /**
     * 获取Workflow转换器
     * @param taskContext
     * @return
     */
    public AbstractConvert<Workflow> getWorkflowConvert(TaskContext taskContext);

    /**
     * 获取WorkflowNode转换器
     * @param taskContext
     * @return
     */
    public AbstractConvert<List<WorkflowNode>> getNodeListConvert(String taskTypeStr, TaskContext taskContext, Workflow workflow);
    public AbstractConvert<List<WorkflowNode>> getDefaultNodeConvert(TaskContext taskContext, Workflow workflow);
    public AbstractConvert<List<WorkflowNode>> getCustomNodeConvert(TaskContext taskContext, WorkflowNode workflowNode, Workflow workflow);
    
    /**
     * 获取WorkflowFunctionResource转换器
     * @param taskContext
     * @return
     */
    public AbstractConvert<WorkflowFunctionResource> getFunctionConvert(TaskContext taskContext);

    /**
     * 获取WorkflowFileResource转换器
     * @param taskContext
     * @return
     */
    public AbstractConvert<WorkflowFileResource> getFileConvert(TaskContext taskContext);
}

最主要的是节点转换器的组织getNodeListConvert(),以下提供了一个例子,根据节点Type选择节点转换器。

// 调度转换主要的开发工作是编写各类节点的转换器,AbstractConvertFactory中提供了选择转换器的方法
//  在每个转换器中实现了一个List<WorkflowNode> convert(List<WorkflowNode> workflowNodes)方法
//  输入输出都是list是因为转换可能出现一转一、一转多(源端的一个节点需要用目标端多个节点实现,即一转多)、多转一、多转多的情况
//  大部分情况都是一转一,即输入输出的list中均只有一个节点
@Override
public AbstractConvert<List<WorkflowNode>> getNodeListConvert(
        String taskType, TaskContext taskContext, Workflow workflow) {
    // taskType:任务类型,是选择转换器的条件
    // taskContext:转换信息,含转换配置项,在初始化转换器时传入,供转换中使用
    // workflow:工作流,在初始化转换器时传入,供转换中使用(转换节点有时候需要用到workflow的信息,如果用不上也可以不传)
    // 识别taskType,根据taskType选择转换器
    CustomExampleNodeType type = CustomExampleNodeType.getNodeType(taskType);
    if (type == null) {
        log.warn("not supported thie tasktype {}, use default convert",taskType);
        return new DefaultNodeConvert(taskContext,workflow);
    }
    switch (type) {
        // 作为一个教程,我们准备了若干数据开发、数据集成、逻辑控制的case
        case BusinessScript, CallActivity, ParallelGateway:
            // 一个常规转换的case
            return new CustomExampleDefaultNodeConverter(taskContext,workflow);
        case Depend:
            // 一个涉及到任务血缘(依赖)转换的case(dependent节点)
            return new DependNodeConverter(taskContext,workflow);
        case Sql:
            // 一个涉及到SQL转换的case,支持将不同数据源类型的SQL节点转换成指定目标端SQL节点类型
            return new SqlNodeConverter(taskContext,workflow);
        case IDEScript:
            // 一个非常简单的脚本case
            return new IdeaScriptNodeConverter(taskContext,workflow);
        case DataExport:
            // 一个数据集成的case
            return new DataExportNodeConverter(taskContext,workflow);
        case DataImport:
            // 一个数据集成的case
            return new DataImportNodeConverter(taskContext,workflow);
        default:
            return new DefaultNodeConvert(taskContext,workflow);
    }
}

2.3 自定义converter配置项的设计

由于转换规则具有多样性,设置一些可配置项是有利于转换能力的复用,也可以利用可配置项为转换提供一些额外的信息。convert.json文件设计的目的就是存储这些配置项,该配置文件在convert初始化时会被加载到taskContext.BwmConfigration中,并传递到各个原子转换器中供取用。

常见的可配置设计包括:1、节点的映射规则。比如连接了不同种类数据源的SQL节点,可以配置到不同的目标端SQL节点类型。比如符合一些特征的Shell节点可以转换为DI。将节点映射规则放入可配置项中,相比直接写到代码里更有利于使用。当然,对于简单的项目而言,直接写到代码中固定转换规则也是可以的。2、目标端独有的调度参数。由于源端和目标端的特性差异,目标端独有的调度参数也可以放入可配置项中,并在convert的过程中完成设置。

自定义配置信息可以放置到self.conf中,以下提供一个例子。配置项的内容及结构可以自行设计,在取用时相应取用。

{
  "self": {
    "conf": [{
      "nodes": "all",
      "rule": {
        "settings": {
          "workflow.converter.sql.submitAs": {
            "HIVE": "ODPS_SQL",
            "PRESTO": "ODPS_SQL",
            "STARROCKS": "ODPS_SQL"
          },
          "workflow.converter.impalaShell.submitAs": "ODPS_SQL",
          "workflow.converter.remoteShell.submitAs": "SSH",
          "workflow.converter.shellNodeType": "DIDE_SHELL",
          "workflow.converter.spark.submitAs": {
            "JAVA": "ODPS_SPARK",
            "PYTHON": "ODPS_SPARK",
            "SQL": "ODPS_SQL"
          },
          "workflow.converter.target.engine.type": "MaxCompute",
          "workflow.converter.target.unknownNodeTypeAs": "DIDE_SHELL",
          "workflow.converter.mrNodeType": "ODPS_MR",
          "workflow.converter.commandSqlAs": "DIDE_SHELL",
          "workflow.converter.connection.mapping": {
            "HiveSource": "hesheng_maxcompute_test4",
            "PrestoSource": "hesheng_maxcompute_test4",
            "SrSource": "hesheng_maxcompute_test4",
            "SshSource": "sshecs1"
          }
        }
      }
    }]
  },
  "schedule_datasource": {
    "name": "EmrWorkflowTest",
    "type": "DolphinScheduler",
    "version": "3.0.0",
    "properties": {
      "endpoint": "http://123.57.X.XX:12345",
      "project": "demo",
      "token": ""
    },
    "operaterType": "MANUAL"
  },
  "target_schedule_datasource": {
    "name": "NewIdeaTest",
    "type": "DataWorks",
    "properties": {
      "endpoint": "dataworks.cn-hangzhou.aliyuncs.com",
      "project_id": "249298",
      "project_name": "jiman_newidea_test",
      "ak": "xxx",
      "sk": "xxx"
    },
    "operaterType": "MANUAL"
  }
}

2.4 Converter.convertSingleWorkflow()

convertSingleWorkflow()是核心转换方法,其中完成了Workflow的转换。

需要依次处理Workflow中的节点、节点血缘、函数、文件资源、参数、定时配置等,建议调用ConvertFactory获取各元素的转换器。以下提供了一个convertSingleWorkflow的例子。

@Override
public Workflow convertSingleWorkflow(Workflow workflow) {
    log.info("Convert In process: {}", workflow.getWorkflowName());

    // 你可以在convertFactory中定义WorkflowConvert,使用这种方式调用
    Workflow tgtWorkflow = convertAll(
            convertFactory.getWorkflowConvert(taskContext)
            , workflow
            , workflow.getWorkflowId()
            , workflow.getWorkflowName()
            , taskContext.getStatisticOp().getWorkflowTotalStatisticOp()
            , taskContext.getStatisticOp().getWorkflowSuccessStatisticOp()
            , taskContext.getStatisticOp().getWorkflowFailureStatisticOp());

    // 也可以直接写在此方法中
    if(tgtWorkflow != null){
        // 节点血缘转换
        List<RelationPackage> relationPackages;
        if (tgtWorkflow.getNodeRelations() != null) {
            relationPackages = Optional.ofNullable(
                    tgtWorkflow.getNodeRelations()).orElseGet(ArrayList::new);
        } else {
            relationPackages = Optional.ofNullable(
                    workflow.getNodeRelations()).orElseGet(ArrayList::new);
            tgtWorkflow.setNodeRelations(relationPackages);
        }
        // 节点转换(节点转换可能对血缘产生影响,特别是将一个节点转换为多个节点、或多对多、或多对一对情况)
        Optional.ofNullable(workflow.getWorkflowNodes()).orElse(new ArrayList<>()).forEach(workflowNode -> {
            List<WorkflowNode> workflowNodes = tgtWorkflow.getWorkflowNodes();
            List<WorkflowNode> tgtResWorkflowNodes = new ArrayList<>();
            workflowNodes.stream().filter(x -> !x.getNodeId().equals(workflowNode.getNodeId())).forEach(x-> tgtResWorkflowNodes.add(x));
            tgtWorkflow.setWorkflowNodes(tgtResWorkflowNodes);
            List<RelationPackage> orgRelationPackages = new ArrayList<>();
            workflow.getNodeRelations().stream().forEach(x -> orgRelationPackages.add(x));
            List<WorkflowNode> tgtNodes = null;
            tgtNodes = this.convertSingleNode(tgtWorkflow, workflowNode);
            compactNodeMap(workflowNode, tgtNodes);
            rebuildRelation(relationPackages, workflowNode, tgtNodes, orgRelationPackages);
            Set<String> nodeNameAndIdSets = Optional.ofNullable(tgtWorkflow.getWorkflowNodes())
                    .orElse(new ArrayList<>())
                    .stream()
                    .map(x-> x.getNodeName() +  x.getNodeId())
                    .filter(StringUtils::isNoneBlank)
                    .collect(Collectors.toSet());
            Optional.ofNullable(tgtNodes).orElse(new ArrayList<>()).forEach(tgtNode -> {
                if (Objects.nonNull(tgtNode.getNodeName()) && (!nodeNameAndIdSets.contains(tgtNode.getNodeName() + tgtNode.getNodeId()))) {
                    tgtWorkflow.getWorkflowNodes().add(tgtNode);
                }
            });
        });
        // 函数转换
        List<WorkflowFunctionResource> tgtFunctions = new ArrayList<>();
        Optional.ofNullable(tgtWorkflow.getFunctions()).orElse(new ArrayList<>()).forEach(workflowFunctionResource -> {
            WorkflowFunctionResource tgtFunction = convertAll(
                    convertFactory.getFunctionConvert(taskContext)
                    , workflowFunctionResource
                    , workflowFunctionResource.getFunctionId()
                    , workflowFunctionResource.getName()
                    , taskContext.getStatisticOp().getWorkflowFunTotalStatisticOp()
                    , taskContext.getStatisticOp().getWorkflowFunSuccessStatisticOp()
                    ,taskContext.getStatisticOp().getWorkflowFunFailureStatisticOp());
            tgtFunctions.add(tgtFunction);
        });
        tgtWorkflow.setFunctions(tgtFunctions);
        // 文件资源转换
        List<WorkflowFileResource> tgtFiles = new ArrayList<>();
        Optional.ofNullable(tgtWorkflow.getFiles()).orElse(new ArrayList<>()).forEach(workflowFileResource -> {
            WorkflowFileResource tgtFile = convertAll(
                    convertFactory.getFileConvert(taskContext)
                    , workflowFileResource
                    , workflowFileResource.getFileId()
                    , workflowFileResource.getName()
                    , taskContext.getStatisticOp().getWorkflowResourceTotalStatisticOp()
                    , taskContext.getStatisticOp().getWorkflowResourceSuccessStatisticOp()
                    ,taskContext.getStatisticOp().getWorkflowResourceFailureStatisticOp());
            tgtFiles.add(tgtFile);
        });
        tgtWorkflow.setFiles(tgtFiles);
        // 处理全局参数
        Map<String, Object> customProperties = workflow.getCustomProperties();
        convertCustomProperties(customProperties,tgtWorkflow);
        // 处理定时配置
        List<WorkflowTrigger> triggers = workflow.getTriggers();
        convertTrigger(triggers,tgtWorkflow);
        tgtWorkflow.setTriggers(triggers);
        // 参数转换
        convertParam(tgtWorkflow);
    }
    return tgtWorkflow;
}

3. Converter测试

开发者可以在custom-converter中编写测试类,也可以直接在custom-test中编写测试类,我们建议使用后者。custom-test/test/java/com.aliyun.migration.bwm.plugin.test.PluginTester中提供了测试模板。

@Test
public void converterTest() throws Exception {
    CustomPluginTestTemplate customPluginTestTemplate = new CustomPluginTestTemplate();
    customPluginTestTemplate.customConverterTest(
            "data/converter",
            "ConverterConfig.json",
            "ReadOutput.zip",
            "ConverterOutput.zip", new CustomConverter()
    );
}

Converter结果的检验包括两部分,首先是否存在运行错误,其次调度信息是否有遗漏。前者在运行中排查,后者需要到Converter输出包中查看。建议开发者在各元素的Converter中增加log.info(),打印尽可能多的过程信息。此外,建议开发者使用try-catch捕获每个原子转换中出现的错误,避免应少量节点的转换失败中断转换流程。

Converterclose方法中自带报表能力,开发者可以在命令行中查看转换成功与失败的情况。

4. 生成Jar

使用mvn命令打包

mvn clean -U package -DskipTests=true -T 4 --settings '/Users/xxx/IdeaProjects/workflow-migration-plugin-template/settings.xml'

Jar包生成于converter module下的target路径中。