本文介绍如何在Flink数据摄入作业中使用内置的 AI 推理模型,实现对数据流的实时智能处理(如文本摘要生成、向量化等)。
前提条件
实时计算引擎:VVR 11.6 及更高版本
依赖包:将
flink-cdc-pipeline-modelJAR 包作为作业附加依赖上传。从 Maven Central 下载。
工作原理
AI 推理模型的集成分为两个步骤:
定义模型:在 YAML 作业草稿的
pipeline模块中定义模型列表调用模型:在
transform模块的projection或filter字段中通过函数调用方式使用模型
每个模型需要指定唯一名称,该名称将作为函数名在数据转换中调用。
配置并调用模型
步骤一:在 pipeline 模块中定义模型
在 YAML 作业草稿的 pipeline 模块中添加 model 列表。每个模型条目包含以下信息:
模型名称(
model-name)实现类(
class-name)模型提供商的连接参数
pipeline:
model:
- model-name: CHAT
class-name: OpenAIChatModel
openai.model: gpt-4o-mini
openai.host: https://xxxx
openai.apikey: abcd1234
openai.chat.prompt: please summary this
- model-name: GET_EMBEDDING
class-name: OpenAIEmbeddingModel
openai.model: text-embedding-3-small
openai.host: https://xxxx
openai.apikey: abcd1234通用参数(所有模型必填):
参数 | 类型 | 是否必填 | 说明 |
| STRING | 必填 | 模型的唯一标识符,可自定义,用于在 |
| STRING | 必填 | 模型实现类的名称。可选值: |
步骤二:在 transform 模块中调用模型
定义模型后,可以在 transform 模块中按名称调用。模型调用作为内置函数用于 projection(数据投影)和 filter(数据过滤)表达式。
完整配置示例
以下示例展示了如何定义两个模型,并在同一个 transform 模块中同时调用 Embedding 模型和 Chat 模型:
pipeline:
model:
- model-name: CHAT
class-name: OpenAIChatModel
openai.model: gpt-4o-mini
openai.host: https://xxxx
openai.apikey: abcd1234
openai.chat.prompt: please summarize this
- model-name: GET_EMBEDDING
class-name: OpenAIEmbeddingModel
openai.model: text-embedding-3-small
openai.host: https://xxxx
openai.apikey: abcd1234
transform:
- source-table: db.\.*
projection: "*, inc(inc(inc(id))) as inc_id, GET_EMBEDDING(page) as emb"
filter: CHAT(page) = 'OK'调用说明:
GET_EMBEDDING(page):将page列的数据传入 Embedding 模型,返回ARRAY<FLOAT>类型的向量,存入emb列发送给下游。下游表 schema 中的emb列类型需声明为ARRAY<FLOAT>。CHAT(page):将page列的数据传入 Chat 模型,根据模型返回的STRING类型结果进行 filter 过滤。
支持的模型
模型类型与输出
模型类 | 适用场景 | 输出类型 |
| 文本摘要、翻译、问答等文本生成任务 |
|
| 文本向量化、构建向量库、语义相似度计算 |
|
OpenAIChatModel 参数
OpenAIChatModel 用于调用 OpenAI 的 Chat 模型(如 GPT 系列),适用于文本生成场景。
参数 | 类型 | 是否必填 | 说明 |
| STRING | 必填 | 要调用的模型名称。可选值: |
| STRING | 必填 | 模型服务地址。示例: |
| STRING | 必填 | API Key,用于模型服务验证。 |
| STRING | 可选 | 系统提示词 (system prompt),每次请求时附带。 |
OpenAIEmbeddingModel 参数
OpenAIEmbeddingModel 用于调用 OpenAI 的 Embedding 模型,将文本转换为向量。
参数 | 类型 | 是否必填 | 说明 |
| STRING | 必填 | 要调用的模型名称。可选值: |
| STRING | 必填 | 模型服务地址。示例: |
| STRING | 必填 | API Key,用于模型服务验证。 |