通过Flink写入召回引擎(RecallEngine)
开通实时计算Flink并创建工作空间,通过Flink作业将数据实时写入召回引擎。建议开通实时计算Flink版。
前提条件
开通实时计算Flink并创建工作空间,建议开通实时计算Flink版。
已创建召回引擎实例,并获取到实例的 Endpoint、Instance ID、用户名和密码。
操作步骤
1. 根据Flink版本选择RecallEngine Connector相应的版本
RecallEngine Connector 版本与 Flink 版本对应关系如下:
RecallEngine Connector 版本 | Flink 版本 | 版本说明 |
1.0.8(下载) | vvr-11.x/flink-1.20 | 支持异步合并写入 |
1.0.7(下载) | vvr-11.x/flink-1.20 | |
1.0.7(下载) | vvr-8.x/flink-1.17 |
2. 注册资源到Flink
登录实时计算控制台。
在连接器页面,选择自定义连接器,单击创建自定义连接器。
上传上述下载的 RecallEngine Connector JAR 包。
点击下一步,设置连接器属性(可选),最后点击完成创建。
连接器 recallengine 的属性包括 endpoint、instance、username、password、table、connector(均为必填),以及 retry_time(默认值为 3)等,可按需修改后单击完成。
3. 作业编辑
在作业中使用 RecallEngine Connector,编写 Flink SQL 作业,将数据写入召回引擎。
Flink SQL Demo
1. 在召回引擎中创建目标表
需要在召回引擎中预先创建好目标表,定义好表的字段结构。
2. 在Flink中进行数据写入
以下示例展示如何将数据通过 Flink SQL 写入召回引擎。
定义数据源表:
CREATE TEMPORARY TABLE source_table (
user_id BIGINT,
item_id BIGINT,
score FLOAT,
tag STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10'
);定义 RecallEngine Sink 表:
CREATE TEMPORARY TABLE recallengine_sink (
user_id BIGINT,
item_id BIGINT,
score FLOAT,
tag STRING
) WITH (
'connector' = 'recallengine',
'endpoint' = 'http://<your-endpoint>',
'instance_id' = '<your-instance-id>',
'table' = '<your-table-name>',
'username' = '<your-username>',
'password' = '<your-password>'
);执行写入:
INSERT INTO recallengine_sink
SELECT user_id, item_id, score, tag
FROM source_table;支持的数据类型
RecallEngine Flink Connector 支持以下 Flink SQL 数据类型:
Flink SQL 类型 | 说明 |
INT | 32位整数 |
BIGINT | 64位整数 |
FLOAT | 单精度浮点数 |
DOUBLE | 双精度浮点数 |
VARCHAR / STRING | 字符串 |
BOOLEAN | 布尔值 |
TIMESTAMP | 时间戳,写入时转换为字符串 |
TIMESTAMP_LTZ | 带时区的时间戳,写入时转换为字符串 |
DATE | 日期,写入时转换为 yyyy-MM-dd 格式字符串 |
ARRAY<INT> | 整数数组 |
ARRAY<BIGINT> | 长整数数组 |
ARRAY<FLOAT> | 单精度浮点数数组 |
ARRAY<DOUBLE> | 双精度浮点数数组 |
ARRAY<VARCHAR> | 字符串数组 |
附录:Properties 说明
名称 | 类型 | 是否必填 | 默认值 | 描述 |
connector | string | 是 | 无 | 固定值为 |
endpoint | string | 是 | 无 | 召回引擎服务的访问地址。可以在召回管理->基本信息里服务域名列表里找到。 |
instance_id | string | 是 | 无 | 召回引擎实例 ID。 |
table | string | 是 | 无 | 目标表名称。 |
username | string | 是 | 无 | 认证用户名。 |
password | string | 是 | 无 | 认证密码。 |
retry_times | int | 否 | 3 | 写入失败时的重试次数。 |
authorization | string | 否 | 无 | 可选的 Authorization 请求头值,用于自定义鉴权场景。 |
注意事项
RecallEngine Connector 仅支持 Sink(写入),不支持 Source(读取)和 Lookup(维表关联)。
Connector 支持处理 INSERT 和 UPDATE_AFTER 类型的数据变更,其他类型(如 DELETE、UPDATE_BEFORE)会被忽略。
写入时每条数据逐条发送,如有高吞吐需求,请合理配置 Flink 作业的并发度。