本文通过示例的方式为您介绍在Dataphin中如何使用自定义实时数据源进行实时开发。
操作步骤
步骤一:创建实时数据源
在Dataphin首页的顶部菜单栏中,选择管理中心 > 数据源管理。
在左侧导航栏中,选择数据源 > 自定义源类型。
在自定义源类型页面中,单击新建自定义源类型,选择新建实时自定义源类型,打开新建实时自定义源类型页面。
在新建实时自定义源类型页面,配置以下参数。
参数
模式
类型名称
填写Oracle_cdc_test。
类型编码
填写Oracle_cdc_test。
JAR包
上传数据源类型连接器的JAR包。本案例以社区版本的Oracle CDC Connector为例。JAR包下载,请参见Oracle | Apache Flink CDC。
配置文件
填写以下配置文件:
说明若使用其他JAR包,请进行对应修改。
#delclare the use type of this connector kind: source # the name of this connector # if you use vvp flink,you also need to follow this match pattern [a-z0-9]([-a-z0-9\.]*[a-z0-9])? connector: oracle-cdc # for example, hostname is one property of this connector, you can see it in with clause # when isSensitive is true, usually used for password, it is mean that the value of your key will be encrypted in page # defaultValue is the default value of this key # when isRequired is true, you must set value to the key, and the key in page is marked begin with start * # when module is datasource it is mean that you can set the config in datasource page # when module is table it is mean that you can set the config in table page hostname: isSensitive: false isRequired: true module: datasource port: isSensitive: false isRequired: true module: datasource username: isSensitive: false isRequired: true module: datasource password: isSensitive: true isRequired: true module: datasource database-name: isSensitive: false isRequired: true module: table schema-name: isSensitive: false isRequired: true module: table table-name: isSensitive: false isRequired: true module: table table-name: isSensitive: false isRequired: true defaultValue: latest-offset module: table #format define the input or sink format of the data, it always with module table
配置项说明:
配置文件的配置项参数,在系统执行时,最终会变成Flink DDL中的WITH参数。如下所示:
CREATE TABLE MyUserTable ( ID INT NOT NULL, NAME STRING, DESCRIPTION STRING, WEIGHT DECIMAL(10, 3), PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'oracle-cdc', 'hostname' = 'localhost', 'port' = '1521', 'username' = 'Dataphin', 'password' = 'fli*****ssword', 'database-name' = 'ORCLCDB', 'schema-name' = 'dataphin', 'table-name' = 'myusertable');
描述
填写自定义实时数据源测试。
单击确定,完成自定义实时数据源类型。
步骤二:自定义Ververica Flink Connector
若您的Dataphin租户使用开源Flink实时计算引擎,则无需进行此步骤操作。更多信息,请参见自定义Ververica Flink Connector说明。
登录实时计算控制台。
单击目标工作空间操作列下的控制台。
在左侧导航栏,单击连接器。
在连接器页面,单击创建自定义连接器。
上传自定义连接器JAR文件。
您可以通过以下任何一种方式上传自定义连接器JAR文件:
上传文件:单击选择文件后,选择您的目标连接器JAR文件。
外部URL:当需要使用其他服务上存在的JAR文件时,可以使用外部URL功能获取JAR文件。例如,
https://ossbucket/artifacts/namespaces/flink-default/flink-jobs-1.0-SNAPSHOT.jar
说明仅支持以下两类外部URL:
开通Flink工作空间时选择的OSS Bucket地址。您可以在实时计算管理控制台目标工作空间详情中查看绑定的OSS Bucket。
实时计算Flink版可以访问且被允许访问(公共读或被授予权限)的其他外部存储系统地址。
上传完成后,单击下一步。
系统会对您上传的自定义连接器内容进行解析。如果解析成功,您可以继续下一步。如果解析失败,请确认您上传的自定义连接器代码是否符合Flink社区标准。
单击完成。
创建完成的自定义连接器会出现在连接器列表中。
说明WITH参数中的连接器取值为您自定义连接器JAR包中DynamicTableFactory的identifier参数取值,其他WITH参数及含义详情由您开发的自定义连接器决定。
步骤三:创建自定义的实时数据源
完成实时数据源的创建后,Dataphin数据源管理将会生成自定义的实时数据源。您可以在数据源管理配置实时数据源的连接信息,将数据源接入Dataphin,以支持后续的元表创建。
在Dataphin首页的顶部菜单栏中,选择管理中心 > 数据源管理。
在数据源页面,单击+新建数据源,打开新建数据源对话框。
在新建数据源对话框的自定义数据源区域,选择Oracle_cdc_test。
在新建Oracle-cdc-test数据源对话框中,配置连接数据源参数。
自定义实时数据源的配置项由步骤一中的YAML配置文件决定,YAML配置文件的不同配置参数,会导致数据源界面出现不同的选项,并且选项的展示形式也有所不同。
参数
描述
数据源名称
填写Oracle-cdc-test。
数据源编码
填写Oracle_cdc_test。
数据源描述
填写自定义实时数据源接入测试。
数据源配置
选择生产数据源。
标签
默认为空。
hostname
填写Oracle服务的主机地址。
port
填写Oracle服务的端口。
username
填写Oracle服务的用户名。
password
填写Oracle服务的密码。
单击确定,完成自定义数据源Oracle-cdc-test的创建。
说明自定义Connector(实时数据源)不支持连通性检查。
步骤四:新建自定义数据源的元表
在数据源管理配置自定义数据源的连接信息后,您可以使用配置的数据源创建元表以支持后续的实时开发。
在Dataphin首页的顶部菜单栏中,选择研发 > 数据开发。
在顶部菜单栏选择项目(Dev-Prod 模式需要选择环境)。
在左侧导航栏中选择数据处理 > 表管理。
在右侧表管理列表中,单击新建图标,选择实时计算表,打开新建表对话框。
在新建表对话框,配置参数。
元表中的配置项由步骤一中的YAML配置文件决定,YAML配置文件的不同配置参数,会导致元表出现不同的选项,并且选项的展示形式也有不同。
参数
描述
表类型
选择元表。
元表名称
填写Oracle_cdc_test。
数据源
选择创建的Oracle-cdc-test。
database-name
填写数据库名称。
schema-name
填写schema。
table-name
填写数据表名称。
选择目录
默认实时计算表。
描述
填写自定义实时数据源的元表测试。
单击确定,即可完成元表的创建。
完成元表创建后,Dataphin会自动组装元表的Flink SQL。代码中的WITH参数的参数名是在YAML配置中定义的;具体的参数值,是在数据源页面和元表页面配置的;代码中的字段是在元表结构配置的。最终生成元表的Oracle_cdc_test代码如下:
create table oracle_cdc_test ( `id` INT comment '', `name` VARCHAR comment '', PRIMARY KEY(id) NOT ENFORCED ) with ( 'hostname'='47.***.***.217' ,'connector'='oracle-cdc' ,'port'='1511' ,'database-name'='dataphin' ,'schema-name'='dataphin' ,'table-name'='dataphin-tables' ,'username'='flink_demo' );