本文通过示例的方式为您介绍自定义输入与输出组件的开发案例。
前提条件
单击此处下载依赖压缩包。
将压缩包中的
plugin.center.base-0.0.1-SNAPSHOT.jar
依赖包添加进Maven的私有仓库中。
背景信息
使用自定义组件前,需先创建离线自定义源类型并创建对应数据源。
创建RDBMS类型的离线自定义源类型,在集成管道组件库的开放目录下自动生成输入和输出组件。
创建其他数据库类型的离线自定义源类型,系统会根据您上传的读取插件和写入插件,在组件库的开放目录下生成对应的输入组件和输出组件。
操作步骤
步骤一:准备读/写插件的JAR文件
Maven的依赖如下。
<dependency> <groupId>com.alibaba.dt.pipeline</groupId> <artifactId>plugin.center.base</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
读取插件文件的代码示例如下。
package demo; import com.alibaba.dt.pipeline.plugin.center.base.Reader; import com.alibaba.dt.pipeline.plugin.center.base.RecordSender; import com.alibaba.dt.pipeline.plugin.center.conf.Configuration; import com.alibaba.dt.pipeline.plugin.center.element.*; import com.alibaba.dt.pipeline.plugin.center.record.Record; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.Random; import java.util.stream.Collectors; import java.util.stream.IntStream; /** * 用户入口类ReaderDemo * 该类必须继承自com.alibaba.dt.pipeline.plugin.center.base.Reader * 该类中暂时不需要定义方法,但是必须要定义两个public的静态子类:Job和Task,名字必须是Job和Task,大小写敏感,否则系统找不到类 * * 系统会首先初始化Job类,调用init做初始化,再调用prepare做准备工作,然后调用split,把job的configuration拆分成用户指定的并发度个数的 * configuration。然后执行task。当所有的task全部执行完毕,再执行post,最后再执行destroy。destroy和post方法的不同是,destroy总是 * 会执行,哪怕出现了异常。 * * 当系统调用Job的split方法得到很多configuration后,会用每一个configuration实例化一个Task,Task的方法依次执行的顺序是:init, * prepare,getInputRowMeta,startRead,post,destroy,同样的,destroy和post方法的不同是,destroy总是会执行,哪怕出现了异常。 * */ public class ReaderDemo extends Reader { // 这个是用户在自定义数据源中定义的key,这是一个demo数据源的key,用户自定义的话,这个key值应该不同 public static final String DS_KEY = "demo_ds"; // 代码内部定义的task编号,没有使用到可以不用 public static final String TASK_INDEX = "taskIndex"; // 用户在离线管道中配置该组件的一个参数 public static final String USER_KEY = "user_param"; /** * public的静态子类Job必须继承自Reader.Job */ public static class Job extends Reader.Job { private static final Logger logger = LoggerFactory.getLogger(Job.class); Configuration jobConfig; @Override public void init() { logger.info("job init"); //通过这个方法拿到用户的输入组件配置,这些参数就是用户在"输入组件"界面上配置的参数 this.jobConfig = super.getPluginJobConf(); String value = jobConfig.getString(USER_KEY, "default_value"); String ds = jobConfig.getString(DS_KEY, "default_ds"); logger.info("user_param:{} ds:{}", value, ds); } @Override public void prepare() { super.prepare(); logger.info("job prepare"); } @Override public List<Configuration> split(int i) { logger.info("job split:{}", i); return IntStream.range(0, i).boxed().map(x -> { Configuration tmpConfiguration = jobConfig.clone(); // 写入configuration的编号 tmpConfiguration.set(TASK_INDEX, x); return tmpConfiguration; }).collect(Collectors.toList()); } @Override public void post() { super.post(); logger.info("job post"); } @Override public void destroy() { logger.info("job destroy"); } } /** * public的静态子类Task必须继承自Reader.Task */ public static class Task extends Reader.Task { private static final Logger logger = LoggerFactory.getLogger(Task.class); private Configuration taskConfig; private int index; private RowMeta rowMeta; @Override public void init() { // 获取Job split出来的configuration this.taskConfig = super.getPluginJobConf(); // 获取Task的编号 index = taskConfig.getInt(TASK_INDEX, -1); logger.info("task init:{}", index); } @Override public void prepare() { super.prepare(); logger.info("task prepare"); } @Override public void startRead(RecordSender recordSender) { logger.info("task start"); Random random = new Random(); // 读取数据,封装成Record,发送到系统内部 for(int i = 0; i < 10; i++) { Record record = recordSender.createRecord(); // 只是3个列,这个列的类型需要和getInputRowMeta函数的meta对其,如果是真实数据源db,需要把读取到的数据转换成特定的column record.addColumn(new LongColumn(i)); record.addColumn(new StringColumn("name_" + i)); record.addColumn(new DoubleColumn(random.nextDouble())); recordSender.sendToWriter(record); logger.info("read record:{}", i); } } @Override public RowMeta getInputRowMeta(){ logger.info("task column meta"); rowMeta = new RowMeta(); /** * 在这里定义输入组件读取到数据后,往下游写出的数据的schema,一般而言,用户可能需要连接到db,获取到真实数据源的schema * 特别注意:这里的column名字必须和输入组件配置页面的column名字完全一样,顺序也要一样。比如:这里定义了id、name、score * 那么,该组件的配置页面也必须配置上一样的列名:id、name、score */ ColumnMeta columnMeta1 = new ColumnMeta(); columnMeta1.setName("id"); columnMeta1.setType(Column.Type.LONG); rowMeta.addColumnMeta(columnMeta1); ColumnMeta columnMeta2 = new ColumnMeta(); columnMeta2.setName("name"); columnMeta2.setType(Column.Type.STRING); rowMeta.addColumnMeta(columnMeta2); ColumnMeta columnMeta3 = new ColumnMeta(); columnMeta3.setName("score"); columnMeta3.setType(Column.Type.DOUBLE); rowMeta.addColumnMeta(columnMeta3); return rowMeta; } @Override public void post() { super.post(); logger.info("task post"); } @Override public void destroy() { logger.info("task destroy"); } } }
写入插件文件的代码示例如下。
package demo; import com.alibaba.dt.pipeline.plugin.center.base.RecordReceiver; import com.alibaba.dt.pipeline.plugin.center.base.Writer; import com.alibaba.dt.pipeline.plugin.center.conf.Configuration; import com.alibaba.dt.pipeline.plugin.center.record.Record; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; /** * 用户入口类WriterDemo * 该类必须继承自com.alibaba.dt.pipeline.plugin.center.base.Writer * 该类中暂时不需要定义方法,但是必须要定义两个public的静态子类:Job和Task,名字必须是Job和Task,大小写敏感,否则系统找不到类 * * 系统会首先初始化Job类,调用init做初始化,再调用prepare做准备工作,然后调用split,把job的configuration拆分成用户指定的并发度个数的 * configuration。然后执行task。当所有的task全部执行完毕,再执行post,最后再执行destroy。destroy和post方法的不同是,destroy总是 * 会执行,哪怕出现了异常。 * * 当系统调用Job的split方法得到很多configuration后,会用每一个configuration实例化一个Task,Task的方法依次执行的顺序是:init, * prepare,startWrite,post,destroy,同样的,destroy和post方法的不同是,destroy总是会执行,哪怕出现了异常。 * */ public class WriterDemo extends Writer { // 这个是用户在自定义数据源中定义的key,这是一个demo数据源的key,用户自定义的话,这个key值应该不同 public static final String DS_KEY = "demo_ds"; // 代码内部定义的task编号,没有使用到可以不用 public static final String TASK_INDEX = "taskIndex"; // 用户在离线管道中配置该组件的一个参数 public static final String USER_KEY = "user_param"; /** * public的静态子类Job必须继承自Writer.Job */ public static class Job extends Writer.Job { private static final Logger logger = LoggerFactory.getLogger(Job.class); Configuration jobConfig; @Override public void init() { logger.info("job init"); //通过这个方法拿到用户的输出组件配置,这些参数就是用户在输出组件界面上配置的参数 this.jobConfig = super.getPluginJobConf(); String value = jobConfig.getString(USER_KEY, "default_value"); String ds = jobConfig.getString(DS_KEY, "default_ds"); logger.info("user_param:{} ds:{}", value, ds); } @Override public void prepare() { super.prepare(); logger.info("job prepare"); } @Override public List<Configuration> split(int i) { logger.info("job split:{}", i); return IntStream.range(0, i).boxed().map(x -> { Configuration tmpConfiguration = jobConfig.clone(); // 写入configuration的编号 tmpConfiguration.set(TASK_INDEX, x); return tmpConfiguration; }).collect(Collectors.toList()); } @Override public void post() { super.post(); logger.info("job post"); } @Override public void destroy() { logger.info("job destroy"); } } /** * public的静态子类Task必须继承自Writer.Task */ public static class Task extends Writer.Task { private static final Logger logger = LoggerFactory.getLogger(Task.class); private Configuration taskConfig; private int index; @Override public void init() { // 获取Job split出来的configuration this.taskConfig = super.getPluginJobConf(); // 获取Task的编号 index = taskConfig.getInt(TASK_INDEX, -1); logger.info("task init:{}", index); } @Override public void prepare() { super.prepare(); logger.info("task prepare"); } @Override public void startWrite(RecordReceiver recordReceiver) { logger.info("task start"); Record record; while ((record = recordReceiver.getFromReader()) != null) { logger.info("======: " + record.toString()); } } @Override public void post() { super.post(); logger.info("task post"); } @Override public void destroy() { logger.info("task destroy"); } } }
步骤二:创建其他数据库类型的离线自定义源类型
在Dataphin首页,单击顶部菜单栏管理中心 > 数据源管理。
在左侧导航栏单击自定义源类型。
在自定义源类型页面中,单击新建自定义源类型,下拉列表中选择新建离线自定义源类型。
在新建离线自定义源类型页面,配置参数。
参数
描述
基本配置
类型
选择其他数据库。
名称
自定义组件的名称。
支持中文、英文字母大小写、下划线(_)和数字。长度不超过64个字符。
类型编码
组件的唯一标识。供后端使用,创建后不可编辑。
仅支持英文字母大小写、数字和下划线(_),且不能以数字开头。
数据源JSON
填写数据源JSON代码及上传读写插件:
填写数据源JSON代码,即定义数据源的配置项,代码示例说明如下:
[ { "columnName": "url", "columnType": "NORMAL", "text": { "zh_CN": "链接地址", "en_US": "address", "zh_TW": "繁体" }, "placeholder": { "zh_CN": "请输入链接地址", "en_US": "input address", "zh_TW": "繁体" } }, { "columnName": "username", "columnType": "NORMAL", "text": { "zh_CN": "用户名", "en_US": "username", "zh_TW": "繁体" }, "placeholder": { "zh_CN": "请输入用户名", "en_US": "input username", "zh_TW": "繁体" } }, { "columnName": "password", "columnType": "ENCRYPT", "text": { "zh_CN": "密码", "en_US": "password", "zh_TW": "繁体" }, "placeholder": { "zh_CN": "请输入密码", "en_US": "input password", "zh_TW": "繁体" } } ]
上述JSON示例中字段说明如下:
columnName:参数名称。
columnType:参数类型分为NORMAL和ENCRYPT。密码类的参数,参数类型需使用ENCRYPT。
placeholder:用户输入框中的默认值在三种语言情况下的输出。若不填写,则用户输入框为空。
text:参数名称在三种语言情况下的输出。
资源配置
读写插件
选择需要配置的插件类型、填写对应的ClassName(插件类名)并上传步骤一中准备好的插件文件。插件文件仅支持.jar类型,文件大小不超过50MB。
说明读写插件需至少选择一项进行配置。完成读取插件和写入插件的配置后,会生成对应的输入插件和输出插件。
描述信息
描述
对自定义的数据源的简单描述。不超过128个字符。
单击创建。