Python DataFrame API 参考

更新时间:
复制为 MD 格式

Python DataFrame API 为您提供 DataFrame 风格的 Python API 接口,用于编写 Flink 作业。其中包括筛选、投影、连接、聚合等基本 API,让您可以用传统关系代数的方式组织数据处理逻辑;同时,DataFrame API 支持使用 Python 编写用户自定义函数,实现标准算子难以覆盖的特定业务逻辑和数据操作;此外,DataFrame API 提供 AI/LLM 功能,让您在数据处理流程中直接接入大模型服务,并以开箱即用的方式调用分类、情感分析、抽取、翻译、摘要、向量生成等专用 AI 函数。本文为您介绍支持的 API,供您开发作业参考。完整的 API 文档请参见 PyFlink DataFrame

DataFrame 基本操作

以下表格汇总了 DataFrame 上的核心操作。

API 类型

API 详情

构造 / 创建

from_tablefrom_pandasfrom_dictfrom_records

属性

schemacolumns

投影 / 列操作

selectwith_columnwith_columnsdrop_columnsrename_columns

过滤

filter

聚合

group_byagg

连接 / 合并

join

行映射

mapmap_batches

限制 / 分页

limitoffset

空值处理

drop_nulldrop_nanfill_nullfill_nan

输出 / 收集

to_tableto_pandascollectiter_rowsiter_batches

调试 / 执行计划

explain

SQL

sql

表达式辅助函数

函数

说明

col

创建列引用表达式

lit

创建字面量表达式

数据类型

DataType 用于描述 DataFrame 列的类型。

类型分类

方法

布尔

DataType.bool

整数

DataType.int8DataType.int16DataType.int32DataType.int64

浮点

DataType.float32DataType.float64DataType.decimal

字符串

DataType.stringDataType.fixed_size_string

二进制

DataType.binaryDataType.fixed_size_binary

日期 / 时间

DataType.dateDataType.timeDataType.timestampDataType.timestamp_ltz

复合类型

DataType.listDataType.mapDataType.structDataType.tensor

特殊类型

DataType.nullDataType.variant

可空性修饰

DataType.not_nullDataType.nullable

DataType 可用于为数据源指定 schema、指定 UDF 输出类型等场景。例如,使用 DataType 为 Kafka 数据源指定 schema:

import pyflink.dataframe as pf
from pyflink.dataframe import DataType

df = pf.read_kafka(
    "localhost:9092",
    topic="user_events",
    schema={
        "user_id": DataType.string(),
        "event_type": DataType.string(),
        "amount": DataType.decimal(10, 2),
        "tags": DataType.list(DataType.string()),
        "event_time": DataType.timestamp_ltz(3),
    },
    format="json",
    startup_mode="earliest-offset",
)

I/O 读写

DataFrame 提供了丰富的数据源读写接口,您可以方便地读写常见的数据源,也可以使用 read_custom 或 write_custom 函数调用其他已支持的连接器自定义连接器读写数据。

数据读入

数据源

API

Parquet 文件

read_parquet

Kafka

read_kafka

MaxCompute (ODPS)

read_odps

Paimon

read_paimon

SLS (日志服务)

read_sls

Hologres

read_hologres

自定义连接器

read_custom

数据输出

数据目标

API

Parquet 文件

write_parquet

Kafka

write_kafka

MaxCompute (ODPS)

write_odps

Paimon

write_paimon

SLS (日志服务)

write_sls

Hologres

write_hologres

自定义连接器

write_custom

用户自定义函数

udf 装饰器支持将 Python 函数注册为 DataFrame 可用的标量函数。按执行方式可分为四类:

  • 同步逐行:普通 Python 函数,每次处理一行。

  • 异步逐行:适合调用外部服务或执行 I/O 密集型逻辑,允许多行同时执行 I/O 操作,从而提升吞吐。

  • 同步向量:按批接收和返回数据,减少 Python 与 Flink 运行时之间的逐行调用开销,适合纯计算逻辑。

  • 异步向量:兼顾批处理吞吐与异步 I/O 并发。


