When built-in connectors do not meet your requirements, use a custom Flink connector.
Prerequisites
Enable Realtime Compute for Apache Flink and purchase an instance. For more information, see Enable Realtime Compute for Apache Flink.
Procedure
Select the FeatureStore connector version that matches your Flink version.
FeatureStore connector version | Flink version |
1.1.5 (Download) | vvr-11.x-jdk11-flink-1.20 |
1.1.4 (Download) | vvr-8.0.11-flink-1.17 |
- Register a custom connector.
  - Log in to the Realtime Compute for Apache Flink console.
  - In the Actions column of the target workspace, click Console.
  - In the left-side navigation pane, click Connectors.
  - On the Connectors page, click Create Custom Connector.
  - Select Upload File, click Select File, and upload the FeatureStore connector JAR package.
  - Click Next.
    Note: The system parses the content of your uploaded custom connector. If parsing succeeds, continue to the next step. If parsing fails, verify that your custom connector code complies with the Flink community standard.
  - Click Complete.
    Note: You can view the completed custom connector in the connector list.
  - (Optional) To use the connector as a dimension table, turn on the Lookup switch. For details about properties, see Appendix: Property descriptions.
  - Click Complete.
Flink SQL demo
- Create a feature view in FeatureStore. Assume the following real-time features are defined in FeatureStore. Basic information about the feature view: the type is Real-time, the write mode is Custom Table Structure, the feature entity is user, the JoinId is user_id, Sync Feature Table is set to true, and the feature time-to-live (TTL) is -1 seconds. The feature fields include user_id (INT64, primary key), string_field (STRING), int32_field (INT32), float_field (FLOAT), double_field (DOUBLE), and boolean_field (BOOLEAN).
- Write data in Flink. For more information, see Job development overview. Example:
    -- Define the source table
    CREATE TEMPORARY TABLE server_logs (
      user_id BIGINT,
      string_field STRING,
      int32_field INT,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10'
    );

    -- Define the sink table
    CREATE TEMPORARY TABLE featurestore_sink (
      user_id BIGINT,
      string_field STRING,
      int32_field INT,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN
    ) WITH (
      'connector' = 'featurestore',
      'username' = 'xxxx',
      'password' = 'xxx',
      'region_id' = 'cn-beijing',
      'aliyun_access_id' = 'xxxx',
      'aliyun_access_key' = 'xxxx',
      'project' = 'tablestore_p2',
      'feature_view' = 'user_fea3'
    );

    -- Write data to the sink table
    INSERT INTO featurestore_sink
    SELECT user_id, string_field, int32_field, float_field, double_field, boolean_field
    FROM server_logs;

If you use a dimension table, define a primary key for the FeatureStore table. Example:
    -- Define the source table
    CREATE TEMPORARY TABLE server_logs (
      user_id BIGINT,
      string_field STRING,
      int32_field INT,
      int64_field BIGINT,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10',
      'fields.user_id.min' = '1',
      'fields.user_id.max' = '100',
      'fields.user_id.kind' = 'random'
    );

    -- Define the sink table, which also serves as the dimension table
    CREATE TEMPORARY TABLE featurestore_sink (
      user_id BIGINT,
      string_field STRING,
      int32_field INT,
      int64_field BIGINT,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'featurestore',
      'username' = 'xxx',
      'password' = 'xxx',
      'region_id' = 'cn-beijing',
      'aliyun_access_id' = 'xxxx',
      'aliyun_access_key' = 'xxxx',
      'project' = 'fs_demo_featuredb',
      'feature_view' = 'user_test_1',
      'cacheSize' = '10000',
      'cacheTime' = '1800'
    );

    -- Write data to the sink table, falling back to the stored value when a source field is NULL
    INSERT INTO featurestore_sink
    SELECT
      S.user_id,
      COALESCE(S.string_field, F.string_field),
      COALESCE(S.int32_field, F.int32_field),
      COALESCE(S.int64_field, F.int64_field),
      COALESCE(S.float_field, F.float_field),
      COALESCE(S.double_field, F.double_field),
      COALESCE(S.boolean_field, F.boolean_field)
    FROM server_logs S
    LEFT JOIN featurestore_sink FOR SYSTEM_TIME AS OF PROCTIME() AS F
    ON S.user_id = F.user_id;
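The dimension-table pattern can also be used for reads only, to enrich a stream with stored features. The following is a minimal sketch, not a confirmed reference job: the table names click_events, featurestore_dim, and enriched_print are hypothetical, the credentials are placeholders, and the project and feature view names are reused from the demo above. It assumes the Lookup switch was turned on when the connector was registered, and it writes to Flink's built-in print connector so that the joined rows can be inspected in the job logs.

```sql
-- Hypothetical source stream that carries only the join key
CREATE TEMPORARY TABLE click_events (
  user_id BIGINT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '100',
  'fields.user_id.kind' = 'random'
);

-- Dimension table backed by the feature view; a primary key is required
CREATE TEMPORARY TABLE featurestore_dim (
  user_id BIGINT,
  string_field STRING,
  int32_field INT,
  int64_field BIGINT,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'featurestore',
  'username' = 'xxx',
  'password' = 'xxx',
  'region_id' = 'cn-beijing',
  'aliyun_access_id' = 'xxxx',
  'aliyun_access_key' = 'xxxx',
  'project' = 'fs_demo_featuredb',
  'feature_view' = 'user_test_1',
  'cacheSize' = '10000',  -- lookup cache size
  'cacheTime' = '1800'    -- lookup cache time, in seconds
);

-- Debug sink that prints each enriched row
CREATE TEMPORARY TABLE enriched_print (
  user_id BIGINT,
  string_field STRING,
  double_field DOUBLE
) WITH (
  'connector' = 'print'
);

-- Look up the stored features for each incoming user_id
INSERT INTO enriched_print
SELECT E.user_id, F.string_field, F.double_field
FROM click_events AS E
LEFT JOIN featurestore_dim FOR SYSTEM_TIME AS OF PROCTIME() AS F
ON E.user_id = F.user_id;
```

Because this is a LEFT JOIN, events whose user_id has no stored features still reach the sink, with NULL in the feature columns.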
Appendix: Property descriptions
Name | Type | Required | Description |
region_id | string | Yes | The region ID. Example: cn-beijing. |
aliyun_access_id | string | Yes | Your Alibaba Cloud AccessKey ID. |
aliyun_access_key | string | Yes | Your Alibaba Cloud AccessKey secret. |
username | string | Yes | The username for the FeatureStoreDB data source. |
password | string | Yes | The password for the FeatureStoreDB data source. |
project | string | Yes | The project name in FeatureStore. |
feature_view | string | Yes | The feature view name in FeatureStore. |
host | string | No | The public endpoint of FeatureStore, used for testing over the Internet. |
use_public_address | boolean | No. Default: false | Use the public endpoint of FeatureStoreDB to write data. This is mainly for testing. |
insert_mode | string | No. Default: full_row_write | The method used to insert or update data. By default, the entire row is replaced. To update only specific fields, set this to partial_field_write. |
cacheSize | int | No | The cache size. Applies only when the table is used as a dimension table. |
cacheTime | int | No | The cache time, in seconds. Applies only when the table is used as a dimension table. |
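To illustrate the insert_mode property, the following sketch updates a single feature field instead of replacing the whole row. It rests on an assumption that the property table does not confirm: that with partial_field_write the sink table may declare only the primary key and the fields to update. The table names score_updates and featurestore_partial_sink are hypothetical, and the credentials, project, and feature view are placeholders reused from the demo.

```sql
-- Hypothetical source that produces a new double_field value per user
CREATE TEMPORARY TABLE score_updates (
  user_id BIGINT,
  double_field DOUBLE
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '100',
  'fields.user_id.kind' = 'random'
);

-- Sink limited to the key and the field to update (assumed to be allowed)
CREATE TEMPORARY TABLE featurestore_partial_sink (
  user_id BIGINT,
  double_field DOUBLE,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'featurestore',
  'username' = 'xxx',
  'password' = 'xxx',
  'region_id' = 'cn-beijing',
  'aliyun_access_id' = 'xxxx',
  'aliyun_access_key' = 'xxxx',
  'project' = 'fs_demo_featuredb',
  'feature_view' = 'user_test_1',
  'insert_mode' = 'partial_field_write'  -- default is full_row_write
);

-- Only double_field is written; other stored fields are expected to stay unchanged
INSERT INTO featurestore_partial_sink
SELECT user_id, double_field
FROM score_updates;
```

If the connector turns out to require the full schema even in partial_field_write mode, the COALESCE-based lookup join in the demo is an alternative way to preserve existing values.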