设置Flink Connector

更新时间:
复制为 MD 格式

通过Flink写入召回引擎(RecallEngine)

开通实时计算Flink并创建工作空间,通过Flink作业将数据实时写入召回引擎。建议开通实时计算Flink版。

前提条件

  • 开通实时计算Flink并创建工作空间,建议开通实时计算Flink

  • 已创建召回引擎实例,并获取到实例的 Endpoint、Instance ID、用户名和密码。

操作步骤

1. 根据Flink版本选择RecallEngine Connector相应的版本

RecallEngine Connector 版本与 Flink 版本对应关系如下:

RecallEngine Connector 版本

Flink 版本

版本说明

1.0.8(下载

vvr-11.x/flink-1.20

支持异步合并写入

1.0.7(下载

vvr-11.x/flink-1.20

1.0.7(下载

vvr-8.x/flink-1.17

2. 注册资源到Flink

  1. 登录实时计算控制台

  2. 连接器页面,选择自定义连接器,单击创建自定义连接器

  3. 上传上述下载的 RecallEngine Connector JAR 包。

  4. 点击下一步,设置连接器属性(可选),最后点击完成创建。

连接器 recallengine 的属性包括 endpointinstanceusernamepasswordtableconnector(均为必填),以及 retry_time(默认值为 3)等,可按需修改后单击完成

3. 作业编辑

在作业中使用 RecallEngine Connector,编写 Flink SQL 作业,将数据写入召回引擎。

Flink SQL Demo

1. 在召回引擎中创建目标表

需要在召回引擎中预先创建好目标表,定义好表的字段结构。

2. 在Flink中进行数据写入

以下示例展示如何将数据通过 Flink SQL 写入召回引擎。

定义数据源表:

CREATE TEMPORARY TABLE source_table (
    user_id BIGINT,
    item_id BIGINT,
    score FLOAT,
    tag STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10'
);

定义 RecallEngine Sink 表:

CREATE TEMPORARY TABLE recallengine_sink (
    user_id BIGINT,
    item_id BIGINT,
    score FLOAT,
    tag STRING
) WITH (
    'connector' = 'recallengine',
    'endpoint' = 'http://<your-endpoint>',
    'instance_id' = '<your-instance-id>',
    'table' = '<your-table-name>',
    'username' = '<your-username>',
    'password' = '<your-password>'
);

执行写入:

INSERT INTO recallengine_sink
SELECT user_id, item_id, score, tag
FROM source_table;

支持的数据类型

RecallEngine Flink Connector 支持以下 Flink SQL 数据类型:

Flink SQL 类型

说明

INT

32位整数

BIGINT

64位整数

FLOAT

单精度浮点数

DOUBLE

双精度浮点数

VARCHAR / STRING

字符串

BOOLEAN

布尔值

TIMESTAMP

时间戳,写入时转换为字符串

TIMESTAMP_LTZ

带时区的时间戳,写入时转换为字符串

DATE

日期,写入时转换为 yyyy-MM-dd 格式字符串

ARRAY<INT>

整数数组

ARRAY<BIGINT>

长整数数组

ARRAY<FLOAT>

单精度浮点数数组

ARRAY<DOUBLE>

双精度浮点数数组

ARRAY<VARCHAR>

字符串数组

附录:Properties 说明

名称

类型

是否必填

默认值

描述

connector

string

固定值为 recallengine

endpoint

string

召回引擎服务的访问地址。可以在召回管理->基本信息里服务域名列表里找到。

instance_id

string

召回引擎实例 ID。

table

string

目标表名称。

username

string

认证用户名。

password

string

认证密码。

retry_times

int

3

写入失败时的重试次数。

authorization

string

可选的 Authorization 请求头值,用于自定义鉴权场景。

注意事项

  • RecallEngine Connector 仅支持 Sink(写入),不支持 Source(读取)和 Lookup(维表关联)。

  • Connector 支持处理 INSERTUPDATE_AFTER 类型的数据变更,其他类型(如 DELETE、UPDATE_BEFORE)会被忽略。

  • 写入时每条数据逐条发送,如有高吞吐需求,请合理配置 Flink 作业的并发度。