udf

  • 接口名称:udf

    udf(
        func=None,
        *,
        return_dtype=None,
        deterministic=True,
        name=None,
        func_type=None,
        concurrency=None,
        batch_size=None
    )
  • 功能描述:将 Python 函数注册为 DataFrame 可用的标量用户自定义函数。支持普通函数、async 函数、ScalarFunction / AsyncScalarFunction 子类及可调用类。

  • 入参

    参数名称

    参数类型

    是否必填

    描述

    func

    Callable / Class

    要包装的 Python 函数、ScalarFunction 实例/子类或可调用类。若省略则返回装饰器。

    return_dtype

    DataType / str / type

    返回类型。可以是 DataType 实例(如 DataType.int64())、Python 类型(如 int)或 SQL 类型字符串(如 'BIGINT')。若省略则从函数返回类型提示自动推断。

    deterministic

    Boolean

    函数是否是确定性的。默认值为 True

    name

    String

    UDF 的名称。若不指定则使用函数名。

    func_type

    String

    执行格式,可选 "general""pandas""arrow"。若不指定则根据参数类型提示自动检测。

    concurrency

    int

    UDF 算子的并行度。不同 concurrency 值的 UDF 会被拆分到独立算子。

    batch_size

    int

    每批最大元素数,仅适用于批处理 UDF(pandas / arrow 模式)。

  • 返回值

    DataFrameUDFWrapper 对象,可通过 udf_func(col("a"), col("b")) 的方式在 with_columnwith_columnsmapmap_batches等操作中使用。

  • 示例:请参见用户自定义函数

AI / LLM 函数

通过 df.llm 访问器,DataFrame 提供了一系列内置 AI 函数。详细 API 参考见 AI/LLM

Provider 配置

使用 AI 函数前,需先注册模型服务商(Provider)。

API

说明

set_provider

注册一个模型 Provider

set_default_provider

设置默认 Provider(多 Provider 场景下使用)

list_providers

列举已注册的 Provider

支持的 Provider 类型:

Provider

适用场景

OpenAICompatProvider

OpenAI、DeepSeek、灵积(百炼)等所有 OpenAI 兼容接口

DashScopeProvider

阿里云灵积(DashScope),支持多模态 Embedding

TritonProvider

NVIDIA Triton Inference Server

GenericProvider

通用 Provider,通过键值对配置任意后端

set_provider

  • 接口名称:set_provider

    set_provider(
        name_or_provider,
        provider=None,
        **options
    )
  • 功能描述:注册一个全局模型 Provider 配置。支持三种调用方式:传入 Provider 实例、传入名称 + Provider 实例、传入名称 + 关键字参数。

  • 入参

    参数名称

    参数类型

    是否必填

    描述

    name_or_provider

    Provider / String

    Provider 实例(自动以其默认名称注册)或自定义名称字符串。

    provider

    Provider

    Provider 实例,仅在第一个参数为名称字符串时使用。

    **options

    key=value

    Provider 配置项(如 endpointapi_key),仅在第一个参数为名称字符串且未传入 provider 时使用,将创建 GenericProvider。

  • 返回值

    无。

  • 示例

    import pyflink.dataframe as pf
    
    # Option 1: pass a Provider instance directly
    pf.set_provider(pf.OpenAICompatProvider(
        endpoint="https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions",
        api_key="sk-..."
    ))
    
    # Option 2: custom name + Provider instance (register multiple)
    pf.set_provider("chat", pf.OpenAICompatProvider(
        endpoint="https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions",
        api_key="sk-..."
    ))
    pf.set_provider("embedding", pf.OpenAICompatProvider(
        endpoint="https://dashscope.aliyuncs.com/compatible-mode/v1/embeddings",
        api_key="sk-..."
    ))
    
    # Option 3: name + keyword arguments (creates GenericProvider)
    pf.set_provider("openai-compat",
        endpoint="https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions",
        api_key="sk-..."
    )

通用调用

  • 接口名称:predict

    DataFrame.llm.predict(
        *input_cols,
        provider=None,
        model=None,
        output_type=None,
        config=None
    )
    
  • 功能描述:通用模型推理。将指定输入列发送给模型,返回模型输出列追加到 DataFrame 中。支持自定义输出 schema。

  • 入参

    参数名称

    参数类型

    是否必填

    描述

    *input_cols

    String

    作为模型输入的列名,可传入多个。

    provider

    String

    Provider 名称。若不指定则使用默认 Provider;若无 Provider 配置,则 model 被视为 Catalog Model 名称。

    model

    String

    模型名称(如 "qwen-plus")或 Catalog Model 名称。

    output_type

    Dict

    输出列 schema,格式为 {列名: 类型},类型可以是 SQL 类型字符串或 DataType 对象。默认值为 {"output": "STRING"}

    config

    Dict

    运行时配置选项。

  • 返回值

    DataFrame 对象,包含原始列和模型输出列。

  • 示例

    import pyflink.dataframe as pf
    
    pf.set_provider(pf.OpenAICompatProvider(
        endpoint="https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions",
        api_key="sk-..."
    ))
    
    df = pf.from_dict({"question": ["What is Flink?", "What is stream processing?"]})
    
    # default output column: output (STRING)
    df = df.llm.predict("question", provider="chat", model="qwen-plus")
    
    # custom output schema
    df = df.llm.predict("question", provider="chat", model="qwen-plus",
                        output_type={"answer": "STRING", "score": "DOUBLE"})
    

