Example of Real-Time Development by Customizing Real-Time Data Sources

Learn how to use custom real-time data sources for real-time development in Dataphin.

Procedure

Step 1: Create a real-time data source

  1. On the Dataphin home page, navigate to the top menu bar and select Management Center > Datasource Management.

  2. In the left-side navigation pane, select Datasource > Custom Source Type.

  3. On the Custom Source Type page, click Create Custom Source Type, select Create Real-time Custom Source Type, and open the Create Real-time Custom Source Type page.

  4. On the Create Real-time Custom Source Type page, configure the following parameters:

    Parameter

    Description

    Type Name

    Enter Oracle_cdc_test.

    Type Encoding

    Enter Oracle_cdc_test.

    JAR Package

    Upload the connector JAR package. This example uses the Community Edition of the Oracle CDC Connector. To download the JAR package, see Oracle | Apache Flink CDC.

    Configuration File

    Enter the following configuration file:

    Note

    If you use a different JAR package, modify the configuration accordingly.

    # Declare the usage type of this connector.
    kind:
      source
    # The name of this connector.
    # If you use VVP Flink, the name must also match the pattern [a-z0-9]([-a-z0-9\.]*[a-z0-9])?
    connector: oracle-cdc
    # Each key below (for example, hostname) is a property of this connector and appears in the WITH clause.
    # isSensitive: when true (typically for passwords), the value of the key is encrypted on the page.
    # defaultValue: the default value of the key.
    # isRequired: when true, you must set a value for the key, and the key is marked with a leading asterisk (*) on the page.
    # module: datasource means that the key is configured on the data source page.
    # module: table means that the key is configured on the table page.
    hostname:
      isSensitive: false
      isRequired: true
      module: datasource
    port:
      isSensitive: false
      isRequired: true
      module: datasource
    username:
      isSensitive: false
      isRequired: true
      module: datasource
    password:
      isSensitive: true
      isRequired: true
      module: datasource
    database-name:
      isSensitive: false
      isRequired: true
      module: table
    schema-name:
      isSensitive: false
      isRequired: true
      module: table
    table-name:
      isSensitive: false
      isRequired: true
      module: table
    scan.startup.mode:
      isSensitive: false
      isRequired: true
      defaultValue: latest-offset
      module: table
    # format defines the input or output format of the data; it always uses module: table.

    Configuration Item Description:

    The configuration file parameters become the WITH parameters in Flink DDL at runtime:

    CREATE TABLE MyUserTable (
      ID INT NOT NULL,
      NAME STRING,
      DESCRIPTION STRING,
      WEIGHT DECIMAL(10, 3),
      PRIMARY KEY (ID) NOT ENFORCED
    ) WITH (
      'connector' = 'oracle-cdc',
      'hostname' = 'localhost',
      'port' = '1521',
      'username' = 'Dataphin',
      'password' = 'fli*****ssword',
      'database-name' = 'ORCLCDB',
      'schema-name' = 'dataphin',
      'table-name' = 'myusertable'
    );

    Description

    Enter Custom Real-Time Data Source Test.

  5. Click Confirm to finish creating the custom real-time data source type.
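The flags in the configuration file can be read as a small schema: each property is required or optional, may carry a default value, and may be sensitive. The Python sketch below illustrates that reading only; it is not Dataphin's implementation. `CONFIG` mirrors a subset of the YAML as a dict, `example-mode` is a made-up property used to show `defaultValue`, `resolve_options` and `mask_sensitive` are hypothetical helpers, and masking stands in for whatever encryption the page actually applies.

```python
# Hypothetical illustration; not Dataphin's actual implementation.
# CONFIG mirrors a subset of the YAML configuration file as a Python dict.
# "example-mode" is an invented property that demonstrates defaultValue.
CONFIG = {
    "hostname": {"isSensitive": False, "isRequired": True, "module": "datasource"},
    "port": {"isSensitive": False, "isRequired": True, "module": "datasource"},
    "password": {"isSensitive": True, "isRequired": True, "module": "datasource"},
    "table-name": {"isSensitive": False, "isRequired": True, "module": "table"},
    "example-mode": {"isSensitive": False, "isRequired": False,
                     "defaultValue": "latest-offset", "module": "table"},
}


def resolve_options(config, user_values):
    """Return WITH-clause options, applying defaults and checking required keys."""
    options = {}
    for key, spec in config.items():
        value = user_values.get(key, spec.get("defaultValue"))
        if value is None:
            if spec.get("isRequired"):
                raise ValueError(f"missing required property: {key}")
            continue  # an optional property without a value is omitted
        options[key] = str(value)
    return options


def mask_sensitive(config, options):
    """Return a display copy in which sensitive values are hidden."""
    return {
        key: "******" if config[key].get("isSensitive") else value
        for key, value in options.items()
    }


values = {"hostname": "localhost", "port": 1521,
          "password": "secret", "table-name": "myusertable"}
options = resolve_options(CONFIG, values)
print(mask_sensitive(CONFIG, options))
```

Leaving out a required key such as `hostname` raises a `ValueError` in this sketch, which corresponds to the asterisk-marked fields that must be filled in on the page.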

Step 2: Customize Ververica Flink Connector

If your Dataphin tenant uses the open-source Flink real-time computing engine, this step is not required. For more information, see the description of customizing Ververica Flink Connector.

  1. Log on to the Real-Time Compute Console.

  2. In the Actions column of the target workspace, click Console.

  3. In the left-side navigation pane, click Connector.

  4. On the Connector page, click Create Custom Connector.

  5. Upload the JAR file of the custom connector.

    Upload the custom connector JAR file using one of the following methods:

    • Upload File: Click Select File and then select your target connector JAR file.

    • External URL: To use a JAR file hosted on another service, specify its external URL. For example, https://ossbucket/artifacts/namespaces/flink-default/flink-jobs-1.0-SNAPSHOT.jar

      Note

      Only the following two types of external URLs are supported:

      • The OSS Bucket address selected when the Flink workspace is created. You can view the attached OSS Bucket in the details of the target workspace in the Real-Time Compute Management Console.

      • Addresses of other external storage systems that the Real-Time Compute Flink Edition has permission to access (public-read, or with access explicitly granted).

  6. After the upload is complete, click Next.

    The system parses the uploaded JAR file. If parsing succeeds, proceed to the next step. If parsing fails, verify that your custom connector code complies with Apache Flink community standards.

  7. Click Finish.

    The custom connector appears in the connector list.

    Note

    The value of the connector parameter in the WITH clause is the value of the identifier parameter of DynamicTableFactory in the JAR file of the custom connector. Other parameters in the WITH clause and their definitions vary based on the custom connector that you create.
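The comments in the Step 1 configuration file state that, on VVP Flink, the connector name must match the pattern `[a-z0-9]([-a-z0-9\.]*[a-z0-9])?`. A candidate name can be checked locally before uploading the JAR, as in the following Python sketch. The helper name `is_valid_connector_name` is hypothetical, and treating the pattern as a whole-string match is an assumption.

```python
import re

# Pattern quoted in the configuration file comments for VVP Flink connector names.
NAME_PATTERN = re.compile(r"[a-z0-9]([-a-z0-9\.]*[a-z0-9])?")


def is_valid_connector_name(name: str) -> bool:
    """Return True if the whole name matches the pattern (assumed full match)."""
    return NAME_PATTERN.fullmatch(name) is not None


for candidate in ["oracle-cdc", "Oracle_cdc_test", "oracle-cdc-"]:
    print(candidate, is_valid_connector_name(candidate))
```

Under this reading, `oracle-cdc` passes, while names containing uppercase letters or underscores, or ending with a hyphen, do not.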

Step 3: Create a custom real-time data source

After you create the real-time data source type, a custom data source appears in data source management. Configure its connection information to integrate it into Dataphin for metatable creation.

  1. In the top menu bar on the Dataphin home page, select Management Center > Datasource Management.

  2. On the data source page, click +Create Datasource to open the Create Datasource dialog box.

  3. In the Create Datasource dialog box, in the Custom Datasource area, select Oracle_cdc_test.

  4. In the Create Oracle-cdc-test Datasource dialog box, configure the data source connection parameters.

    The data source configuration items are determined by the YAML configuration file in Step 1. Different parameters in the file produce different options on the data source page.

    | Parameter | Description |
    | --- | --- |
    | Datasource Name | Enter Oracle-cdc-test. |
    | Datasource Code | Enter Oracle_cdc_test. |
    | Datasource Description | Enter Custom Real-Time Data Source Access Test. |
    | Datasource Config | Select Production Datasource. |
    | Tag | Empty by default. |
    | hostname | Enter the host address of the Oracle service. |
    | port | Enter the port of the Oracle service. |
    | username | Enter the username of the Oracle service. |
    | password | Enter the password of the Oracle service. |

  5. Click Confirm to complete the creation of the custom data source Oracle-cdc-test.

    Note

    Custom connectors (real-time data sources) do not support connectivity checks.
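The way the `module` value decides which page a property appears on can be pictured with a short Python sketch. It is an illustration under assumptions, not Dataphin's code: `CONFIG` copies a few entries from the Step 1 YAML, and `form_fields` is a hypothetical helper that lists the fields for one page, prefixing required keys with an asterisk.

```python
# Hypothetical illustration; not Dataphin's actual implementation.
CONFIG = {
    "hostname": {"isSensitive": False, "isRequired": True, "module": "datasource"},
    "password": {"isSensitive": True, "isRequired": True, "module": "datasource"},
    "database-name": {"isSensitive": False, "isRequired": True, "module": "table"},
    "table-name": {"isSensitive": False, "isRequired": True, "module": "table"},
}


def form_fields(config, module):
    """List the labels one page would show, with '*' marking required keys."""
    fields = []
    for key, spec in config.items():
        if spec["module"] != module:
            continue  # this property belongs to the other page
        label = f"* {key}" if spec.get("isRequired") else key
        fields.append(label)
    return fields


print("data source page:", form_fields(CONFIG, "datasource"))
print("metatable page:", form_fields(CONFIG, "table"))
```

Changing a property's `module` from `datasource` to `table` in the YAML would, by the same logic, move its input field from the data source dialog box to the metatable dialog box.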

Step 4: Create a metatable for the custom data source

After configuring the custom data source connection, create a metatable to support real-time development.

  1. In the top menu bar on the Dataphin home page, select Development > Data Development.

  2. In the top menu bar, select a project. In Dev-Prod mode, you must also select an environment.

  3. In the left-side navigation pane, select Data Processing > Tables.

  4. In the table management list on the right, click the new icon, select Real-time Compute Table, and open the Create Table dialog box.

  5. In the Create Table dialog box, configure the parameters.

    The metatable configuration items are determined by the YAML configuration file in Step 1. Different parameters in the file produce different options in the metatable.

    | Parameter | Description |
    | --- | --- |
    | Table Type | Select Metatable. |
    | Metatable Name | Enter Oracle_cdc_test. |
    | Data Source | Select the created Oracle-cdc-test. |
    | database-name | Enter Database Name. |
    | schema-name | Enter Schema. |
    | table-name | Enter Table Name. |
    | Select Directory | Default is Real-time Compute Table. |
    | Description | Enter Custom Real-Time Data Source Metatable Test. |

  6. Click Confirm to complete the creation of the metatable.

    After the metatable is created, Dataphin automatically assembles the Flink SQL. The WITH clause parameter names come from the YAML configuration, their values from the data source and metatable pages, and the fields from the metatable structure. The generated code for Oracle_cdc_test is as follows:

    create table oracle_cdc_test ( 
    	`id`	INT	comment '',
    	`name`	VARCHAR	comment '',
    	PRIMARY KEY(id) NOT ENFORCED 
    )
     with ( 
    'hostname'='47.***.***.217'
    ,'connector'='oracle-cdc'
    ,'port'='1511'
    ,'database-name'='dataphin'
    ,'schema-name'='dataphin'
    ,'table-name'='dataphin-tables'
    ,'username'='flink_demo'
    
    );
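The assembly described above (parameter names from the YAML, values from the data source and metatable pages, fields from the metatable structure) can be sketched in Python as follows. This is only a rough model of the behavior, not Dataphin's generator: `build_ddl` and `quote` are hypothetical names, the sample values are placeholders, and the output layout differs from the generated code shown above.

```python
# Hypothetical illustration; not Dataphin's actual implementation.
def quote(value: str) -> str:
    """Quote a value as a SQL string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_ddl(table, columns, primary_key, connector, datasource_values, table_values):
    """Assemble a CREATE TABLE statement from the fields and the two value sets."""
    column_lines = [f"  `{name}` {sql_type}" for name, sql_type in columns]
    column_lines.append(f"  PRIMARY KEY (`{primary_key}`) NOT ENFORCED")
    # The connector name comes from the YAML; other values come from the two pages.
    options = {"connector": connector, **datasource_values, **table_values}
    option_lines = [f"  {quote(k)} = {quote(v)}" for k, v in options.items()]
    return (
        f"CREATE TABLE {table} (\n"
        + ",\n".join(column_lines)
        + "\n) WITH (\n"
        + ",\n".join(option_lines)
        + "\n);"
    )


ddl = build_ddl(
    table="oracle_cdc_test",
    columns=[("id", "INT"), ("name", "VARCHAR")],
    primary_key="id",
    connector="oracle-cdc",
    datasource_values={"hostname": "localhost", "port": "1521"},
    table_values={"database-name": "dataphin", "table-name": "myusertable"},
)
print(ddl)
```

Keeping the data source values and the metatable values as separate inputs mirrors the split between the two pages: several metatables can reuse one data source while supplying their own table-level parameters.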