当内置的连接器无法满足需求时,您可以使用Flink自定义连接器。
前提条件
开通实时计算Flink并购买实例。具体操作,请参见开通实时计算Flink版。
操作步骤
-
根据Flink版本选择FeatureStore Connector相应的版本。
FeatureStroe Connector 版本
Flink 版本
1.1.5(下载)
vvr-11.x-jdk11-flink-1.20
1.1.4(下载)
vvr-8.0.11-flink-1.17
-
注册自定义连接器。
-
登录实时计算控制台。
-
单击目标工作空间操作列下的控制台。
-
在左侧导航栏,单击连接器。
-
在连接器页面,单击创建自定义连接器。
-
选择上传文件,单击选择文件,上传FeatureStore Connector JAR包。
-
-
单击下一步。
说明系统会对您上传的自定义连接器内容进行解析。如果解析成功,您可以继续下一步;如果解析失败,请确认您上传的自定义连接器代码是否符合Flink社区标准。
-
单击完成。
说明创建完成的自定义连接器可在连接器列表中查看。
-
(可选)上传成功后,如果希望其作为维表,请打开Lookup开关。
其中,Properties详情请参见附录:Properties说明。
-
单击完成。
Flink SQL Demo
-
在FeatureStore创建特征视图。
假设在FeatureStore定义了如下实时特征。
该特征视图的基本信息:类型为实时,写入方式为自定义表结构,特征实体为user,JoinId 为
user_id,同步特征表为true,特征生命周期(秒)为-1。特征字段包含:user_id(INT64,主键)、string_field(STRING)、int32_field(INT32)、float_field(FLOAT)、double_field(DOUBLE)、boolean_field(BOOLEAN)。 -
在Flink进行数据写入。具体操作,请参见作业开发地图。
示例如下:
-- 定义数据源表 CREATE TEMPORARY TABLE server_logs ( user_id BIGINT, string_field STRING, int32_field INT, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '10' ); -- 定义结果表 CREATE TEMPORARY TABLE featurestore_sink ( user_id BIGINT, string_field STRING, int32_field INT, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN ) WITH ( 'connector' = 'featurestore', 'username' = 'xxxx', 'password' = 'xxx', 'region_id' = 'cn-beijing', 'aliyun_access_id' = 'xxxx', 'aliyun_access_key' = 'xxxx', 'project' = 'tablestore_p2', 'feature_view' = 'user_fea3' ); -- 写入数据到结果表 INSERT INTO featurestore_sink SELECT user_id, string_field, int32_field, float_field, double_field,boolean_field FROM server_logs如果使用维表,使用FeatureStore的表,必须定义主键。示例如下:
-- 定义数据源表 CREATE TEMPORARY TABLE server_logs ( user_id BIGINT, string_field STRING, int32_field INT, int64_field BIGINT, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.user_id.min'='1', 'fields.user_id.max'='100', 'fields.user_id.kind'='random' ); -- 定义结果表 CREATE TEMPORARY TABLE featurestore_sink ( user_id BIGINT, string_field STRING, int32_field INT, int64_field BIGINT, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN, PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'featurestore', 'username' = 'xxx', 'password' = 'xxx', 'region_id' = 'cn-beijing', 'aliyun_access_id' = 'xxxx', 'aliyun_access_key' = 'xxxx', 'project' = 'fs_demo_featuredb', 'feature_view' = 'user_test_1', 'cacheSize' = '10000', 'cacheTime' = '1800' ); -- 写入数据到结果表 INSERT INTO featurestore_sink SELECT S.user_id, COALESCE(S.string_field, F.string_field), COALESCE(S.int32_field, F.int32_field), COALESCE(S.int64_field, F.int64_field), COALESCE(S.float_field, F.float_field), COALESCE(S.double_field,F.double_field), COALESCE(S.boolean_field, F.boolean_field) FROM server_logs S LEFT JOIN featurestore_sink FOR SYSTEM_TIME AS OF PROCTIME() AS F ON S.user_id = F.user_id
附录:Properties说明
|
名称 |
类型 |
是否必须 |
描述 |
|
region_id |
string |
是 |
地域标识。例如:
|
|
aliyun_access_id |
string |
是 |
阿里云Access Secret Id。 |
|
aliyun_access_key |
string |
是 |
阿里云Access Secret Key。 |
|
username |
string |
是 |
FeatureStoreDB数据源的用户名。 |
|
password |
string |
是 |
FeatureStoreDB数据源密码。 |
|
project |
string |
是 |
FeatureStore的项目名称。 |
|
feature_view |
string |
是 |
FeatureStore的特征视图名称。 |
|
host |
string |
否 |
如果通过公网测试,设置的FeatureStore的公网地址,例如:
|
|
use_public_address |
boolean |
否,默认为false |
使用FeatureStoreDB的公网地址进行数据写入,主要用于测试。 |
|
insert_mode |
string |
否,默认值为 full_row_write |
数据插入更新的方式,默认是整行的数据替换。如果是部分字段更新,指定 partial_field_write |
|
cacheSize |
int |
否 |
作为维表时使用。缓存大小。 |
|
cacheTime |
int |
否 |
作为维表时使用,缓存时间,单位秒。 |