文本类

文本分类

  • 接口名称:ai_classify

    DataFrame.llm.ai_classify(
        input_col,
        labels,
        *,
        provider=None,
        model=None,
        config=None
    )
  • 功能描述:将文本分类到指定标签列表中的某一个。

  • 入参

    参数名称

    参数类型

    是否必填

    描述

    input_col

    String / Expression

    输入文本列名或列表达式。

    labels

    List[String]

    分类标签列表,如 ["positive", "negative", "neutral"]

    provider

    String

    Provider 名称。

    model

    String

    模型名称。

    config

    Dict

    运行时配置选项。

  • 返回值

    DataFrame 对象,追加 category (STRING) 和 confidence (DOUBLE) 列,包含分类结果和置信分数。

  • 示例

    df = pf.from_dict({"review": ["Great product", "Terrible, do not buy", "It is okay"]})
    df = df.llm.ai_classify("review",
                            labels=["positive", "negative", "neutral"],
                            provider="chat", model="qwen-plus")

情感分析

  • 接口名称:ai_sentiment

    DataFrame.llm.ai_sentiment(
        input_col,
        *,
        provider=None,
        model=None,
        config=None
    )
  • 功能描述:对输入文本进行情感分析。

  • 入参

    参数名称

    参数类型

    是否必填

    描述

    input_col

    String / Expression

    输入文本列名或列表达式。

    provider

    String

    Provider 名称。

    model

    String

    模型名称。

    config

    Dict

    运行时配置选项。

  • 返回值

    DataFrame 对象,追加以下列,包含情感分析结果:

    • score (DOUBLE): 情感分析分数,在 -1.0 到 1.0 之间。

    • label (STRING): “positive”, “negative”, “neutral” 之一。

    • confidence (DOUBLE): 置信分数。

  • 示例

    df = pf.from_dict({"comment": ["This feature is amazing!", "Broke after one month"]})
    df = df.llm.ai_sentiment("comment", provider="chat", model="qwen-plus")

信息提取

  • 接口名称:ai_extract

    DataFrame.llm.ai_extract(
        input_col,
        schema,
        *,
        provider=None,
        model=None,
        config=None
    )
    
  • 功能描述:按照给定的 JSON Schema 从文本中提取结构化信息。

  • 入参

    参数名称

    参数类型

    是否必填

    描述

    input_col

    String / Expression

    输入文本列名或列表达式。

    schema

    String

    JSON Schema 字符串,描述要提取的字段,如 '{"name":"STRING", "phone":"STRING"}'

    provider

    String

    Provider 名称。

    model

    String

    模型名称。

    config

    Dict

    运行时配置选项。

  • 返回值

    DataFrame 对象,追加 extracted_json (STRING) 列,包含提取的结构化信息。

  • 示例

    df = pf.from_dict({"text": ["John Smith, male, 28 years old, phone ***-****"]})
    schema = '{"name": "STRING", "age": "INTEGER", "phone": "STRING"}'
    df = df.llm.ai_extract("text", schema=schema,
                           provider="chat", model="qwen-plus")

文本翻译

  • 接口名称:ai_translate

    DataFrame.llm.ai_translate(
        input_col,
        source_lang,
        target_lang,
        *,
        provider=None,
        model=None,
        config=None
    )
    
  • 功能描述:将文本从源语言翻译为目标语言。

  • 入参

    参数名称

    参数类型

    是否必填

    描述

    input_col

    String / Expression

    输入文本列名或列表达式。

    source_lang

    String

    源语言代码,如 "zh""en""auto"(自动检测)。支持:autozhenjakofrdeesruarpt

    target_lang

    String

    目标语言代码,不可为 "auto"

    provider

    String

    Provider 名称。

    model

    String

    模型名称。

    config

    Dict

    运行时配置选项。

  • 返回值

    DataFrame 对象,追加 translated_text (STRING) 和 detected_language (STRING) 列,包含翻译结果和识别到的语言。

  • 示例

    df = pf.from_dict({"text": ["Hello World", "How are you?"]})
    df = df.llm.ai_translate("text", source_lang="en", target_lang="zh",
                             provider="chat", model="qwen-plus")

