文档

设置Flink Connector

更新时间:

当内置的连接器无法满足需求时,您可以使用自定义连接器。本文为您介绍如何上传FeatureStore自定义连接器。

前提条件

开通实时计算Flink并购买实例。具体操作,请参见开通实时计算Flink版

操作步骤

  1. 下载FeatureStore ConnectorJAR包。

  2. 进入自定义连接器注册入口。

    1. 登录实时计算控制台

    2. 单击目标工作空间操作列下的控制台

    3. 在左侧导航栏,单击数据连接

  3. 注册自定义连接器。

    1. 数据连接页面,单击创建自定义连接器

    2. 选择上传文件,单击选择文件,上传FeatureStore Connector JAR包。

  4. 单击下一步

    说明

    系统会对您上传的自定义连接器内容进行解析。如果解析成功,您可以继续下一步;如果解析失败,请确认您上传的自定义连接器代码是否符合Flink社区标准。

  5. 单击完成

    说明

    创建完成的自定义连接器可在连接器列表中查看。

  6. (可选)上传成功后,如果希望其作为维表,请打开Lookup开关。

    其中,Properties详情请参见附录:Properties说明

    image

  7. 单击完成

Flink SQL Demo

  1. 在FeatureStore创建特征视图。具体操作,请参见配置FeatureStore项目

    假设在FeatureStore定义了如下实时特征FeatureView。

    image

  2. 在Flink进行数据写入。具体操作,请参见SQL作业开发

    示例如下:

    -- 定义数据源表
    CREATE TEMPORARY TABLE server_logs ( 
      user_id BIGINT,
      string_field STRING, 
      int32_field INT, 
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN
    ) WITH (
      'connector' = 'datagen',   
      'rows-per-second' = '10'
    
    );
    -- 定义结果表
    CREATE TEMPORARY TABLE featurestore_sink (
      user_id BIGINT,
      string_field STRING, 
      int32_field INT, 
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN
    ) WITH (
      'connector' = 'featurestore',
      'username' = 'xxxx',
      'password' = 'xxx',
      'region_id' = 'cn-beijing',
      'aliyun_access_id' = 'xxxx',
      'aliyun_access_key' = 'xxxx',
      'project' = 'tablestore_p2',
      'feature_view' = 'user_fea3'
    );
         
    -- 写入数据到结果表
    INSERT INTO featurestore_sink
    	SELECT user_id, string_field, int32_field, float_field, double_field,boolean_field
    FROM server_logs
    

    如果使用维表,使用FeatureStore的表,必须定义主键。示例如下:

    -- 定义数据源表
    CREATE TEMPORARY TABLE server_logs ( 
      user_id BIGINT,
      string_field STRING, 
      int32_field INT, 
      int64_field BIGINT,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN
    ) WITH (
      'connector' = 'datagen',   
      'rows-per-second' = '10',
      'fields.user_id.min'='1',
      'fields.user_id.max'='100',
      'fields.user_id.kind'='random'
    
    );
    -- 定义结果表
    CREATE TEMPORARY TABLE featurestore_sink (
      user_id BIGINT,
      string_field STRING, 
      int32_field INT, 
      int64_field BIGINT,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'featurestore',
      'username' = 'xxx',
      'password' = 'xxx',
      'region_id' = 'cn-beijing',
      'aliyun_access_id' = 'xxxx',
      'aliyun_access_key' = 'xxxx',
      'project' = 'fs_demo_featuredb',
      'feature_view' = 'user_test_1'
    );
         
    -- 写入数据到结果表
    INSERT INTO featurestore_sink
    	SELECT S.user_id, 
        COALESCE(S.string_field, F.string_field), 
        COALESCE(S.int32_field, F.int32_field),  
        COALESCE(S.int64_field, F.int64_field),
        COALESCE(S.float_field, F.float_field),
        COALESCE(S.double_field,F.double_field),
        COALESCE(S.boolean_field, F.boolean_field) 
    FROM server_logs S LEFT JOIN featurestore_sink FOR SYSTEM_TIME AS OF PROCTIME() AS F ON  S.user_id = F.user_id

附录:Propoerties说明

名称

类型

是否必须

描述

region_id

string

地域标识。例如:

  • 北京:cn-beijing

  • 上海:cn-shanghai

  • 杭州:cn-hangzhou

  • 深圳:cn-shenzhen

  • 新加坡:ap-southeast-1

aliyun_access_id

string

阿里云Access Secret Id。

aliyun_access_key

string

阿里云Access Secret Key。

username

string

FeatureStoreDB数据源的用户名。

password

string

FeatureStoreDB数据源密码。

project

string

FeatureStore的项目名称。

feature_view

string

FeatureStore的特征视图名称。

host

string

如果通过公网测试,设置的FeatureStore的公网地址,例如:

  • 北京:paifeaturestore.cn-beijing.aliyuncs.com

  • 杭州:paifeaturestore.cn-hangzhou.aliyuncs.com

  • 上海:paifeaturestore.cn-shanghai.aliyuncs.com

  • 深圳:paifeaturestore.cn-shenzhen.aliyuncs.com

  • 新加坡:paifeaturestore.ap-southeast-1.aliyuncs.com

use_public_address

boolean

否,默认为false

使用FeatureStoreDB的公网地址进行数据写入,主要用于测试。