自定义实时数据源进行实时开发示例

本文通过示例的方式为您介绍在Dataphin中如何使用自定义实时数据源进行实时开发。

操作步骤

步骤一:创建实时数据源

  1. 在Dataphin首页的顶部菜单栏中,选择管理中心 > 数据源管理

  2. 在左侧导航栏中,选择数据源 > 自定义源类型

  3. 自定义源类型页面中,单击新建自定义源类型,选择新建实时自定义源类型,打开新建实时自定义源类型页面。

  4. 新建实时自定义源类型页面,配置以下参数。

    参数

    模式

    类型名称

    填写Oracle_cdc_test

    类型编码

    填写Oracle_cdc_test

    JAR包

    上传数据源类型连接器的JAR包。本案例以社区版本的Oracle CDC Connector为例。JAR包下载,请参见Oracle | Apache Flink CDC

    配置文件

    填写以下配置文件:

    说明

    若使用其他JAR包,请进行对应修改。

    #delclare the use type of this connector
    kind:
      source
    # the name of this connector
    # if you use vvp flink,you also need to follow this match pattern [a-z0-9]([-a-z0-9\.]*[a-z0-9])?
    connector: oracle-cdc
    # for example, hostname is one property of this connector, you can see it in with clause
    # when isSensitive is true, usually used for password, it is mean that the value of your key will be encrypted in page
    # defaultValue is the default value of this key
    # when isRequired is true, you must set value to the key, and the key in page is marked begin with start *
    # when module is datasource it is mean that you can set the config in datasource page
    # when module is table it is mean that you can set the config in table page
    hostname:
      isSensitive: false
      isRequired: true
      module: datasource
    port:
      isSensitive: false
      isRequired: true
      module: datasource
    username:
      isSensitive: false
      isRequired: true
      module: datasource
    password:
      isSensitive: true
      isRequired: true
      module: datasource
    database-name:
      isSensitive: false
      isRequired: true
      module: table
    schema-name:
      isSensitive: false
      isRequired: true
      module: table
    table-name:
      isSensitive: false
      isRequired: true
      module: table
    table-name:
      isSensitive: false
      isRequired: true
      defaultValue: latest-offset
      module: table
    #format define the input or sink format of the data, it always with module table

    配置项说明:

    配置文件的配置项参数,在系统执行时,最终会变成Flink DDL中的WITH参数。如下所示:

    CREATE TABLE MyUserTable (
      ID INT NOT NULL,
      NAME STRING,
      DESCRIPTION STRING,
      WEIGHT DECIMAL(10, 3),
      PRIMARY KEY(id) NOT ENFORCED
      ) WITH (
      'connector' = 'oracle-cdc',
      'hostname' = 'localhost',
      'port' = '1521',
      'username' = 'Dataphin',
      'password' = 'fli*****ssword',
      'database-name' = 'ORCLCDB',
      'schema-name' = 'dataphin',
      'table-name' = 'myusertable');

    描述

    填写自定义实时数据源测试

  5. 单击确定,完成自定义实时数据源类型。

步骤二:自定义Ververica Flink Connector

若您的Dataphin租户使用开源Flink实时计算引擎,则无需进行此步骤操作。更多信息,请参见自定义Ververica Flink Connector说明

  1. 登录实时计算控制台

  2. 单击目标工作空间操作列下的控制台

  3. 在左侧导航栏,单击连接器

  4. 连接器页面,单击创建自定义连接器

  5. 上传自定义连接器JAR文件。

    您可以通过以下任何一种方式上传自定义连接器JAR文件:

    • 上传文件:单击选择文件后,选择您的目标连接器JAR文件。

    • 外部URL:当需要使用其他服务上存在的JAR文件时,可以使用外部URL功能获取JAR文件。例如,https://ossbucket/artifacts/namespaces/flink-default/flink-jobs-1.0-SNAPSHOT.jar

      说明

      仅支持以下两类外部URL:

      • 开通Flink工作空间时选择的OSS Bucket地址。您可以在实时计算管理控制台目标工作空间详情中查看绑定的OSS Bucket。

      • 实时计算Flink版可以访问且被允许访问(公共读或被授予权限)的其他外部存储系统地址。

  6. 上传完成后,单击下一步

    系统会对您上传的自定义连接器内容进行解析。如果解析成功,您可以继续下一步。如果解析失败,请确认您上传的自定义连接器代码是否符合Flink社区标准。

  7. 单击完成

    创建完成的自定义连接器会出现在连接器列表中。

    说明

    WITH参数中的连接器取值为您自定义连接器JAR包中DynamicTableFactory的identifier参数取值,其他WITH参数及含义详情由您开发的自定义连接器决定。

