DataWorks自定义节点中运行任务时,需要调用自定义插件,因此在使用自定义节点前您需要创建好自定义插件包,并上传发布至DataWorks,便于使用自定义节点运行任务时使用。本文为您介绍如何创建自定义插件包。
背景信息
DataWorks的自定义节点运行时,插件开发使用过程中涉及以下2个接口。
submitJob(String codeFilePath, List args)
:提交插件任务的接口。包含以下入参。- codeFilePath:为页面开发的代码存储绝对路径。
- List args:为页面配置的调度参数,格式为
{"key"="value","key2"="value2"...}
。
返回参数如下。- 0:表示任务运行成功。
- 2:表示告知调度系统重新运行任务。
- 1、4、或3:表示任务被终止。
- 其他数值:表示任务运行失败。
killJob()
:此接口主要用于监听任务终止(kill)信号,然后会触发该接口调用。此接口没有入参和返回参数。说明 若业务层面在9秒内未处理完毕,整个插件进程会被强制终止,整个任务运行失败。
下载依赖JAR
点击以下链接下载依赖JAR包:alisa-wrapper-face-1.0.0.jar。
打包代码工程包
- 新建插件代码工程。用IDE工具新建一个Maven工程,文件结构和pom.xml文件配置要求如下所示。
- 文件结构要求文件结构要求参考下图,其中lib文件夹下alisa-wrapper-face-1.0.0.jar为上述下载的依赖JAR包,需要用本地依赖引入的方式引入代码工程中。
- pom.xml文件配置要求pom.xml文件示例如下,其中插件的类名(
<mainclass>
)为submitJob方法所在的类路径,如com.alibaba.dw.wrapper
。<groupId>org.example</groupId> <artifactId>dw-plugin</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>com.alibaba.dw</groupId> <artifactId>alisa-wrapper-face</artifactId> <version>1.0.0-SNAPSHOT</version> <scope>system</scope> <systemPath>${basedir}/lib/alisa-wrapper-face-1.0.0.jar</systemPath> </dependency> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.4</version> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.8.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>2.5.5</version> <configuration> <archive> <manifest> <mainClass>com.alibaba.dw.wrapper.DemoWrapper</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>assembly</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
- 文件结构要求
- 代码开发。代码示例:基本逻辑
package com.alibaba.dw.alisa.wrapper; import java.io.File; import java.util.List; import com.alibaba.dw.alisawrapper.DwalisaWrapper; import com.alibaba.dw.alisawrapper.constants.Constant; /** **DwalisaWrapper 在alisa-wrapper-face包 **/ public class DemoWrapper extends DwalisaWrapper { /** * codeFilePath: 代码存储的全路径。 * args:传入的参数。注意其中args[0]是作为codeFilePath,如果任务无代码,则args[0]即为第一个参数。 * 该方法实现该插件的主要功能内容,业务逻辑都在此方法内部实现。 * 返回码0:表示任务成功。 * 返回码1、4、3:表示任务被终止。 * 返回码为其他数值:表示任务失败。 */ @SuppressWarnings("deprecation") @Override public Integer submitJob(String codeFilePath, List<String> args) { try { System.err.println("your code->"); //此处实现业务代码,此方法执行完毕后任务执行结束。 } catch (Exception e) { System.err.println(e); } System.out.println("task finished..."); return Constant.SUCCESSED_EXIT_CODE; } /** * 为终止任务方法,一旦发起终止任务时,会调用方法作为一些业务上的处理之后再退出。 * 注意:一旦9秒未退出,则会直接返回kill-9,终止该任务进程。 * */ @Override public void killJob() { System.err.println("收到了终止信号,需要做一些业务操作把任务从服务端终止"); } }
- 设置数据查询结果集被页面展示。如果您希望后续使用自定义节点时,进行数据查询等操作时,结果能在DataWorks的页面中直接展示,您需参考本步骤设置数据查询结果集被页面展示。从submitJob接口业务运行代码后,需将获取到的结果集(比如查询表记录)按照以下规则存储到指定目录中:
- 结果集文件获取方式
其中String.format("%s/%s.data", System.getenv("TASK_EXEC_PATH"), System.getenv("ALISA_TASK_ID"));
System.getenv
为获取环境变量的方式,详细介绍可参考重点:通过环境变量获取对应的节点信息。 - 存储格式
- 文件格式:CSV格式,字段间以逗号分隔。
- 行分布:首行为列名称,后续各行为具体字段的数据。
"name","age" "李三","12"
- 结果集文件获取方式
- 代码开发完成后进行Maven构建打包。将本身依赖的包打成JAR包。
- 后续步骤:上传插件包。如果依赖了protobuf、guava的包,或者其他外部依赖JAR包,需将这些依赖的包和上述步骤打成的JAR包合并为ZIP包后,作为插件上传至DataWorks。上传插件的详细操作可参考新增节点插件。
重点:通过环境变量获取对应的节点信息
获取环境变量值的方式:System.getenv("SKYNET_ONDUTY")
。
- SKYNET_ID:节点ID,只有在调度运行才生效,数据开发直接运行是无该参数的。
- SKYNET_BIZDATE:业务日期。
- SKYNET_ONDUTY:
- 临时运行、补数据、测试时:表示操作者ID。
- 日常调度:节点责任人ID(工号/baseid)。
- SKYNET_TASKID:节点实例。
- IDSKYNET_SYSTEM_ENV:对应的项目环境,包括:dev 、prod。
- SKYNET_CYCTIME:节点实例的定时时间。
- SKYNET_CONNECTION:该环境变量可以读取到节点绑定数据源的具体连接串信息(通常为JSON格式)。
- SKYNET_TENANT_ID:租户ID。