当内置的连接器无法满足需求时,您可以使用自定义连接器。本文为您介绍如何上传FeatureStore自定义连接器。
前提条件
开通实时计算Flink并购买实例。具体操作,请参见开通实时计算Flink版。
操作步骤
下载FeatureStore ConnectorJAR包。
进入自定义连接器注册入口。
登录实时计算控制台。
单击目标工作空间操作列下的控制台。
在左侧导航栏,单击连接器。
注册自定义连接器。
在连接器页面,单击创建自定义连接器。
选择上传文件,单击选择文件,上传FeatureStore Connector JAR包。
单击下一步。
说明系统会对您上传的自定义连接器内容进行解析。如果解析成功,您可以继续下一步;如果解析失败,请确认您上传的自定义连接器代码是否符合Flink社区标准。
单击完成。
说明创建完成的自定义连接器可在连接器列表中查看。
(可选)上传成功后,如果希望其作为维表,请打开Lookup开关。
其中,Properties详情请参见附录:Properties说明。
单击完成。
Flink SQL Demo
在FeatureStore创建特征视图。具体操作,请参见配置FeatureStore项目。
假设在FeatureStore定义了如下实时特征FeatureView。
在Flink进行数据写入。具体操作,请参见SQL作业开发。
示例如下:
-- 定义数据源表 CREATE TEMPORARY TABLE server_logs ( user_id BIGINT, string_field STRING, int32_field INT, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '10' ); -- 定义结果表 CREATE TEMPORARY TABLE featurestore_sink ( user_id BIGINT, string_field STRING, int32_field INT, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN ) WITH ( 'connector' = 'featurestore', 'username' = 'xxxx', 'password' = 'xxx', 'region_id' = 'cn-beijing', 'aliyun_access_id' = 'xxxx', 'aliyun_access_key' = 'xxxx', 'project' = 'tablestore_p2', 'feature_view' = 'user_fea3' ); -- 写入数据到结果表 INSERT INTO featurestore_sink SELECT user_id, string_field, int32_field, float_field, double_field,boolean_field FROM server_logs
如果使用维表,使用FeatureStore的表,必须定义主键。示例如下:
-- 定义数据源表 CREATE TEMPORARY TABLE server_logs ( user_id BIGINT, string_field STRING, int32_field INT, int64_field BIGINT, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.user_id.min'='1', 'fields.user_id.max'='100', 'fields.user_id.kind'='random' ); -- 定义结果表 CREATE TEMPORARY TABLE featurestore_sink ( user_id BIGINT, string_field STRING, int32_field INT, int64_field BIGINT, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN, PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'featurestore', 'username' = 'xxx', 'password' = 'xxx', 'region_id' = 'cn-beijing', 'aliyun_access_id' = 'xxxx', 'aliyun_access_key' = 'xxxx', 'project' = 'fs_demo_featuredb', 'feature_view' = 'user_test_1' ); -- 写入数据到结果表 INSERT INTO featurestore_sink SELECT S.user_id, COALESCE(S.string_field, F.string_field), COALESCE(S.int32_field, F.int32_field), COALESCE(S.int64_field, F.int64_field), COALESCE(S.float_field, F.float_field), COALESCE(S.double_field,F.double_field), COALESCE(S.boolean_field, F.boolean_field) FROM server_logs S LEFT JOIN featurestore_sink FOR SYSTEM_TIME AS OF PROCTIME() AS F ON S.user_id = F.user_id
附录:Propoerties说明
名称 | 类型 | 是否必须 | 描述 |
region_id | string | 是 | 地域标识。例如:
|
aliyun_access_id | string | 是 | 阿里云Access Secret Id。 |
aliyun_access_key | string | 是 | 阿里云Access Secret Key。 |
username | string | 是 | FeatureStoreDB数据源的用户名。 |
password | string | 是 | FeatureStoreDB数据源密码。 |
project | string | 是 | FeatureStore的项目名称。 |
feature_view | string | 是 | FeatureStore的特征视图名称。 |
host | string | 否 | 如果通过公网测试,设置的FeatureStore的公网地址,例如:
|
use_public_address | boolean | 否,默认为false | 使用FeatureStoreDB的公网地址进行数据写入,主要用于测试。 |
insert_mode | string | 否,默认值为 full_row_write | 数据插入更新的方式,默认是整行的数据替换。如果是部分字段更新,指定 partial_field_write |