开发自定义组件示例

更新时间: 2023-07-11 17:44:09

本文通过示例的方式为您介绍自定义输入与输出组件的开发案例。

输入组件开发

  1. 通过以下示例代码构建Java工程,并打为JAR包。

    • Maven的依赖如下。

      <dependency>
        <groupId>com.alibaba.dt.pipeline</groupId>
        <artifactId>plugin.center.base</artifactId>
        <version>0.0.1-SNAPSHOT</version>
      </dependency>
    • 代码示例如下。

      package demo;
      
      import com.alibaba.dt.pipeline.plugin.center.base.Reader;
      import com.alibaba.dt.pipeline.plugin.center.base.RecordSender;
      import com.alibaba.dt.pipeline.plugin.center.conf.Configuration;
      import com.alibaba.dt.pipeline.plugin.center.element.*;
      import com.alibaba.dt.pipeline.plugin.center.record.Record;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      import java.util.List;
      import java.util.Random;
      import java.util.stream.Collectors;
      import java.util.stream.IntStream;
      
      /**
       * 用户入口类ReaderDemo
       * 该类必须继承自com.alibaba.dt.pipeline.plugin.center.base.Reader
       * 该类中暂时不需要定义方法,但是必须要定义两个public的静态子类:Job和Task,名字必须是Job和Task,大小写敏感,否则系统找不到类
       *
       * 系统会首先初始化Job类,调用init做初始化,再调用prepare做准备工作,然后调用split,把job的configuration拆分成用户指定的并发度个数的
       * configuration。然后执行task。当所有的task全部执行完毕,再执行post,最后再执行destroy。destroy和post方法的不同是,destroy总是
       * 会执行,哪怕出现了异常。
       *
       * 当系统调用Job的split方法得到很多configuration后,会用每一个configuration实例化一个Task,Task的方法依次执行的顺序是:init,
       * prepare,getInputRowMeta,startRead,post,destroy,同样的,destroy和post方法的不同是,destroy总是会执行,哪怕出现了异常。
       *
       */
      public class ReaderDemo extends Reader {
          // 这个是用户在自定义数据源中定义的key,这是一个demo数据源的key,用户自定义的话,这个key值应该不同
          public static final String DS_KEY = "demo_ds";
          // 代码内部定义的task编号,没有使用到可以不用
          public static final String TASK_INDEX = "taskIndex";
          // 用户在离线管道中配置该组件的一个参数
          public static final String USER_KEY = "user_param";
      
          /**
           * public的静态子类Job必须继承自Reader.Job
           */
          public static class Job extends Reader.Job {
              private static final Logger logger = LoggerFactory.getLogger(Job.class);
      
              Configuration jobConfig;
      
              @Override
              public void init() {
                  logger.info("job init");
                  //通过这个方法拿到用户的输入组件配置,这些参数就是用户在"输入组件"界面上配置的参数
                  this.jobConfig = super.getPluginJobConf();
                  String value = jobConfig.getString(USER_KEY, "default_value");
                  String ds = jobConfig.getString(DS_KEY, "default_ds");
                  logger.info("user_param:{} ds:{}", value, ds);
              }
      
              @Override
              public void prepare() {
                  super.prepare();
                  logger.info("job prepare");
              }
      
              @Override
              public List<Configuration> split(int i) {
                  logger.info("job split:{}", i);
                  return IntStream.range(0, i).boxed().map(x -> {
                      Configuration tmpConfiguration = jobConfig.clone();
                      // 写入configuration的编号
                      tmpConfiguration.set(TASK_INDEX, x);
                      return tmpConfiguration;
                  }).collect(Collectors.toList());
              }
      
              @Override
              public void post() {
                  super.post();
                  logger.info("job post");
              }
      
              @Override
              public void destroy() {
                  logger.info("job destroy");
              }
          }
      
          /**
           * public的静态子类Task必须继承自Reader.Task
           */
          public static class Task extends Reader.Task {
              private static final Logger logger = LoggerFactory.getLogger(Task.class);
      
              private Configuration taskConfig;
              private int index;
              private RowMeta rowMeta;
      
              @Override
              public void init() {
                  // 获取Job split出来的configuration
                  this.taskConfig = super.getPluginJobConf();
                  // 获取Task的编号
                  index = taskConfig.getInt(TASK_INDEX, -1);
                  logger.info("task init:{}", index);
              }
      
              @Override
              public void prepare() {
                  super.prepare();
                  logger.info("task prepare");
              }
      
              @Override
              public void startRead(RecordSender recordSender) {
                  logger.info("task start");
      
                  Random random = new Random();
      
                  // 读取数据,封装成Record,发送到系统内部
                  for(int i = 0; i < 10; i++) {
                      Record record = recordSender.createRecord();
                      // 只是3个列,这个列的类型需要和getInputRowMeta函数的meta对其,如果是真实数据源db,需要把读取到的数据转换成特定的column
                      record.addColumn(new LongColumn(i));
                      record.addColumn(new StringColumn("name_" + i));
                      record.addColumn(new DoubleColumn(random.nextDouble()));
                      recordSender.sendToWriter(record);
                      logger.info("read record:{}", i);
                  }
              }
      
              @Override
              public RowMeta getInputRowMeta(){
                  logger.info("task column meta");
                  rowMeta = new RowMeta();
      
                  /**
                   * 在这里定义输入组件读取到数据后,往下游写出的数据的schema,一般而言,用户可能需要连接到db,获取到真实数据源的schema
                   * 特别注意:这里的column名字必须和输入组件配置页面的column名字完全一样,顺序也要一样。比如:这里定义了id、name、score
                   * 那么,该组件的配置页面也必须配置上一样的列名:id、name、score
                   */
                  ColumnMeta columnMeta1 = new ColumnMeta();
                  columnMeta1.setName("id");
                  columnMeta1.setType(Column.Type.LONG);
                  rowMeta.addColumnMeta(columnMeta1);
      
                  ColumnMeta columnMeta2 = new ColumnMeta();
                  columnMeta2.setName("name");
                  columnMeta2.setType(Column.Type.STRING);
                  rowMeta.addColumnMeta(columnMeta2);
      
                  ColumnMeta columnMeta3 = new ColumnMeta();
                  columnMeta3.setName("score");
                  columnMeta3.setType(Column.Type.DOUBLE);
                  rowMeta.addColumnMeta(columnMeta3);
      
                  return rowMeta;
              }
      
              @Override
              public void post() {
                  super.post();
                  logger.info("task post");
              }
      
              @Override
              public void destroy() {
                  logger.info("task destroy");
      
              }
          }
      }
  2. 请参见新建离线自定义源类型,新建自定义数据源。

    说明

    通过配置自定义数据源类型,Dataphin将自动为您生成对应的数据源类型和组件。

    参数示例如下。

    参数

    示例

    基本配置

    类型

    其他数据库。

    名称

    demo_reader。

    类型编码

    组件的唯一标识。供后端使用,创建后不可编辑。

    数据源JSON

    数据源JSON示例如下:

    [
        {
            "columnName":"demo_ds",
            "columnType":"NORMAL",
            "text":{
                "zh_CN":"数据源",
                "en_US":"数据源",
                "zh_TW":"数据源"
            },
            "placeholder":{
                "zh_CN":"abc",
                "en_US":"abc",
                "zh_TW":"abc"
            }
        }
    ]

    上述JSON示例解释说明如下:

    • columnName:系统会生成demo_reader的数据源模板,该模板只包含一个参数demo_ds。

    • columnType:参数类型为NORMAL。密码类的,需使用ENCRYPT。

    • placeholder:用户输入框中的默认值在三种语言情况下均为abc。不填写,则用户输入框为空。

    • text:参数名称在三种语言情况下均显示为数据源。

    资源配置

    读写插件

    勾选读取插件

    • ClassName:插件的类名。该示例为demo.ReaderDemo。

    • 上传文件:上传打包完成的JAR文件。

    描述信息

    描述

    请输入组件的简单描述信息。不可超过128个字符。

  3. 单击创建

    当自定义组件创建成功后,数据源管理会自动生成一个DEMO_READER的数据源类型;且在离线管道的组件库中也将生成一个DEMO_READER输入组件。

