Set up a Flink connector

When built-in connectors do not meet your requirements, use a custom Flink connector.

Prerequisites

Enable Realtime Compute for Apache Flink and purchase an instance. For more information, see Enable Realtime Compute for Apache Flink.

Procedure

  1. Select the FeatureStore connector version that matches your Flink version.

    FeatureStore connector version | Flink version
    1.1.5 (Download) | vvr-11.x-jdk11-flink-1.20
    1.1.4 (Download) | vvr-8.0.11-flink-1.17

  2. Register a custom connector.

    1. Log in to the Realtime Compute for Apache Flink console.

    2. In the Actions column of the target workspace, click Console.

    3. In the left-side navigation pane, click Connectors.

    4. On the Connectors page, click Create Custom Connector.

    5. Select Upload File, click Select File, and upload the FeatureStore connector JAR package.

  3. Click Next.

    Note

    The system parses the content of your uploaded custom connector. If parsing succeeds, continue to the next step. If parsing fails, verify that your custom connector code complies with the Flink community standard.

  4. Click Complete.

    Note

    After the connector is created, it appears in the connector list.

  5. (Optional) To use the connector as a dimension table, turn on the Lookup switch.

    For details about properties, see Appendix: Property descriptions.

  6. Click Complete.

Flink SQL demo

  1. Create a feature view in FeatureStore.

    Assume the following real-time features are defined in FeatureStore.

    Basic information about the feature view: The type is Real-time, the write mode is Custom Table Structure, the feature entity is user, the JoinId is user_id, Sync Feature Table is set to true, and the feature time-to-live (TTL) is -1 seconds. The feature fields include user_id (INT64, primary key), string_field (STRING), int32_field (INT32), float_field (FLOAT), double_field (DOUBLE), and boolean_field (BOOLEAN).

  2. Write data to the feature view from a Flink SQL job. For more information, see Job development overview.

    Example:

    -- Define the source table
    CREATE TEMPORARY TABLE server_logs ( 
      user_id BIGINT,
      string_field STRING, 
      int32_field INT, 
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN
    ) WITH (
      'connector' = 'datagen',   
      'rows-per-second' = '10'
    );
    -- Define the sink table
    CREATE TEMPORARY TABLE featurestore_sink (
      user_id BIGINT,
      string_field STRING, 
      int32_field INT, 
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN
    ) WITH (
      'connector' = 'featurestore',
      'username' = 'xxxx',
      'password' = 'xxx',
      'region_id' = 'cn-beijing',
      'aliyun_access_id' = 'xxxx',
      'aliyun_access_key' = 'xxxx',
      'project' = 'tablestore_p2',
      'feature_view' = 'user_fea3'
    );
    -- Write data to the sink table
    INSERT INTO featurestore_sink
    SELECT user_id, string_field, int32_field, float_field, double_field, boolean_field
    FROM server_logs;


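    To use the FeatureStore table as a dimension table, define a primary key on it. The following example looks up the stored feature values for each incoming row, uses them to fill any NULL fields, and writes the result back: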

    -- Define the source table
    CREATE TEMPORARY TABLE server_logs ( 
      user_id BIGINT,
      string_field STRING, 
      int32_field INT, 
      int64_field BIGINT,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN
    ) WITH (
      'connector' = 'datagen',   
      'rows-per-second' = '10',
      'fields.user_id.min'='1',
      'fields.user_id.max'='100',
      'fields.user_id.kind'='random'
    );
    -- Define the FeatureStore table, used here as both the dimension table and the sink
    CREATE TEMPORARY TABLE featurestore_sink (
      user_id BIGINT,
      string_field STRING, 
      int32_field INT, 
      int64_field BIGINT,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'featurestore',
      'username' = 'xxx',
      'password' = 'xxx',
      'region_id' = 'cn-beijing',
      'aliyun_access_id' = 'xxxx',
      'aliyun_access_key' = 'xxxx',
      'project' = 'fs_demo_featuredb',
      'feature_view' = 'user_test_1',
      'cacheSize' = '10000',
      'cacheTime' = '1800'
    );
    -- Look up stored features, fill NULL source fields from them, and write the rows back
    INSERT INTO featurestore_sink
    SELECT
      S.user_id,
      COALESCE(S.string_field, F.string_field),
      COALESCE(S.int32_field, F.int32_field),
      COALESCE(S.int64_field, F.int64_field),
      COALESCE(S.float_field, F.float_field),
      COALESCE(S.double_field, F.double_field),
      COALESCE(S.boolean_field, F.boolean_field)
    FROM server_logs AS S
    LEFT JOIN featurestore_sink FOR SYSTEM_TIME AS OF PROCTIME() AS F
      ON S.user_id = F.user_id;
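
The example above reads from and writes to the same FeatureStore table. A dimension table can also be used purely for enrichment. The following is a minimal sketch, not taken from the FeatureStore documentation: it declares an explicit processing-time column on the source, which is the form open-source Flink SQL uses for lookup joins, and prints the enriched rows with the print connector. The table names click_events, user_features, and enriched_print and the column proc_time are hypothetical. The project, feature view, and schema are reused from the example above, and all credential values are placeholders.

```sql
-- Source stream with an explicit processing-time attribute.
-- proc_time is a hypothetical column name chosen for this sketch.
CREATE TEMPORARY TABLE click_events (
  user_id BIGINT,
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '100'
);

-- FeatureStore dimension table. The primary key is the lookup key.
CREATE TEMPORARY TABLE user_features (
  user_id BIGINT,
  string_field STRING,
  int32_field INT,
  int64_field BIGINT,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'featurestore',
  'username' = 'xxx',            -- placeholder
  'password' = 'xxx',            -- placeholder
  'region_id' = 'cn-beijing',
  'aliyun_access_id' = 'xxxx',   -- placeholder
  'aliyun_access_key' = 'xxxx',  -- placeholder
  'project' = 'fs_demo_featuredb',
  'feature_view' = 'user_test_1',
  'cacheSize' = '10000',
  'cacheTime' = '1800'
);

-- Print sink, used only to inspect the joined rows.
CREATE TEMPORARY TABLE enriched_print (
  user_id BIGINT,
  string_field STRING,
  int32_field INT
) WITH (
  'connector' = 'print'
);

-- Enrich each event with the features stored for its user_id.
-- Events with no matching features are kept, with NULL feature columns.
INSERT INTO enriched_print
SELECT E.user_id, F.string_field, F.int32_field
FROM click_events AS E
LEFT JOIN user_features FOR SYSTEM_TIME AS OF E.proc_time AS F
  ON E.user_id = F.user_id;
```

With a LEFT JOIN, events that have no stored features still reach the sink. Switching to an inner JOIN would drop them instead.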

Appendix: Property descriptions

Name | Type | Required | Description
region_id | string | Yes | The region ID. Examples: cn-beijing (Beijing), cn-shanghai (Shanghai), cn-hangzhou (Hangzhou), cn-shenzhen (Shenzhen), ap-southeast-1 (Singapore).
aliyun_access_id | string | Yes | Your Alibaba Cloud AccessKey ID.
aliyun_access_key | string | Yes | Your Alibaba Cloud AccessKey secret.
username | string | Yes | The username for the FeatureStoreDB data source.
password | string | Yes | The password for the FeatureStoreDB data source.
project | string | Yes | The project name in FeatureStore.
feature_view | string | Yes | The feature view name in FeatureStore.
host | string | No | The public endpoint of FeatureStore, used for testing over the Internet. Examples: paifeaturestore.cn-beijing.aliyuncs.com (Beijing), paifeaturestore.cn-hangzhou.aliyuncs.com (Hangzhou), paifeaturestore.cn-shanghai.aliyuncs.com (Shanghai), paifeaturestore.cn-shenzhen.aliyuncs.com (Shenzhen), paifeaturestore.ap-southeast-1.aliyuncs.com (Singapore).
use_public_address | boolean | No (default: false) | Specifies whether to write data through the public endpoint of FeatureStoreDB. This is mainly for testing.
insert_mode | string | No (default: full_row_write) | The method used to insert or update data. By default, the entire row is replaced. To update only specific fields, set this to partial_field_write.
cacheSize | int | No | The cache size. Applies only when the table is used as a dimension table.
cacheTime | int | No | The cache time, in seconds. Applies only when the table is used as a dimension table.
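
As a rough illustration of where the optional properties go, the following sketch extends the first sink definition from the demo with insert_mode, host, and use_public_address. The table name featurestore_partial_sink is hypothetical and all credential values are placeholders. Setting host and use_public_address together, and declaring the full column list alongside partial_field_write, are assumptions made for this sketch. Confirm the expected combination for your connector version before relying on it.

```sql
-- Sink that updates individual fields instead of replacing whole rows,
-- connecting over the Internet for testing.
CREATE TEMPORARY TABLE featurestore_partial_sink (
  user_id BIGINT,
  string_field STRING,
  int32_field INT,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN
) WITH (
  'connector' = 'featurestore',
  'username' = 'xxxx',             -- placeholder
  'password' = 'xxx',              -- placeholder
  'region_id' = 'cn-beijing',
  'aliyun_access_id' = 'xxxx',     -- placeholder
  'aliyun_access_key' = 'xxxx',    -- placeholder
  'project' = 'tablestore_p2',
  'feature_view' = 'user_fea3',
  -- Optional properties from the table above:
  'insert_mode' = 'partial_field_write',              -- default is full_row_write
  'host' = 'paifeaturestore.cn-beijing.aliyuncs.com', -- public FeatureStore endpoint
  'use_public_address' = 'true'                       -- default is false
);
```

All values in the WITH clause are written as quoted strings, including the boolean and integer properties.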