开发自定义组件示例

本文通过示例的方式为您介绍自定义输入与输出组件的开发案例。

前提条件

  1. 单击此处下载依赖压缩包

  2. 将压缩包中的plugin.center.base-0.0.1-SNAPSHOT.jar依赖包添加进Maven的私有仓库中。

背景信息

使用自定义组件前,需先创建离线自定义源类型并创建对应数据源。

  • 创建RDBMS类型的离线自定义源类型,在集成管道组件库的开放目录下自动生成输入和输出组件。

  • 创建其他数据库类型的离线自定义源类型,系统会根据您上传的读取插件写入插件,在组件库的开放目录下生成对应的输入组件输出组件

操作步骤

步骤一:准备读/写插件的JAR文件

  • Maven的依赖如下。

    <dependency>
      <groupId>com.alibaba.dt.pipeline</groupId>
      <artifactId>plugin.center.base</artifactId>
      <version>0.0.1-SNAPSHOT</version>
    </dependency>
  • 读取插件文件的代码示例如下

    package demo;
    
    import com.alibaba.dt.pipeline.plugin.center.base.Reader;
    import com.alibaba.dt.pipeline.plugin.center.base.RecordSender;
    import com.alibaba.dt.pipeline.plugin.center.conf.Configuration;
    import com.alibaba.dt.pipeline.plugin.center.element.*;
    import com.alibaba.dt.pipeline.plugin.center.record.Record;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.List;
    import java.util.Random;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    
    /**
     * 用户入口类ReaderDemo
     * 该类必须继承自com.alibaba.dt.pipeline.plugin.center.base.Reader
     * 该类中暂时不需要定义方法,但是必须要定义两个public的静态子类:JobTask,名字必须是JobTask,大小写敏感,否则系统找不到类
     *
     * 系统会首先初始化Job类,调用init做初始化,再调用prepare做准备工作,然后调用split,把jobconfiguration拆分成用户指定的并发度个数的
     * configuration。然后执行task。当所有的task全部执行完毕,再执行post,最后再执行destroy。destroypost方法的不同是,destroy总是
     * 会执行,哪怕出现了异常。
     *
     * 当系统调用Jobsplit方法得到很多configuration后,会用每一个configuration实例化一个Task,Task的方法依次执行的顺序是:init,
     * prepare,getInputRowMeta,startRead,post,destroy,同样的,destroypost方法的不同是,destroy总是会执行,哪怕出现了异常。
     *
     */
    public class ReaderDemo extends Reader {
        // 这个是用户在自定义数据源中定义的key,这是一个demo数据源的key,用户自定义的话,这个key值应该不同
        public static final String DS_KEY = "demo_ds";
        // 代码内部定义的task编号,没有使用到可以不用
        public static final String TASK_INDEX = "taskIndex";
        // 用户在离线管道中配置该组件的一个参数
        public static final String USER_KEY = "user_param";
    
        /**
         * public的静态子类Job必须继承自Reader.Job
         */
        public static class Job extends Reader.Job {
            private static final Logger logger = LoggerFactory.getLogger(Job.class);
    
            Configuration jobConfig;
    
            @Override
            public void init() {
                logger.info("job init");
                //通过这个方法拿到用户的输入组件配置,这些参数就是用户在"输入组件"界面上配置的参数
                this.jobConfig = super.getPluginJobConf();
                String value = jobConfig.getString(USER_KEY, "default_value");
                String ds = jobConfig.getString(DS_KEY, "default_ds");
                logger.info("user_param:{} ds:{}", value, ds);
            }
    
            @Override
            public void prepare() {
                super.prepare();
                logger.info("job prepare");
            }
    
            @Override
            public List<Configuration> split(int i) {
                logger.info("job split:{}", i);
                return IntStream.range(0, i).boxed().map(x -> {
                    Configuration tmpConfiguration = jobConfig.clone();
                    // 写入configuration的编号
                    tmpConfiguration.set(TASK_INDEX, x);
                    return tmpConfiguration;
                }).collect(Collectors.toList());
            }
    
            @Override
            public void post() {
                super.post();
                logger.info("job post");
            }
    
            @Override
            public void destroy() {
                logger.info("job destroy");
            }
        }
    
        /**
         * public的静态子类Task必须继承自Reader.Task
         */
        public static class Task extends Reader.Task {
            private static final Logger logger = LoggerFactory.getLogger(Task.class);
    
            private Configuration taskConfig;
            private int index;
            private RowMeta rowMeta;
    
            @Override
            public void init() {
                // 获取Job split出来的configuration
                this.taskConfig = super.getPluginJobConf();
                // 获取Task的编号
                index = taskConfig.getInt(TASK_INDEX, -1);
                logger.info("task init:{}", index);
            }
    
            @Override
            public void prepare() {
                super.prepare();
                logger.info("task prepare");
            }
    
            @Override
            public void startRead(RecordSender recordSender) {
                logger.info("task start");
    
                Random random = new Random();
    
                // 读取数据,封装成Record,发送到系统内部
                for(int i = 0; i < 10; i++) {
                    Record record = recordSender.createRecord();
                    // 只是3个列,这个列的类型需要和getInputRowMeta函数的meta对其,如果是真实数据源db,需要把读取到的数据转换成特定的column
                    record.addColumn(new LongColumn(i));
                    record.addColumn(new StringColumn("name_" + i));
                    record.addColumn(new DoubleColumn(random.nextDouble()));
                    recordSender.sendToWriter(record);
                    logger.info("read record:{}", i);
                }
            }
    
            @Override
            public RowMeta getInputRowMeta(){
                logger.info("task column meta");
                rowMeta = new RowMeta();
    
                /**
                 * 在这里定义输入组件读取到数据后,往下游写出的数据的schema,一般而言,用户可能需要连接到db,获取到真实数据源的schema
                 * 特别注意:这里的column名字必须和输入组件配置页面的column名字完全一样,顺序也要一样。比如:这里定义了id、name、score
                 * 那么,该组件的配置页面也必须配置上一样的列名:id、name、score
                 */
                ColumnMeta columnMeta1 = new ColumnMeta();
                columnMeta1.setName("id");
                columnMeta1.setType(Column.Type.LONG);
                rowMeta.addColumnMeta(columnMeta1);
    
                ColumnMeta columnMeta2 = new ColumnMeta();
                columnMeta2.setName("name");
                columnMeta2.setType(Column.Type.STRING);
                rowMeta.addColumnMeta(columnMeta2);
    
                ColumnMeta columnMeta3 = new ColumnMeta();
                columnMeta3.setName("score");
                columnMeta3.setType(Column.Type.DOUBLE);
                rowMeta.addColumnMeta(columnMeta3);
    
                return rowMeta;
            }
    
            @Override
            public void post() {
                super.post();
                logger.info("task post");
            }
    
            @Override
            public void destroy() {
                logger.info("task destroy");
    
            }
        }
    }
  • 写入插件文件的代码示例如下

    package demo;
    
    import com.alibaba.dt.pipeline.plugin.center.base.RecordReceiver;
    import com.alibaba.dt.pipeline.plugin.center.base.Writer;
    import com.alibaba.dt.pipeline.plugin.center.conf.Configuration;
    import com.alibaba.dt.pipeline.plugin.center.record.Record;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    
    /**
     * 用户入口类WriterDemo
     * 该类必须继承自com.alibaba.dt.pipeline.plugin.center.base.Writer
     * 该类中暂时不需要定义方法,但是必须要定义两个public的静态子类:JobTask,名字必须是JobTask,大小写敏感,否则系统找不到类
     *
     * 系统会首先初始化Job类,调用init做初始化,再调用prepare做准备工作,然后调用split,把jobconfiguration拆分成用户指定的并发度个数的
     * configuration。然后执行task。当所有的task全部执行完毕,再执行post,最后再执行destroy。destroypost方法的不同是,destroy总是
     * 会执行,哪怕出现了异常。
     *
     * 当系统调用Jobsplit方法得到很多configuration后,会用每一个configuration实例化一个Task,Task的方法依次执行的顺序是:init,
     * prepare,startWrite,post,destroy,同样的,destroypost方法的不同是,destroy总是会执行,哪怕出现了异常。
     *
     */
    public class WriterDemo extends Writer {
        // 这个是用户在自定义数据源中定义的key,这是一个demo数据源的key,用户自定义的话,这个key值应该不同
        public static final String DS_KEY = "demo_ds";
        // 代码内部定义的task编号,没有使用到可以不用
        public static final String TASK_INDEX = "taskIndex";
        // 用户在离线管道中配置该组件的一个参数
        public static final String USER_KEY = "user_param";
    
        /**
         * public的静态子类Job必须继承自Writer.Job
         */
        public static class Job extends Writer.Job {
            private static final Logger logger = LoggerFactory.getLogger(Job.class);
    
            Configuration jobConfig;
    
            @Override
            public void init() {
                logger.info("job init");
                //通过这个方法拿到用户的输出组件配置,这些参数就是用户在输出组件界面上配置的参数
                this.jobConfig = super.getPluginJobConf();
                String value = jobConfig.getString(USER_KEY, "default_value");
                String ds = jobConfig.getString(DS_KEY, "default_ds");
                logger.info("user_param:{} ds:{}", value, ds);
            }
    
            @Override
            public void prepare() {
                super.prepare();
                logger.info("job prepare");
            }
    
            @Override
            public List<Configuration> split(int i) {
                logger.info("job split:{}", i);
                return IntStream.range(0, i).boxed().map(x -> {
                    Configuration tmpConfiguration = jobConfig.clone();
                    // 写入configuration的编号
                    tmpConfiguration.set(TASK_INDEX, x);
                    return tmpConfiguration;
                }).collect(Collectors.toList());
            }
    
            @Override
            public void post() {
                super.post();
                logger.info("job post");
            }
    
            @Override
            public void destroy() {
                logger.info("job destroy");
            }
        }
    
        /**
         * public的静态子类Task必须继承自Writer.Task
         */
        public static class Task extends Writer.Task {
            private static final Logger logger = LoggerFactory.getLogger(Task.class);
    
            private Configuration taskConfig;
            private int index;
    
            @Override
            public void init() {
                // 获取Job split出来的configuration
                this.taskConfig = super.getPluginJobConf();
                // 获取Task的编号
                index = taskConfig.getInt(TASK_INDEX, -1);
                logger.info("task init:{}", index);
            }
    
            @Override
            public void prepare() {
                super.prepare();
                logger.info("task prepare");
            }
    
            @Override
            public void startWrite(RecordReceiver recordReceiver) {
                logger.info("task start");
    
                Record record;
                while ((record = recordReceiver.getFromReader()) != null) {
                    logger.info("======: " + record.toString());
                }
            }
    
            @Override
            public void post() {
                super.post();
                logger.info("task post");
            }
    
            @Override
            public void destroy() {
                logger.info("task destroy");
    
            }
        }
    }
                                

