本文通过一个商品评论情感分类的示例,介绍如何使用PyFlink Table API调用AI Function,完成从模型注册、数据准备到结果输出的完整流程。
前提条件
已开通实时计算Flink版并创建工作空间,详情请参见开通实时计算Flink版。实时计算引擎版本为VVR 11.6.0及以上。
已获取OpenAI兼容的LLM端点和API Key。以下端点均可使用:
阿里云百炼:
https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completionsOpenAI:
https://api.openai.com/v1/chat/completionsDeepSeek:
https://api.deepseek.com/v1/chat/completions其他兼容服务(vLLM、Ollama等)
(可选)本地开发环境配置:通过以下命令安装VVR PyFlink依赖包:
pip install ververica-flink说明ververica-flink是API-Only包,仅提供类型定义和接口声明,用于本地开发,作业仍需在实时计算Flink版平台提交。
支持的函数
AI Function是Flink Table API提供的一组内置函数,可在流批处理管道中直接调用大语言模型。更多函数支持请参见AI大语言模型集成。
本文以ai_classify为例演示完整流程。其他函数的调用模式相同,仅参数不同。
步骤一:创建Python脚本并初始化环境
创建一个Python文件(如ai_classify_example.py),导入所需模块并初始化Flink执行环境。
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes, Schema, ModelDescriptor
from pyflink.table.ai_functions import ai_classify
from pyflink.table.expressions import col
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)StreamTableEnvironment是Table API的入口,后续的模型注册、数据操作和作业执行均通过该对象完成。
步骤二:注册模型
使用ModelDescriptor定义LLM连接信息,并通过create_temporary_model注册到Table环境中。
model_descriptor = (
ModelDescriptor.for_provider("openai-compat")
.input_schema(
Schema.new_builder()
.column("input", DataTypes.STRING())
.build()
)
.output_schema(
Schema.new_builder()
.column("content", DataTypes.VARIANT())
.build()
)
.option("endpoint", "https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions")
.option("api-key", "<YOUR_API_KEY>")
.option("model", "qwen3.6-plus")
.option("temperature", "0.0")
.build()
)
t_env.create_temporary_model("classify_model", model_descriptor, True)
model = t_env.from_model("classify_model")
参数说明
参数 | 说明 |
| 指定使用OpenAI兼容协议。适用于所有支持该协议的LLM服务。 |
| 模型输入schema。有且仅有一个 |
| 模型输出schema。垂类AI函数(如 |
| LLM服务的API地址。 |
| API访问密钥。 |
| 模型名称,如 |
| 生成温度。分类任务设为 |
create_temporary_model的第三个参数为True时,若同名模型已存在则覆盖。from_model返回模型引用,供后续AI Function调用。更多参数详情请参见模型设置。
持久化注册模型
create_temporary_model注册的模型仅在当前会话有效,作业结束后即销毁。如需将模型持久化到Catalog中供多个作业复用,使用create_model并指定完整的三段式名称:
t_env.create_model("my_catalog.my_db.classify_model", model_descriptor, True)引用已注册的持久化模型时,同样使用三段式名称:
model = t_env.from_model("my_catalog.my_db.classify_model")持久化模型的连接信息和schema定义存储在Catalog中,无需在每个作业中重复声明。
步骤三:准备输入数据
定义数据schema并构造输入表。
input_schema = DataTypes.ROW([
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("review", DataTypes.STRING()),
])
reviews = [
(1, "This product is absolutely amazing, I love it!"),
(2, "Terrible quality, broke after one day of use."),
(3, "It's okay, nothing special but gets the job done."),
(4, "Fast delivery, great packaging, very satisfied!"),
(5, "Would not recommend, customer service was unhelpful."),
]
input_table = t_env.from_elements(reviews, input_schema)
from_elements将Python列表转换为Flink表。实际生产中,输入数据通常来自Kafka、数据库等连接器。
步骤四:调用AI Function
Flink 提供丰富的内置AI Function。
以 AI_classify 为例,返回UDTF(用户自定义表函数)表达式,通过join_lateral展开并追加到原始表列之后。labels = ["positive", "negative", "neutral"]
classify_expr = ai_classify(model, col("review"), labels)
result_table = input_table.join_lateral(classify_expr)ai_classify接收三个参数:
参数 | 类型 | 说明 |
| Model | 步骤二中注册的模型引用 |
| Expression | 待分类的文本列 |
| list[str] | 候选标签列表 |
join_lateral执行后,结果表的列结构为:
列名 | 来源 | 类型 |
| 原始列 | BIGINT |
| 原始列 | STRING |
| AI Function输出 | STRING |
| AI Function输出 | DOUBLE |
步骤五:执行并输出结果
调用execute().collect()触发作业执行并收集结果。
results = list(result_table.execute().collect())
results.sort(key=lambda r: r[0])
for row in results:
review_id, review_text, category, confidence = row
print(f"id={review_id} category={category:<12s} confidence={confidence:.3f} review={review_text}")
每个Python作业仅支持一个execute()调用。若需执行多个计算任务,应串联为单个管道后统一执行。
步骤六:部署作业
部署并启动Python作业。部署流程请参见Flink Python作业。
在Flink作业的中添加以下配置:
pipeline.used-builtin-models: openai-compat在作业日志中查看输出。进入目标作业的Task Managers > 日志,搜索输出内容。
预期输出示例:
id=1 category=positive confidence=0.950 review=This product is absolutely amazing, I love it! id=2 category=negative confidence=0.980 review=Terrible quality, broke after one day of use. id=3 category=neutral confidence=0.850 review=It's okay, nothing special but gets the job done. id=4 category=positive confidence=0.960 review=Fast delivery, great packaging, very satisfied! id=5 category=negative confidence=0.920 review=Would not recommend, customer service was unhelpful.
完整代码
将<YOUR_API_KEY>替换为实际的API密钥。import logging
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes, Schema, ModelDescriptor
from pyflink.table.ai_functions import ai_classify
from pyflink.table.expressions import col
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)
def main():
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
model_descriptor = (
ModelDescriptor.for_provider("openai-compat")
.input_schema(
Schema.new_builder()
.column("input", DataTypes.STRING())
.build()
)
.output_schema(
Schema.new_builder()
.column("content", DataTypes.VARIANT())
.build()
)
.option("endpoint", "https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions")
.option("api-key", "<YOUR_API_KEY>")
.option("model", "qwen3.6-plus")
.option("temperature", "0.0")
.build()
)
t_env.create_temporary_model("classify_model", model_descriptor, True)
model = t_env.from_model("classify_model")
input_schema = DataTypes.ROW([
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("review", DataTypes.STRING()),
])
reviews = [
(1, "This product is absolutely amazing, I love it!"),
(2, "Terrible quality, broke after one day of use."),
(3, "It's okay, nothing special but gets the job done."),
(4, "Fast delivery, great packaging, very satisfied!"),
(5, "Would not recommend, customer service was unhelpful."),
]
input_table = t_env.from_elements(reviews, input_schema)
labels = ["positive", "negative", "neutral"]
classify_expr = ai_classify(model, col("review"), labels)
result_table = input_table.join_lateral(classify_expr)
results = list(result_table.execute().collect())
results.sort(key=lambda r: r[0])
for row in results:
review_id, review_text, category, confidence = row
logger.info("id=%d category=%-12s confidence=%.3f review=%s",
review_id, category, confidence, review_text)
if __name__ == "__main__":
main()