Python DataFrame API 为您提供 DataFrame 风格的 Python API 接口,用于编写 Flink 作业。其中包括筛选、投影、连接、聚合等基本 API,让您可以用传统关系代数的方式组织数据处理逻辑;同时,DataFrame API 支持使用 Python 编写用户自定义函数,实现标准算子难以覆盖的特定业务逻辑和数据操作;此外,DataFrame API 提供 AI/LLM 功能,让您在数据处理流程中直接接入大模型服务,并以开箱即用的方式调用分类、情感分析、抽取、翻译、摘要、向量生成等专用 AI 函数。本文为您介绍支持的 API,供您开发作业参考。完整的 API 文档请参见 PyFlink DataFrame。
DataFrame 基本操作
以下表格汇总了 DataFrame 上的核心操作。
API 类型 | API 详情 |
构造 / 创建 | |
属性 | |
投影 / 列操作 | |
过滤 | |
聚合 | |
连接 / 合并 | |
行映射 | |
限制 / 分页 | |
空值处理 | |
输出 / 收集 | |
调试 / 执行计划 | |
SQL |
表达式辅助函数
函数 | 说明 |
创建列引用表达式 | |
创建字面量表达式 |
数据类型
DataType 用于描述 DataFrame 列的类型。
类型分类 | 方法 |
布尔 | |
整数 | |
浮点 | |
字符串 | |
二进制 | |
日期 / 时间 | DataType.date、DataType.time、DataType.timestamp、DataType.timestamp_ltz |
复合类型 | |
特殊类型 | |
可空性修饰 |
DataType 可用于为数据源指定 schema、指定 UDF 输出类型等场景。例如,使用 DataType 为 Kafka 数据源指定 schema:
import pyflink.dataframe as pf
from pyflink.dataframe import DataType
df = pf.read_kafka(
"localhost:9092",
topic="user_events",
schema={
"user_id": DataType.string(),
"event_type": DataType.string(),
"amount": DataType.decimal(10, 2),
"tags": DataType.list(DataType.string()),
"event_time": DataType.timestamp_ltz(3),
},
format="json",
startup_mode="earliest-offset",
)I/O 读写
DataFrame 提供了丰富的数据源读写接口,您可以方便地读写常见的数据源,也可以使用 read_custom 或 write_custom 函数调用其他已支持的连接器或自定义连接器读写数据。
数据读入
数据源 | API |
Parquet 文件 | |
Kafka | |
MaxCompute (ODPS) | |
Paimon | |
SLS (日志服务) | |
Hologres | |
自定义连接器 |
数据输出
数据目标 | API |
Parquet 文件 | |
Kafka | |
MaxCompute (ODPS) | |
Paimon | |
SLS (日志服务) | |
Hologres | |
自定义连接器 |
用户自定义函数
udf 装饰器支持将 Python 函数注册为 DataFrame 可用的标量函数。按执行方式可分为四类:
同步逐行:普通 Python 函数,每次处理一行。
异步逐行:适合调用外部服务或执行 I/O 密集型逻辑,允许多行同时执行 I/O 操作,从而提升吞吐。
同步向量:按批接收和返回数据,减少 Python 与 Flink 运行时之间的逐行调用开销,适合纯计算逻辑。
异步向量:兼顾批处理吞吐与异步 I/O 并发。
udf
接口名称:udf。
udf( func=None, *, return_dtype=None, deterministic=True, name=None, func_type=None, concurrency=None, batch_size=None )功能描述:将 Python 函数注册为 DataFrame 可用的标量用户自定义函数。支持普通函数、async 函数、ScalarFunction / AsyncScalarFunction 子类及可调用类。
入参
参数名称
参数类型
是否必填
描述
func
Callable / Class
否
要包装的 Python 函数、ScalarFunction 实例/子类或可调用类。若省略则返回装饰器。
return_dtype
DataType / str / type
否
返回类型。可以是
DataType实例(如DataType.int64())、Python 类型(如int)或 SQL 类型字符串(如'BIGINT')。若省略则从函数返回类型提示自动推断。deterministic
Boolean
否
函数是否是确定性的。默认值为
True。name
String
否
UDF 的名称。若不指定则使用函数名。
func_type
String
否
执行格式,可选
"general"、"pandas"、"arrow"。若不指定则根据参数类型提示自动检测。concurrency
int
否
UDF 算子的并行度。不同 concurrency 值的 UDF 会被拆分到独立算子。
batch_size
int
否
每批最大元素数,仅适用于批处理 UDF(pandas / arrow 模式)。
返回值
DataFrameUDFWrapper 对象,可通过
udf_func(col("a"), col("b"))的方式在with_column、with_columns、map、map_batches等操作中使用。示例:请参见用户自定义函数。
AI / LLM 函数
通过 df.llm 访问器,DataFrame 提供了一系列内置 AI 函数。详细 API 参考见 AI/LLM。
Provider 配置
使用 AI 函数前,需先注册模型服务商(Provider)。
API | 说明 |
注册一个模型 Provider | |
设置默认 Provider(多 Provider 场景下使用) | |
列举已注册的 Provider |
支持的 Provider 类型:
Provider | 适用场景 |
OpenAI、DeepSeek、灵积(百炼)等所有 OpenAI 兼容接口 | |
阿里云灵积(DashScope),支持多模态 Embedding | |
NVIDIA Triton Inference Server | |
通用 Provider,通过键值对配置任意后端 |
set_provider
接口名称:set_provider。
set_provider( name_or_provider, provider=None, **options )功能描述:注册一个全局模型 Provider 配置。支持三种调用方式:传入 Provider 实例、传入名称 + Provider 实例、传入名称 + 关键字参数。
入参
参数名称
参数类型
是否必填
描述
name_or_provider
Provider / String
是
Provider 实例(自动以其默认名称注册)或自定义名称字符串。
provider
Provider
否
Provider 实例,仅在第一个参数为名称字符串时使用。
**options
key=value
否
Provider 配置项(如
endpoint、api_key),仅在第一个参数为名称字符串且未传入 provider 时使用,将创建 GenericProvider。返回值
无。
示例
import pyflink.dataframe as pf # Option 1: pass a Provider instance directly pf.set_provider(pf.OpenAICompatProvider( endpoint="https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions", api_key="sk-..." )) # Option 2: custom name + Provider instance (register multiple) pf.set_provider("chat", pf.OpenAICompatProvider( endpoint="https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions", api_key="sk-..." )) pf.set_provider("embedding", pf.OpenAICompatProvider( endpoint="https://dashscope.aliyuncs.com/compatible-mode/v1/embeddings", api_key="sk-..." )) # Option 3: name + keyword arguments (creates GenericProvider) pf.set_provider("openai-compat", endpoint="https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions", api_key="sk-..." )
通用调用
接口名称:predict。
DataFrame.llm.predict( *input_cols, provider=None, model=None, output_type=None, config=None )功能描述:通用模型推理。将指定输入列发送给模型,返回模型输出列追加到 DataFrame 中。支持自定义输出 schema。
入参
参数名称
参数类型
是否必填
描述
*input_cols
String
是
作为模型输入的列名,可传入多个。
provider
String
否
Provider 名称。若不指定则使用默认 Provider;若无 Provider 配置,则
model被视为 Catalog Model 名称。model
String
否
模型名称(如
"qwen-plus")或 Catalog Model 名称。output_type
Dict
否
输出列 schema,格式为
{列名: 类型},类型可以是 SQL 类型字符串或 DataType 对象。默认值为{"output": "STRING"}。config
Dict
否
运行时配置选项。
返回值
DataFrame 对象,包含原始列和模型输出列。
示例
import pyflink.dataframe as pf pf.set_provider(pf.OpenAICompatProvider( endpoint="https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions", api_key="sk-..." )) df = pf.from_dict({"question": ["What is Flink?", "What is stream processing?"]}) # default output column: output (STRING) df = df.llm.predict("question", provider="chat", model="qwen-plus") # custom output schema df = df.llm.predict("question", provider="chat", model="qwen-plus", output_type={"answer": "STRING", "score": "DOUBLE"})
文本类
文本分类
接口名称:ai_classify。
DataFrame.llm.ai_classify( input_col, labels, *, provider=None, model=None, config=None )功能描述:将文本分类到指定标签列表中的某一个。
入参
参数名称
参数类型
是否必填
描述
input_col
String / Expression
是
输入文本列名或列表达式。
labels
List[String]
是
分类标签列表,如
["positive", "negative", "neutral"]。provider
String
否
Provider 名称。
model
String
否
模型名称。
config
Dict
否
运行时配置选项。
返回值
DataFrame 对象,追加
category(STRING) 和confidence(DOUBLE) 列,包含分类结果和置信分数。示例
df = pf.from_dict({"review": ["Great product", "Terrible, do not buy", "It is okay"]}) df = df.llm.ai_classify("review", labels=["positive", "negative", "neutral"], provider="chat", model="qwen-plus")
情感分析
接口名称:ai_sentiment。
DataFrame.llm.ai_sentiment( input_col, *, provider=None, model=None, config=None )功能描述:对输入文本进行情感分析。
入参
参数名称
参数类型
是否必填
描述
input_col
String / Expression
是
输入文本列名或列表达式。
provider
String
否
Provider 名称。
model
String
否
模型名称。
config
Dict
否
运行时配置选项。
返回值
DataFrame 对象,追加以下列,包含情感分析结果:
score(DOUBLE): 情感分析分数,在 -1.0 到 1.0 之间。label(STRING): “positive”, “negative”, “neutral” 之一。confidence(DOUBLE): 置信分数。
示例
df = pf.from_dict({"comment": ["This feature is amazing!", "Broke after one month"]}) df = df.llm.ai_sentiment("comment", provider="chat", model="qwen-plus")
信息提取
接口名称:ai_extract。
DataFrame.llm.ai_extract( input_col, schema, *, provider=None, model=None, config=None )功能描述:按照给定的 JSON Schema 从文本中提取结构化信息。
入参
参数名称
参数类型
是否必填
描述
input_col
String / Expression
是
输入文本列名或列表达式。
schema
String
是
JSON Schema 字符串,描述要提取的字段,如
'{"name":"STRING", "phone":"STRING"}'。provider
String
否
Provider 名称。
model
String
否
模型名称。
config
Dict
否
运行时配置选项。
返回值
DataFrame 对象,追加
extracted_json(STRING) 列,包含提取的结构化信息。示例
df = pf.from_dict({"text": ["John Smith, male, 28 years old, phone ***-****"]}) schema = '{"name": "STRING", "age": "INTEGER", "phone": "STRING"}' df = df.llm.ai_extract("text", schema=schema, provider="chat", model="qwen-plus")
文本翻译
接口名称:ai_translate。
DataFrame.llm.ai_translate( input_col, source_lang, target_lang, *, provider=None, model=None, config=None )功能描述:将文本从源语言翻译为目标语言。
入参
参数名称
参数类型
是否必填
描述
input_col
String / Expression
是
输入文本列名或列表达式。
source_lang
String
是
源语言代码,如
"zh"、"en"、"auto"(自动检测)。支持:auto、zh、en、ja、ko、fr、de、es、ru、ar、pt。target_lang
String
是
目标语言代码,不可为
"auto"。provider
String
否
Provider 名称。
model
String
否
模型名称。
config
Dict
否
运行时配置选项。
返回值
DataFrame 对象,追加
translated_text(STRING) 和detected_language(STRING) 列,包含翻译结果和识别到的语言。示例
df = pf.from_dict({"text": ["Hello World", "How are you?"]}) df = df.llm.ai_translate("text", source_lang="en", target_lang="zh", provider="chat", model="qwen-plus")
文本摘要
接口名称:ai_summarize。
DataFrame.llm.ai_summarize( input_col, max_length, *, provider=None, model=None, config=None )功能描述:将文本摘要到指定最大长度。
入参
参数名称
参数类型
是否必填
描述
input_col
String / Expression
是
输入文本列名或列表达式。
max_length
int
是
摘要的最大字符数,必须大于 0。
provider
String
否
Provider 名称。
model
String
否
模型名称。
config
Dict
否
运行时配置选项。
返回值
DataFrame 对象,追加
summary(STRING) 列,包含摘要文本。示例
df = pf.from_dict({"article": ["This is a long article with lots of content..."]}) df = df.llm.ai_summarize("article", max_length=100, provider="chat", model="qwen-plus")
数据脱敏
接口名称:ai_mask。
DataFrame.llm.ai_mask( input_col, entities, *, provider=None, model=None, config=None )功能描述:对文本中的敏感信息进行脱敏处理。
入参
参数名称
参数类型
是否必填
描述
input_col
String / Expression
是
输入文本列名或列表达式。
entities
List[String]
是
需要脱敏的实体类型列表,如
["name", "phone"]。provider
String
否
Provider 名称。
model
String
否
模型名称。
config
Dict
否
运行时配置选项。
返回值
DataFrame 对象,追加
masked_text(STRING) 和detected_entities(ARRAY<STRING>) 列,包含脱敏后的文字和检测到的实体。示例
df = pf.from_dict({"text": ["Please contact John Smith at 555-0123"]}) df = df.llm.ai_mask("text", entities=["name", "phone"], provider="chat", model="qwen-plus")
向量类
文本向量化
接口名称:ai_embed。
DataFrame.llm.ai_embed( input_col, dimension=1024, *, provider=None, model=None, config=None )功能描述:为文本生成向量嵌入(Embedding)。
入参
参数名称
参数类型
是否必填
描述
input_col
String / Expression
是
输入文本列名或列表达式。
dimension
int
否
向量维度。默认值为 1024。
provider
String
否
Provider 名称。
model
String
否
模型名称。
config
Dict
否
运行时配置选项。
返回值
DataFrame 对象,追加
embedding(ARRAY<FLOAT>) 列,包含生成的向量。示例
df = pf.from_dict({"text": ["stream processing with Flink", "real-time analytics"]}) df = df.llm.ai_embed("text", dimension=1024, provider="embedding", model="text-embedding-v4")
环境与配置
API | 说明 | 参考 |
set_table_environment | 设置全局 TableEnvironment | |
get_table_environment | 获取当前 TableEnvironment | |
get_or_create_table_environment | 获取或自动创建 TableEnvironment | |
config.set(key, value) | 设置 DataFrame 配置项 | |
config.get(key) | 读取 DataFrame 配置项 |
修改运行参数
接口名称:DataFrameConfig.set。
DataFrameConfig.set( key, value )功能描述:修改 DataFrame API 作业的运行参数。您可按照下方示例的方法配置 Python 作业参数和 Table API 参数,配置的参数会应用到默认的
TableEnvironment上,且会覆盖作业运行参数配置中的同名参数。入参
参数名称
参数类型
是否必填
描述
key
String
是
参数名。
value
String
是
参数值。
返回值
无。
示例
import pyflink.dataframe as pf pf.config.set("python.fn-execution.arrow.batch.size", "256") pf.config.set("table.exec.async-scalar.buffer-capacity", "10") pf.config.set("table.exec.async-lookup.timeout", "1 min")说明建议您在编写作业时,先配置参数再定义作业逻辑,以确保参数按预期生效。