文本摘要

  • 接口名称:ai_summarize

    DataFrame.llm.ai_summarize(
        input_col,
        max_length,
        *,
        provider=None,
        model=None,
        config=None
    )
    
  • 功能描述:将文本摘要到指定最大长度。

  • 入参

    参数名称

    参数类型

    是否必填

    描述

    input_col

    String / Expression

    输入文本列名或列表达式。

    max_length

    int

    摘要的最大字符数,必须大于 0。

    provider

    String

    Provider 名称。

    model

    String

    模型名称。

    config

    Dict

    运行时配置选项。

  • 返回值

    DataFrame 对象,追加 summary (STRING) 列,包含摘要文本。

  • 示例

    df = pf.from_dict({"article": ["This is a long article with lots of content..."]})
    df = df.llm.ai_summarize("article", max_length=100,
                             provider="chat", model="qwen-plus")
    

数据脱敏

  • 接口名称:ai_mask

    DataFrame.llm.ai_mask(
        input_col,
        entities,
        *,
        provider=None,
        model=None,
        config=None
    )
    
  • 功能描述:对文本中的敏感信息进行脱敏处理。

  • 入参

    参数名称

    参数类型

    是否必填

    描述

    input_col

    String / Expression

    输入文本列名或列表达式。

    entities

    List[String]

    需要脱敏的实体类型列表,如 ["name", "phone"]

    provider

    String

    Provider 名称。

    model

    String

    模型名称。

    config

    Dict

    运行时配置选项。

  • 返回值

    DataFrame 对象,追加 masked_text (STRING) 和 detected_entities (ARRAY<STRING>) 列,包含脱敏后的文字和检测到的实体。

  • 示例

    df = pf.from_dict({"text": ["Please contact John Smith at 555-0123"]})
    df = df.llm.ai_mask("text", entities=["name", "phone"],
                        provider="chat", model="qwen-plus")
    

向量类

文本向量化

  • 接口名称:ai_embed

    DataFrame.llm.ai_embed(
        input_col,
        dimension=1024,
        *,
        provider=None,
        model=None,
        config=None
    )
    
  • 功能描述:为文本生成向量嵌入(Embedding)。

  • 入参

    参数名称

    参数类型

    是否必填

    描述

    input_col

    String / Expression

    输入文本列名或列表达式。

    dimension

    int

    向量维度。默认值为 1024。

    provider

    String

    Provider 名称。

    model

    String

    模型名称。

    config

    Dict

    运行时配置选项。

  • 返回值

    DataFrame 对象,追加 embedding (ARRAY<FLOAT>) 列,包含生成的向量。

  • 示例

    df = pf.from_dict({"text": ["stream processing with Flink", "real-time analytics"]})
    df = df.llm.ai_embed("text", dimension=1024,
                         provider="embedding", model="text-embedding-v4")
    

环境与配置

API

说明

参考

set_table_environment

设置全局 TableEnvironment

Config

get_table_environment

获取当前 TableEnvironment

Config

get_or_create_table_environment

获取或自动创建 TableEnvironment

Config

config.set(key, value)

设置 DataFrame 配置项

DataFrameConfig

config.get(key)

读取 DataFrame 配置项

DataFrameConfig

修改运行参数

  • 接口名称:DataFrameConfig.set

    DataFrameConfig.set(
        key, value
    )
  • 功能描述:修改 DataFrame API 作业的运行参数。您可按照下方示例的方法配置 Python 作业参数Table API 参数,配置的参数会应用到默认的 TableEnvironment 上,且会覆盖作业运行参数配置中的同名参数。

  • 入参

    参数名称

    参数类型

    是否必填

    描述

    key

    String

    参数名。

    value

    String

    参数值。

  • 返回值

    无。

  • 示例

    import pyflink.dataframe as pf
    
    pf.config.set("python.fn-execution.arrow.batch.size", "256")
    pf.config.set("table.exec.async-scalar.buffer-capacity", "10")
    pf.config.set("table.exec.async-lookup.timeout", "1 min")
    说明

    建议您在编写作业时,先配置参数再定义作业逻辑,以确保参数按预期生效。