输出组件开发

  1. 通过以下示例代码构建Java工程,并打为JAR包。

    • Maven的依赖如下。

      <dependency>
        <groupId>com.alibaba.dt.pipeline</groupId>
        <artifactId>plugin.center.base</artifactId>
        <version>0.0.1-SNAPSHOT</version>
      </dependency>
    • 代码示例如下。

      package demo;
      
      import com.alibaba.dt.pipeline.plugin.center.base.RecordReceiver;
      import com.alibaba.dt.pipeline.plugin.center.base.Writer;
      import com.alibaba.dt.pipeline.plugin.center.conf.Configuration;
      import com.alibaba.dt.pipeline.plugin.center.record.Record;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      import java.util.List;
      import java.util.stream.Collectors;
      import java.util.stream.IntStream;
      
      /**
       * 用户入口类WriterDemo
       * 该类必须继承自com.alibaba.dt.pipeline.plugin.center.base.Writer
       * 该类中暂时不需要定义方法,但是必须要定义两个public的静态子类:Job和Task,名字必须是Job和Task,大小写敏感,否则系统找不到类
       *
       * 系统会首先初始化Job类,调用init做初始化,再调用prepare做准备工作,然后调用split,把job的configuration拆分成用户指定的并发度个数的
       * configuration。然后执行task。当所有的task全部执行完毕,再执行post,最后再执行destroy。destroy和post方法的不同是,destroy总是
       * 会执行,哪怕出现了异常。
       *
       * 当系统调用Job的split方法得到很多configuration后,会用每一个configuration实例化一个Task,Task的方法依次执行的顺序是:init,
       * prepare,startWrite,post,destroy,同样的,destroy和post方法的不同是,destroy总是会执行,哪怕出现了异常。
       *
       */
      public class WriterDemo extends Writer {
          // 这个是用户在自定义数据源中定义的key,这是一个demo数据源的key,用户自定义的话,这个key值应该不同
          public static final String DS_KEY = "demo_ds";
          // 代码内部定义的task编号,没有使用到可以不用
          public static final String TASK_INDEX = "taskIndex";
          // 用户在离线管道中配置该组件的一个参数
          public static final String USER_KEY = "user_param";
      
          /**
           * public的静态子类Job必须继承自Writer.Job
           */
          public static class Job extends Writer.Job {
              private static final Logger logger = LoggerFactory.getLogger(Job.class);
      
              Configuration jobConfig;
      
              @Override
              public void init() {
                  logger.info("job init");
                  //通过这个方法拿到用户的输出组件配置,这些参数就是用户在输出组件界面上配置的参数
                  this.jobConfig = super.getPluginJobConf();
                  String value = jobConfig.getString(USER_KEY, "default_value");
                  String ds = jobConfig.getString(DS_KEY, "default_ds");
                  logger.info("user_param:{} ds:{}", value, ds);
              }
      
              @Override
              public void prepare() {
                  super.prepare();
                  logger.info("job prepare");
              }
      
              @Override
              public List<Configuration> split(int i) {
                  logger.info("job split:{}", i);
                  return IntStream.range(0, i).boxed().map(x -> {
                      Configuration tmpConfiguration = jobConfig.clone();
                      // 写入configuration的编号
                      tmpConfiguration.set(TASK_INDEX, x);
                      return tmpConfiguration;
                  }).collect(Collectors.toList());
              }
      
              @Override
              public void post() {
                  super.post();
                  logger.info("job post");
              }
      
              @Override
              public void destroy() {
                  logger.info("job destroy");
              }
          }
      
          /**
           * public的静态子类Task必须继承自Writer.Task
           */
          public static class Task extends Writer.Task {
              private static final Logger logger = LoggerFactory.getLogger(Task.class);
      
              private Configuration taskConfig;
              private int index;
      
              @Override
              public void init() {
                  // 获取Job split出来的configuration
                  this.taskConfig = super.getPluginJobConf();
                  // 获取Task的编号
                  index = taskConfig.getInt(TASK_INDEX, -1);
                  logger.info("task init:{}", index);
              }
      
              @Override
              public void prepare() {
                  super.prepare();
                  logger.info("task prepare");
              }
      
              @Override
              public void startWrite(RecordReceiver recordReceiver) {
                  logger.info("task start");
      
                  Record record;
                  while ((record = recordReceiver.getFromReader()) != null) {
                      logger.info("======: " + record.toString());
                  }
              }
      
              @Override
              public void post() {
                  super.post();
                  logger.info("task post");
              }
      
              @Override
              public void destroy() {
                  logger.info("task destroy");
      
              }
          }
      }
                                  
  2. 请参见新建离线自定义源类型,新建自定义组件。

    说明

    通过配置自定义数据源类型,Dataphin将自动为您生成对应的数据源类型和组件。

    参数示例如下。

    参数

    示例

    基本配置

    类型

    其他数据库。

    名称

    demo_reader。

    类型编码

    组件的唯一标识。供后端使用,创建后不可编辑。

    数据源JSON

    数据源JSON示例如下:

    [
        {
            "columnName":"demo_ds",
            "columnType":"NORMAL",
            "text":{
                "zh_CN":"数据源",
                "en_US":"数据源",
                "zh_TW":"数据源"
            },
            "placeholder":{
                "zh_CN":"abc",
                "en_US":"abc",
                "zh_TW":"abc"
            }
        }
    ]

    上述JSON示例解释说明如下:

    • columnName:系统会生成demo_reader的数据源模板,该模板只包含一个参数demo_ds。

    • columnType:参数类型为NORMAL。密码类的,需使用ENCRYPT。

    • placeholder:用户输入框中的默认值在三种语言情况下均为abc。不填写,则用户输入框为空。

    • text:参数名称在三种语言情况下均显示为数据源。

    资源配置

    读写插件

    勾选写入插件

    • ClassName:插件的类名。该示例为demo.ReaderDemo。

    • 上传文件:上传打包完成的JAR文件。

    描述信息

    描述

    请输入组件的简单描述信息。不可超过128个字符。

  3. 单击创建

    当自定义组件创建成功后,数据源管理会自动生成一个DEMO_READER的数据源类型;且在离线管道的组件库中也将生成一个DEMO_READER输出组件。

阿里云首页 智能数据建设与治理 Dataphin 相关技术圈