教程1:Reader插件开发

本文介绍了如何开发Reader插件,从任意调度引擎导出任务流信息。

写在前面

Reader的目的是从调度数据源中获取调度任务信息,然后解析、转换到LHM标准数据结构。

工程中提供了一个Reader插件的样例custom-example/custom-example-reader,建议结合样例使用本文档。

开发原则

数据结构标准化,同时尽可能的保留调度迁移源端的信息,为Converter做准备。

Overview

LHM标准输入TaskContent:调度源端连接串/调度源端导出包 
----> OpenAPIClient/FileLoader ---- 获取调度任务原始信息 
----> OpenAPIHandler/FileHandler ---- 解析调度任务原始信息(源端Domain)
----> Converter ---- 转换为LHM标准数据结构(WorkflowProjectPackage)
--------> LHM标准输出能力将WorkflowProjectPackage存储至文件/元数据

标准工程结构

main/java

.
└── com.aliyun.migration.bwm.plugin.<YourPluginName>.reader
    │
    ├── register // 插件注册(必要)
    │   └── CustomExampleReaderRegisterSupervisor.java // 插件注册
    │
    ├── module // 插件模块(必要,Reader的核心方法)
    │   └── CustomExampleReader.java
    │
    ├── handler // 插件处理器(可选)
    │   ├── AbstractCustomExampleHandler.java
    │   ├── CustomExampleFileHandler.java
    │   └── ... more handler
    │
    ├── domain // 源端对象(可选)
    │   ├── ExampleCommandBean.java
    │   ├── ExamplePackage.java
    │   ├── ExampleSqoopBeanNovelFormat.java
    │   ├── ExampleSqoopBeanOldFormat.java
    │   └── ExampleTimer.java
    │
    ├── convert // 源端对象到LHM标准数据结构的转换器(可选)
    │   └── CustomConverter.java
    │
    └── tools // 工具类(可选,源端API Client、源端调度数据文件Loader可以定义于此)
        ├── CustomCommonUtils.java
        └── CustomLoader.java

main/resources

.
└── META-INF
    └── services // 插件注册(必要)
        └── com.aliyun.migration.workflow.migration.common.spi.BwmPluginRegisterSupervisor

test/java

.
└── com.aliyun.migration.bwm.plugin.<YourPluginName>.reader.test
    └── ExampleTest.java // 测试类(可以写在每个插件module下面,也可以写在custom-test中)

test/resources

.
└── data
    └── reader
        ├── conf // 测试Reader配置文件
        │   └── ReaderConf.json
        └── data // 测试Reader数据文件
            └── ApiOutputData.zip

开发流程参考

你有一个待迁移的调度引擎HelloEngine,需要对HelloEngine的调度任务进行解析,那么你可以按照如下步骤开发Reader插件。

1. 起个名字!

首先,创建一个Module,给你的插件起个名字,比如hello-engine-reader。

然后,在src/main/java下建立com.aliyun.migration.bwm.plugin.hello(你的插件名).reader。

参考标准目录结构,建好下层目录register(必要)、module(必要)、handler、domain、convert、tools。

module下建立你的Reader核心方法,比如HelloEngineReader.java。

参考本Example,在register下编写你的注册类,并把注册类的Reference写到resources/META-INF/services/com.aliyun.migration.workflow.migration.common.spi.BwmPluginRegisterSupervisor中,这是注册插件的关键。

至此,你就创建好了一个标准的Reader插件,接下来我们开始开发Reader。

2. Reader开发

打开module/HelloEngineReader.java,我们强烈建议你继承com.aliyun.migration.workflow.migration.common.AbstractReader。AbstractReader约束了Reader的标准入参、提供标准框架、提供标准输出能力,你只需要关注两个点:

  • 读取:怎么从源端调度引擎获取信息。

  • 解析:如何将源端调度信息转换至LHM标准数据结构。

2.1 从源端调度引擎获取信息

WorkflowProject、Workflow、WorkflowNode、RelationPackage、WorkflowFunctionResource、Datasource、WorkflowProject、WorkflowFunctionResource、ParameterMap等。

获取方式有多种,其中两种较常见且易用:

  • 调度引擎OpenApi;

  • 调度引擎提供的导出功能获取一个导出包文件;注意,LHM要求必须打成一个zip包,包结构可自定义;

  • 其他:方式1和方式2混用;连接调度引擎元数据库等方法。

不同的获取方式可以开发独立的Handler处理,通过外部配置项控制Reader选用,选用逻辑在Readerinit方法中。

@Override
public Reader init(TaskContext taskContext) {
    // 初始化,将探查任务的所有信息存储于taskContext中
    setTaskContext(taskContext);
    // 自动调整调度数据源数据包存储路径
    optimalizeReader();
    // 获取调度数据源信息
    Datasource scheduleDataSource = taskContext.getReaderDatasource();
    supplyDataSourceOperatorType(scheduleDataSource);
    // 根据配置项中OperaterType选择对应的处理方式
    if(scheduleDataSource.getOperaterType() == Datasource.OperaterType.AUTO) {
        // todo
        //  AUTO:编写一个通过OpenAPI获取调度任务信息的ReaderHandler
        //  如DolphinScheduler等可以通过OpenAPI获取调度任务信息的调度引擎推荐使用这种方式
        customExampleHandler = new CustomExampleOpenAPIHandler(taskContext);
    } else {
        // todo:
        //  MANUAL:编写一个通过解析文件获取调度任务信息的ReaderHandler
        //  如DataArts等可以通过解析文件获取调度任务信息的调度引擎推荐使用这种方式
        customExampleHandler = new CustomExampleFileHandler(taskContext);
    }
    return this;
}

