Learn how to use custom real-time data sources for real-time development in Dataphin.
Procedure
Step 1: Create a custom real-time data source type
- On the Dataphin home page, in the top menu bar, select Management Center > Datasource Management.
- In the left-side navigation pane, select Datasource > Custom Source Type.
- On the Custom Source Type page, click Create Custom Source Type and select Create Real-time Custom Source Type to open the Create Real-time Custom Source Type page.
- On the Create Real-time Custom Source Type page, configure the following parameters:
  - Type Name: Enter Oracle_cdc_test.
  - Type Encoding: Enter Oracle_cdc_test.
  - JAR Package: Upload the connector JAR package. This example uses the Community Edition of the Oracle CDC connector. To download the JAR package, see Oracle | Apache Flink CDC.
  - Configuration File: Enter the following configuration file.
    Note: If you use a different JAR package, modify the configuration accordingly.
# declare the use type of this connector
kind: source
# the name of this connector
# if you use VVP Flink, the name must also match the pattern [a-z0-9]([-a-z0-9\.]*[a-z0-9])?
connector: oracle-cdc
# each of the following keys (for example, hostname) is a property of this connector and appears in the WITH clause
# isSensitive: true is usually used for passwords; the value of the key is encrypted on the page
# defaultValue: the default value of the key
# isRequired: true means you must set a value for the key; the key is marked with an asterisk (*) on the page
# module: datasource means the key is set on the data source page
# module: table means the key is set on the metatable page
hostname:
  isSensitive: false
  isRequired: true
  module: datasource
port:
  isSensitive: false
  isRequired: true
  module: datasource
username:
  isSensitive: false
  isRequired: true
  module: datasource
password:
  isSensitive: true
  isRequired: true
  module: datasource
database-name:
  isSensitive: false
  isRequired: true
  module: table
schema-name:
  isSensitive: false
  isRequired: true
  module: table
table-name:
  isSensitive: false
  isRequired: true
  module: table
scan.startup.mode:
  isSensitive: false
  isRequired: true
  defaultValue: latest-offset
  module: table
# format defines the input or sink format of the data; it is always used with module: table
Configuration item description:
The parameters in the configuration file become the WITH parameters of the Flink DDL at runtime, for example:
CREATE TABLE MyUserTable (
  ID INT NOT NULL,
  NAME STRING,
  DESCRIPTION STRING,
  WEIGHT DECIMAL(10, 3),
  PRIMARY KEY(ID) NOT ENFORCED
) WITH (
  'connector' = 'oracle-cdc',
  'hostname' = 'localhost',
  'port' = '1521',
  'username' = 'Dataphin',
  'password' = 'fli*****ssword',
  'database-name' = 'ORCLCDB',
  'schema-name' = 'dataphin',
  'table-name' = 'myusertable'
);
  - Description: Enter Custom Real-Time Data Source Test.
- Click Confirm to finish creating the custom real-time data source type.
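The `module` and `isRequired` flags in the configuration file decide which page shows each key and whether it is mandatory. The following Python sketch illustrates that mapping under stated assumptions: `SPEC` is a hand-written dictionary that mirrors the data source and table entries of the configuration file (the entry with a `defaultValue` is left out for brevity, and no YAML parser is used), and `page_fields`, `properties`, and `RESERVED` are hypothetical names, not part of Dataphin.

```python
# Minimal sketch: decide which page shows each key of the Step 1 configuration file.
# SPEC is a hypothetical, hand-written Python mirror of the YAML entries.
SPEC = {
    "kind": "source",
    "connector": "oracle-cdc",
    "hostname": {"isSensitive": False, "isRequired": True, "module": "datasource"},
    "port": {"isSensitive": False, "isRequired": True, "module": "datasource"},
    "username": {"isSensitive": False, "isRequired": True, "module": "datasource"},
    "password": {"isSensitive": True, "isRequired": True, "module": "datasource"},
    "database-name": {"isSensitive": False, "isRequired": True, "module": "table"},
    "schema-name": {"isSensitive": False, "isRequired": True, "module": "table"},
    "table-name": {"isSensitive": False, "isRequired": True, "module": "table"},
}

# Top-level keys that describe the connector itself rather than a WITH property.
RESERVED = {"kind", "connector"}


def properties(spec):
    """Yield (key, options) for every connector property in the spec."""
    for key, opts in spec.items():
        if key not in RESERVED and isinstance(opts, dict):
            yield key, opts


def page_fields(spec, module):
    """Return the field labels shown for one module; required keys get a '*' prefix."""
    labels = []
    for key, opts in properties(spec):
        if opts.get("module") == module:
            labels.append(("*" if opts.get("isRequired") else "") + key)
    return labels


if __name__ == "__main__":
    print("Datasource page:", page_fields(SPEC, "datasource"))
    print("Metatable page:", page_fields(SPEC, "table"))
```

Run as a script, it prints `['*hostname', '*port', '*username', '*password']` for the data source page and `['*database-name', '*schema-name', '*table-name']` for the metatable page, which correspond to the connection parameters in Step 3 and the table parameters in Step 4.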
Step 2: Customize Ververica Flink Connector
If your Dataphin tenant uses the open-source Flink real-time compute engine, skip this step. For more information, see the documentation on customizing Ververica Flink connectors.
- Log on to the Real-Time Compute Console.
- In the Actions column of the target workspace, click Console.
- In the left-side navigation pane, click Connector.
- On the Connector page, click Create Custom Connector.
- Upload the JAR file of the custom connector using one of the following methods:
  - Upload File: Click Select File and select the target connector JAR file.
  - External URL: To use a JAR file hosted on another service, specify its external URL, for example:
    https://ossbucket/artifacts/namespaces/flink-default/flink-jobs-1.0-SNAPSHOT.jar
    Note: Only the following two types of external URLs are supported:
    - The address of the OSS bucket that was selected when the Flink workspace was created. You can view the attached OSS bucket in the details of the target workspace in the Real-Time Compute Management Console.
    - The address of any other external storage system that Real-Time Compute Flink Edition can reach and has permission to read (public-read or explicitly granted access).
- After the upload is complete, click Next.
The system parses the uploaded JAR file. If parsing succeeds, proceed to the next step. If parsing fails, verify that your custom connector code complies with Apache Flink community standards.
- Click Finish.
The custom connector appears in the connector list.
Note: The value of the connector parameter in the WITH clause is the identifier of the DynamicTableFactory in the custom connector JAR file (the value returned by its factoryIdentifier() method). The other WITH parameters and their meanings depend on the custom connector that you create.
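Because the `connector` value must match in two places (the Step 1 configuration file and the identifier of the connector's `DynamicTableFactory`, which Flink's Java API exposes through `factoryIdentifier()`), a quick consistency check before uploading can save a failed deployment. The Python sketch below assumes you already know both strings; `check_connector` is a hypothetical helper, and the regular expression is the VVP naming pattern quoted in the comments of the configuration file.

```python
import re

# VVP naming pattern quoted in the Step 1 configuration file comments.
# fullmatch() is used so that the whole name, not just a prefix, must conform.
VVP_CONNECTOR_NAME = re.compile(r"[a-z0-9]([-a-z0-9\.]*[a-z0-9])?")


def check_connector(config_connector, factory_identifier):
    """Return a list of problems; an empty list means the names look consistent.

    config_connector:   the `connector` value from the Dataphin configuration file
    factory_identifier: the identifier reported by the connector's factory
                        (hypothetical input; in Java it comes from factoryIdentifier())
    """
    problems = []
    if not VVP_CONNECTOR_NAME.fullmatch(config_connector):
        problems.append(f"{config_connector!r} does not match the VVP naming pattern")
    if config_connector != factory_identifier:
        problems.append(
            f"connector {config_connector!r} != factory identifier {factory_identifier!r}"
        )
    return problems


if __name__ == "__main__":
    print(check_connector("oracle-cdc", "oracle-cdc"))  # []
    print(check_connector("Oracle_CDC", "oracle-cdc"))  # two problems
```

Matching the whole string with `fullmatch` matters here: with `match`, a name such as `a-` would pass on its first character even though it ends with a hyphen.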
Step 3: Create a custom real-time data source
After you create the custom real-time data source type, it is available as a custom data source type in Datasource Management. Configure the connection information of a data source of this type to connect it to Dataphin so that you can create metatables on it.
- On the Dataphin home page, in the top menu bar, select Management Center > Datasource Management.
- On the data source page, click + Create Datasource to open the Create Datasource dialog box.
- In the Create Datasource dialog box, select Oracle_cdc_test in the Custom Datasource area.
- In the Create Oracle-cdc-test Datasource dialog box, configure the data source connection parameters.
The configuration items in this dialog box are determined by the YAML configuration file from Step 1: each key whose module is datasource appears here as a parameter.
  - Datasource Name: Enter Oracle-cdc-test.
  - Datasource Code: Enter Oracle_cdc_test.
  - Datasource Description: Enter Custom Real-Time Data Source Access Test.
  - Datasource Config: Select Production Datasource.
  - Tag: Empty by default.
  - hostname: Enter the host address of the Oracle service.
  - port: Enter the port of the Oracle service.
  - username: Enter the username of the Oracle service.
  - password: Enter the password of the Oracle service.
- Click Confirm to create the custom data source Oracle-cdc-test.
Note: Custom connectors (real-time data sources) do not support connectivity tests.
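The `isRequired` and `isSensitive` flags from Step 1 govern the fields in this dialog box, and since the connection cannot be tested, checking that every required field is filled is a cheap sanity step before clicking Confirm. The Python sketch below assumes the form values arrive as a plain dictionary; `missing_required`, `display_value`, and `DATASOURCE_SPEC` are hypothetical names, and masking with asterisks only stands in for the page-level encryption that the configuration file mentions, it does not reproduce it.

```python
# Minimal sketch (hypothetical helpers): apply the isRequired / isSensitive flags
# from the configuration file to the values entered in the data source dialog box.
DATASOURCE_SPEC = {
    "hostname": {"isSensitive": False, "isRequired": True, "module": "datasource"},
    "port": {"isSensitive": False, "isRequired": True, "module": "datasource"},
    "username": {"isSensitive": False, "isRequired": True, "module": "datasource"},
    "password": {"isSensitive": True, "isRequired": True, "module": "datasource"},
}


def missing_required(spec, module, values):
    """Return required keys of `module` that have neither a value nor a defaultValue."""
    missing = []
    for key, opts in spec.items():
        if opts.get("module") != module or not opts.get("isRequired"):
            continue
        value = values.get(key)
        if (value is None or str(value).strip() == "") and "defaultValue" not in opts:
            missing.append(key)
    return missing


def display_value(spec, key, value):
    """Mask sensitive values for display only; this is not Dataphin's encryption."""
    return "********" if spec.get(key, {}).get("isSensitive") else value


if __name__ == "__main__":
    form = {"hostname": "db.example.com", "port": "1521", "username": "flink_demo", "password": ""}
    print(missing_required(DATASOURCE_SPEC, "datasource", form))  # ['password']
    print(display_value(DATASOURCE_SPEC, "password", "secret"))   # ********
```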
Step 4: Create a metatable for the custom data source
After configuring the custom data source connection, create a metatable to support real-time development.
- On the Dataphin home page, in the top menu bar, select Development > Data Development.
- In the top menu bar, select a project. In Dev-Prod mode, also select an environment.
- In the left-side navigation pane, select Data Processing > Tables.
- In the table management list on the right, click the New icon and select Real-time Compute Table to open the Create Table dialog box.
- In the Create Table dialog box, configure the parameters.
The metatable configuration items are determined by the YAML configuration file from Step 1: each key whose module is table appears here as a parameter.
  - Table Type: Select Metatable.
  - Metatable Name: Enter Oracle_cdc_test.
  - Data Source: Select the data source Oracle-cdc-test that you created.
  - database-name: Enter the database name.
  - schema-name: Enter the schema name.
  - table-name: Enter the table name.
  - Select Directory: Real-time Compute Table by default.
  - Description: Enter Custom Real-Time Data Source Metatable Test.
- Click Confirm to create the metatable.
After the metatable is created, Dataphin automatically assembles the Flink SQL: the WITH parameter names come from the YAML configuration file, their values come from the data source and metatable pages, and the fields come from the metatable structure. The generated code for Oracle_cdc_test is as follows:
create table oracle_cdc_test (
  `id` INT comment '',
  `name` VARCHAR comment '',
  PRIMARY KEY(id) NOT ENFORCED
) with (
  'hostname'='47.***.***.217'
  ,'connector'='oracle-cdc'
  ,'port'='1511'
  ,'database-name'='dataphin'
  ,'schema-name'='dataphin'
  ,'table-name'='dataphin-tables'
  ,'username'='flink_demo'
);
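The assembly rule described above (names from the configuration file, values from the two pages, columns from the metatable) can be sketched in Python. This is a hypothetical helper, not Dataphin's actual generator: it assumes the data source page values and metatable page values are available as dictionaries, takes the columns as (name, type) pairs, falls back to `defaultValue` when a value is empty, omits keys that have neither, and doubles single quotes so each value stays a valid SQL string literal.

```python
def build_ddl(table, columns, primary_key, connector, spec, ds_values, table_values):
    """Assemble a Flink CREATE TABLE statement for a metatable (hypothetical helper).

    columns:      list of (name, type) pairs taken from the metatable structure
    spec:         property entries of the configuration file (key -> options dict)
    ds_values:    values entered on the data source page
    table_values: values entered on the metatable page
    """
    col_lines = [f"  `{name}` {ctype} comment ''" for name, ctype in columns]
    col_lines.append(f"  PRIMARY KEY({primary_key}) NOT ENFORCED")

    with_opts = {"connector": connector}
    for key, opts in spec.items():
        # The module flag decides which page supplies the value.
        source = ds_values if opts.get("module") == "datasource" else table_values
        value = source.get(key)
        if value in (None, ""):
            value = opts.get("defaultValue")  # fall back to defaultValue, if any
        if value is not None:
            with_opts[key] = str(value)

    # Double any single quotes so each value remains a valid SQL string literal.
    with_lines = []
    for k, v in with_opts.items():
        escaped = v.replace("'", "''")
        with_lines.append(f"  '{k}'='{escaped}'")
    return (
        f"create table {table} (\n"
        + ",\n".join(col_lines)
        + "\n) with (\n"
        + ",\n".join(with_lines)
        + "\n);"
    )


if __name__ == "__main__":
    spec = {
        "hostname": {"module": "datasource"},
        "port": {"module": "datasource"},
        "username": {"module": "datasource"},
        "database-name": {"module": "table"},
        "schema-name": {"module": "table"},
        "table-name": {"module": "table"},
    }
    print(build_ddl(
        "oracle_cdc_test",
        [("id", "INT"), ("name", "VARCHAR")],
        "id",
        "oracle-cdc",
        spec,
        {"hostname": "127.0.0.1", "port": "1521", "username": "flink_demo"},
        {"database-name": "dataphin", "schema-name": "dataphin", "table-name": "dataphin-tables"},
    ))
```

The `__main__` block produces a statement with the same shape as the generated code above, using placeholder connection values. The option order differs, which does not matter in a WITH clause because it is a set of key-value pairs.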