步骤三:创建自定义的实时数据源

完成实时数据源的创建后,Dataphin数据源管理将会生成自定义的实时数据源。您可以在数据源管理配置实时数据源的连接信息,将数据源接入Dataphin,以支持后续的元表创建。

  1. 在Dataphin首页的顶部菜单栏中,选择管理中心 > 数据源管理

  2. 在数据源页面,单击+新建数据源,打开新建数据源对话框。

  3. 新建数据源对话框的自定义数据源区域,选择Oracle_cdc_test

  4. 新建Oracle-cdc-test数据源对话框中,配置连接数据源参数。

    自定义实时数据源的配置项由步骤一中的YAML配置文件决定,YAML配置文件的不同配置参数,会导致数据源界面出现不同的选项,并且选项的展示形式也有所不同。

    参数

    描述

    数据源名称

    填写Oracle-cdc-test

    数据源编码

    填写Oracle_cdc_test

    数据源描述

    填写自定义实时数据源接入测试

    数据源配置

    选择生产数据源

    标签

    默认为

    hostname

    填写Oracle服务的主机地址。

    port

    填写Oracle服务的端口。

    username

    填写Oracle服务的用户名。

    password

    填写Oracle服务的密码。

  5. 单击确定,完成自定义数据源Oracle-cdc-test的创建。

    说明

    自定义Connector(实时数据源)不支持连通性检查。

步骤四:新建自定义数据源的元表

在数据源管理配置自定义数据源的连接信息后,您可以使用配置的数据源创建元表以支持后续的实时开发。

  1. 在Dataphin首页的顶部菜单栏中,选择研发 > 数据开发

  2. 在顶部菜单栏选择项目(Dev-Prod 模式需要选择环境)。

  3. 在左侧导航栏中选择数据处理 > 表管理

  4. 在右侧表管理列表中,单击image新建图标,选择实时计算表,打开新建表对话框。

  5. 新建表对话框,配置参数。

    元表中的配置项由步骤一中的YAML配置文件决定,YAML配置文件的不同配置参数,会导致元表出现不同的选项,并且选项的展示形式也有不同。

    参数

    描述

    表类型

    选择元表

    元表名称

    填写Oracle_cdc_test

    数据源

    选择创建的Oracle-cdc-test

    database-name

    填写数据库名称

    schema-name

    填写schema

    table-name

    填写数据表名称

    选择目录

    默认实时计算表

    描述

    填写自定义实时数据源的元表测试

  6. 单击确定,即可完成元表的创建。

    完成元表创建后,Dataphin会自动组装元表的Flink SQL。代码中的WITH参数的参数名是在YAML配置中定义的;具体的参数值,是在数据源页面和元表页面配置的;代码中的字段是在元表结构配置的。最终生成元表的Oracle_cdc_test代码如下:

    create table oracle_cdc_test ( 
    	`id`	INT	comment '',
    	`name`	VARCHAR	comment '',
    	PRIMARY KEY(id) NOT ENFORCED 
    )
     with ( 
    'hostname'='47.***.***.217'
    ,'connector'='oracle-cdc'
    ,'port'='1511'
    ,'database-name'='dataphin'
    ,'schema-name'='dataphin'
    ,'table-name'='dataphin-tables'
    ,'username'='flink_demo'
    
    );