步骤二:创建其他数据库类型的离线自定义源类型

  1. Dataphin首页,单击顶部菜单栏管理中心 > 数据源管理

  2. 在左侧导航栏单击自定义源类型

  3. 自定义源类型页面中,单击新建自定义源类型,下拉列表中选择新建离线自定义源类型image

  4. 新建离线自定义源类型页面,配置参数。

    参数

    描述

    基本配置

    类型

    选择其他数据库

    名称

    自定义组件的名称。

    支持中文、英文字母大小写、下划线(_)和数字。长度不超过64个字符。

    类型编码

    组件的唯一标识。供后端使用,创建后不可编辑。

    仅支持英文字母大小写、数字和下划线(_),且不能以数字开头。

    数据源JSON

    填写数据源JSON代码及上传读写插件:

    填写数据源JSON代码,即定义数据源的配置项,代码示例说明如下:

    [
      {
        "columnName": "url",
        "columnType": "NORMAL",
        "text": {
          "zh_CN": "链接地址",
          "en_US": "address",
          "zh_TW": "繁体"
        },
        "placeholder": {
          "zh_CN": "请输入链接地址",
          "en_US": "input address",
          "zh_TW": "繁体"
        }
      },
      {
        "columnName": "username",
        "columnType": "NORMAL",
        "text": {
          "zh_CN": "用户名",
          "en_US": "username",
          "zh_TW": "繁体"
        },
        "placeholder": {
          "zh_CN": "请输入用户名",
          "en_US": "input username",
          "zh_TW": "繁体"
        }
      },
      {
        "columnName": "password",
        "columnType": "ENCRYPT",
        "text": {
          "zh_CN": "密码",
          "en_US": "password",
          "zh_TW": "繁体"
        },
        "placeholder": {
          "zh_CN": "请输入密码",
          "en_US": "input password",
          "zh_TW": "繁体"
        }
      }
    ]

    上述JSON示例中字段说明如下:

    • columnName:参数名称。

    • columnType:参数类型分为NORMALENCRYPT。密码类的参数,参数类型需使用ENCRYPT。

    • placeholder:用户输入框中的默认值在三种语言情况下的输出。若不填写,则用户输入框为空。

    • text:参数名称在三种语言情况下的输出。

    资源配置

    读写插件

    选择需要配置的插件类型、填写对应的ClassName(插件类名)并上传步骤一中准备好的插件文件。插件文件仅支持.jar类型,文件大小不超过50MB

    说明

    读写插件需至少选择一项进行配置。完成读取插件写入插件的配置后,会生成对应的输入插件输出插件

    描述信息

    描述

    对自定义的数据源的简单描述。不超过128个字符。

  5. 单击创建