我们建议开发一些tools来获取调度任务信息。利用OpenApi获取调度任务信息时可以开发一个tools/HelloEngineOpenApiClient;利用文件获取调度任务信息时可以开发一个tools/HelloEngineFileLoader。

Client调用API所需的参数、Loader获取文件的路径以及其他可配置项可以放置于Reader配置项(json)文件中,整个配置文件的内容会被存储于taskContext中,供开发者获取。以下是一个可配置项例子,properties、conf下的元素可自定义key-value:

{
  "schedule_datasource": {
    "name": "hello_engine_schedule_datasource",
    "type": "hello_engine",
    "properties": {
      "project": "test_project",
      "endpoint": "localhost:12345",
      "token": "7h21dhehaw7euhiahfo229uf8fhnz"
    },
    "operaterType": "MANUAL"
  },
  "conf": {
    
  }
}

Client/Loader的输出可以是任何格式,包括String、JSONObject、Document(XML)。我们建议开发者根据调度引擎特性建立Domain作为Client/Loader的输出数据结构,如HelloEngineWorkflow.class、HelloEngineResource.class等,这会使得代码更清晰易读,更有助于转换。

Loader开发需要注意的是,由于各平台压缩工具的特性不同,部分zip包经LHM解压可能意外解压出一层额外的目录。例如

.
└── ApiOutput.zip
    ├── udf
    ├── 工作流
    ├── 数据处理
    ├── 数据导出
    └── 数据集成

经过解压可能生成

.
└── ApiOutput
    ├── udf
    ├── 工作流
    ├── 数据处理
    ├── 数据导出
    └── 数据集成

也可能生成

.
└── ApiOutput
    └── ApiOutput
        ├── udf
        ├── 工作流
        ├── 数据处理
        ├── 数据导出
        └── 数据集成

开发者需要根据实际情况判断是否需要调整解压后的目录结构,一个推荐的方式是在Loader的开始时判断当前文件夹是否符合调度数据包的结构。以上面的case为例,一级目录中应存在一个“工作流”文件夹。

public File findPackageRootDir(File filePath) {
    if (!Arrays.stream(Objects.requireNonNull(filePath.listFiles())).map(File::getName).toList().contains("工作流")) {
        for (File x : Objects.requireNonNull(filePath.listFiles())) {
            if (x.isDirectory() && Arrays.stream(Objects.requireNonNull(x.listFiles())).map(File::getName).toList().contains("工作流")) {
                filePath = x;
            }
        }
    }
    return filePath;
}

2.2 解析调度任务信息

开发convert/HelloEngineConverter将源端调度任务信息转换为LHM标准数据结构,最后输出com.aliyun.migration.api.WorkflowProjectPackage。只做数据结构上的转换,不要修改任何信息。

2.3 Reader运行流程

Reader整体结构如下:

  • module/HelloEngineReader根据Reader配置项中operaterType选择对应的handler/HelloEngineHandler;

  • handler/HelloEngineHandler中调用tools/HelloEngineClient、tools/HelloEngineLoader获取调度任务信息,输出为HelloEngineDomain;

  • handler/HelloEngineHandler中调用convert/HelloEngineConverterHelloEngineDomain转换为LHM标准数据结构,输出为com.aliyun.migration.api.WorkflowProjectPackage。

WorkflowProjectPackageLHM标准数据结构,包含WorkflowProject、Workflow、WorkflowNode、RelationPackage、WorkflowFunctionResource、Datasource、WorkflowProject、WorkflowFunctionResource、ParameterMap等。WorkflowProjectPackageworkflow-migration-api提供,内部元素由teleport-common提供。

在本地测试时,Reader会将WorkflowProjectPackage存储为一个文件包,作为Reader的结果输出;在LHM Web端,Reader会将WorkflowProjectPackage分解、存储至元数据。

WorkflowProjectPackage存储为文件/元数据的过程由AbstractReader实现,无需开发者关注。

3. Reader测试

开发者可以在custom-reader中编写测试类,也可以直接在custom-test中编写测试类,我们建议使用后者。custom-test/test/java/com.aliyun.migration.bwm.plugin.test.PluginTester中提供了测试模板。

@Test
public void readerTest() throws Exception {
    CustomPluginTestTemplate customPluginTestTemplate = new CustomPluginTestTemplate();
    customPluginTestTemplate.customReaderTest(
            "data/reader",
            "ReaderConfig.json",
            "ApiOutput.zip",
            "ReadOutput.zip", new CustomReader()
    );
}

Reader结果的检验包括两部分,首先是否存在运行错误,其次调度信息是否有遗漏。前者在运行中排查,后者需要到Reader输出包中查看。建议开发者在Client/Loader、Convert中增加log.info(),打印尽可能多的过程信息。

此外,Reader自带报表能力,开发者可以在Reader输出包中查看各调度元素的统计报表。

4. 生成Jar

使用mvn命令打包

mvn clean -U package -DskipTests=true -T 4 --settings '/Users/xxx/IdeaProjects/workflow-migration-plugin-template/settings.xml'

Jar包生成于reader module下的target路径中。