向量化结果缓存

更新时间:
复制 MD 格式

本文介绍如何为 AI_EMBED 配置外部缓存表,复用相同输入文本的向量化结果,从而减少模型调用、降低延迟与成本。

背景信息

AI_EMBED 通过 CACHE_TABLE 参数指定一张外部表作为向量化结果的持久化缓存。运行时按以下逻辑工作:

  • 命中缓存:异步 lookup 命中后直接返回结果,跳过模型调用。

  • 未命中缓存:异步 lookup 未命中时调用模型计算向量,返回结果,并异步回写缓存表。

对相同输入文本的重复请求,该机制可避免重复调用模型服务。

使用限制

创建缓存表

缓存表的主键类型需与 CACHE_KEY 类型一致。

显式指定 Key 模式

主键类型与 CACHE_KEY 指定列的类型保持兼容。以源表中 cache_key 是 INT 类型为例:

CREATE TABLE `fluss`.`default`.`ai_cache` (
  cache_key   INT NOT NULL,
  embedding   ARRAY<FLOAT>,PRIMARY KEY (cache_key) NOT ENFORCED
);

自动生成 Key 模式

主键固定为 STRING 类型,用于存储 SHA256 哈希值:

CREATE TABLE `fluss`.`default`.`ai_cache_sha` (
  cache_key   STRING NOT NULL,
  embedding   ARRAY<FLOAT>,PRIMARY KEY (cache_key) NOT ENFORCED
);

语法

AI_EMBED(
  MODEL       => MODEL <MODEL NAME>,
  INPUT       => <INPUT COLUMN NAME>
  [, DIMENSION   => <DIMENSION VALUE>]
  [, CACHE_TABLE => TABLE <catalog>.<db>.<table>
     [, CACHE_KEY => DESCRIPTOR(<column>)]]
  [, CONFIG      => MAP[...]]
)

参数

参数

是否必填

说明

MODEL

模型引用,详情参见注册模型

INPUT

输入文本列。

CACHE_TABLE

缓存表引用。语法为 TABLE <catalog>.<db>.<table>

CACHE_KEY

缓存 Key 列。语法为 DESCRIPTOR(<column>)。未指定时,系统自动生成 SHA256 合成 Key。

CONFIG

运行时配置。

CACHE_KEY 模式说明

(推荐)显式指定:直接以输入表的某列值作为 lookup Key。在数据本身有唯一 key 标识的情况下推荐显式指定。

CACHE_KEY => DESCRIPTOR(cache_key)

自动生成:不传 CACHE_KEY 时,系统按以下规则生成 Key:

-- 不传 CACHE_KEY 时,系统自动生成:
-- key = LOWER(SHA256(model_id || ':' || dimension || ':' || input_text))
CACHE_TABLE => TABLE `fluss-ql`.`default`.`ai_cache1`

该模式存在运行时计算 SHA256 的开销,但可自动保证「模型 + 维度 + 输入文本」对应唯一 Key。

性能调优

调优参数

SET 'table.exec.async-ml-predict.max-concurrent-operations' = '100';
SET 'table.exec.async-ml-predict.output-mode' = 'ALLOW_UNORDERED';
SET 'table.exec.async-ml-predict.timeout' = '120s';

参数说明

参数

默认值

说明

max-concurrent-operations

10

并行查缓存或调用模型的最大记录数。调大可提升吞吐;过大则可能压垮模型服务或导致内存占用过高。

output-mode

ORDERED

输出顺序模式。

  • ORDERED:严格按输入顺序输出,适用于下游依赖顺序的 Join、Window 场景。

  • ALLOW_UNORDERED:先完成先输出,推荐 INSERT-only 流使用。

timeout

3min

lookup 与 predict 总耗时的上限。设置过小会导致正常 miss 被超时中止。

(推荐)ALLOW_UNORDERED 收益较大,原因如下:

  • Cache Hit 通常一次 Lookup 即可完成;Cache Miss 需额外调用一次模型。

  • ORDERED 模式下,一个 Miss 会阻塞后续所有 Hit 的输出。

  • ALLOW_UNORDERED 模式下,Hit 立即输出,无需等待前面的 Miss 完成。

说明

仅 INSERT-only 流会实际启用 ALLOW_UNORDERED。带 Retract 的流即使设置该值也会回退到 ORDERED

监控指标

作业运行时,可通过 Flink Metrics 观察缓存效果。

指标名

含义

关注点

ai_function_cache.cache_hit

缓存命中次数

命中率 = hit / (hit + miss)。

ai_function_cache.cache_miss

缓存未命中次数

持续偏高说明缓存未预热或 Key 不稳定。

ai_function_cache.writeback_attempt

回写尝试次数

数值应接近 miss 数。

ai_function_cache.writeback_skipped_null_output

因模型返回 null 而跳过的回写次数

数值偏高说明模型异常。

完整示例

Fluss主键表示例:

-- 提前建好 缓存表 (Fluss)
CREATE TABLE `fluss`.`default`.`embedding_cache` (
  cache_key   STRING NOT NULL,
  embedding   ARRAY<FLOAT>,
  PRIMARY KEY (cache_key) NOT ENFORCED
);

使用 Fluss 主键表作为缓存,以 ALLOW_UNORDERED 模式提升吞吐。

-- 调优参数
SET 'table.exec.async-ml-predict.max-concurrent-operations' = '100';
SET 'table.exec.async-ml-predict.output-mode' = 'ALLOW_UNORDERED';

-- 创建模型
CREATE TEMPORARY MODEL embedding_model
  INPUT (text STRING)
  OUTPUT (embedding ARRAY<FLOAT>)
WITH (
  'provider' = 'openai-compat',
  'task' = 'embeddings',
  'model' = 'text-embedding-v4',
  'dimension' = '1024'
);

-- 业务查询
INSERT INTO result_sink
SELECT id, content, embedding
FROM source_table, LATERAL TABLE(AI_EMBED(
  MODEL       => MODEL embedding_model,
  INPUT       => content,
  CONFIG      => MAP['async', 'true'],
  CACHE_TABLE => TABLE `fluss`.`default`.`embedding_cache`
));