功能概览

更新时间:
复制为 MD 格式

本文以一个电商订单分析和客服问题处理的完整场景为例,为您介绍 DataFrame API 的功能特性。

构建 DataFrame 对象

使用本地数据

在进行开发调试时,可以用 Python 字典、记录列表或 Pandas DataFrame 快速构建 DataFrame。下面构建本文用到的 DataFrame:用户表 users、订单表 orders、事件表 events、用户画像表 profiles

import pandas as pd
import pyflink.dataframe as pf
from pyflink.dataframe import col, lit, udf, DataType

# User table
users = pf.from_dict({
    "user_id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
    "city": ["Hangzhou", "Shanghai", "Beijing"],
})

# Order fact table
orders = pf.from_records(
    [
        (1001, 1, 99.9, "PAID"),
        (1002, 2, 35.5, "CREATED"),
        (1003, 1, 188.0, "PAID"),
        (1004, 3, 88.8, None),
    ],
    schema=["order_id", "user_id", "amount", "status"],
)

# User behavior event table
events = pf.from_records(
    [
        {"event_id": "e1", "user_id": 1, "event": "login"},
        {"event_id": "e2", "user_id": 2, "event": "pay"},
    ],
    schema=["event_id", "user_id", "event"],
)

# User profile table
profiles = pf.from_records(
    [
        (1, "gold"),
        (2, "silver"),
        (3, "gold"),
    ],
    schema=["user_id", "member_level"],
)

# You can also build from pandas DataFrame
pdf = pd.DataFrame({"id": [1, 2], "score": [0.8, 0.95]})
df_from_pandas = pf.from_pandas(pdf)

读取外部数据

使用内置读取函数

在生产环境中,您可以用 DataFrame API 内置的读取函数从常见数据源中读取数据。

kafka_events = pf.read_kafka(
    "broker-1:9092,broker-2:9092",
    topic="user_events",
    group_id="df-api-demo",
    startup_mode="earliest-offset",
    format="json",
    schema={
        "user_id": DataType.int64(),
        "event": DataType.string(),
        "event_time": DataType.timestamp(3),
        "payload": DataType.string(),
    },
)

更多读取函数请参见数据读入

说明

读取外部系统时通常需要显式声明字段类型。DataType提供了常用类型,包括整数、浮点数、字符串、布尔值、日期时间、复合类型等。

使用自定义连接器

您可以使用read_custom函数从其他已支持的连接器自定义连接器中读取数据。

connector 参数为您使用连接器的 identifier,options 用于传递连接器的 WITH 参数。

source = pf.read_custom(
    "datagen",
    schema={
        "id": DataType.int64(),
        "name": DataType.string(),
    },
    options={
        "number-of-rows": "1000",
        "fields.id.kind": "sequence",
        "fields.id.start": "1",
        "fields.id.end": "1000",
    },
)

数据处理

DataFrame 的大多数变换都会返回一个新的 DataFrame,因此适合使用链式写法组合多个操作。常见操作包括投影、过滤、派生列、删除列、重命名、聚合、连接和缺失值处理。

投影、过滤和派生列

下面的 paid_orders 从订单表 orders 中筛选已支付订单,并按照订单金额分层:

paid_orders = (
    orders
    .select("order_id", "user_id", "amount", "status")
    .filter("status = 'PAID' AND amount > 0")
    .with_columns(
        amount_yuan=col("amount"),
        amount_level=(
            (col("amount") >= 100).then("high", "normal")
        ),
    )
    .drop("status", "amount")
    .rename({"amount_yuan": "amount"})
)

也可以用下标语法引用列或选择列:

amount_expr = paid_orders["amount"]
simple_view = paid_orders[["order_id", "user_id", "amount"]]
large_orders = paid_orders[col("amount") > 100]

缺失值和 NaN 处理

drop_null / fill_null 处理 SQL NULL,drop_nan / fill_nan 处理浮点列中的 NaN。

下面的 clean_orders 表示完成质量清洗后的订单表:

clean_orders = (
    orders
    .drop_null(subset=["order_id", "user_id"])
    .fill_null("UNKNOWN", subset=["status"])
    .fill_nan(0.0, subset=["amount"])
)

聚合

使用 group_byagg 做分组聚合。聚合结果中通常需要显式保留分组列。

下面的 user_summary 是按用户汇总后的订单指标表,包含订单数、总金额和平均金额:

user_summary = (
    clean_orders
    .filter("status = 'PAID'")
    .group_by("user_id")
    .agg(
        col("user_id"),
        col("order_id").count.alias("order_count"),
        col("amount").sum.alias("total_amount"),
        col("amount").avg.alias("avg_amount"),
    )
)

如果需要对全表聚合,可直接使用 select

overall = clean_orders.select(
    col("order_id").count.alias("order_count"),
    col("amount").sum.alias("total_amount"),
)

连接

join 支持 innerleftrightfullouter 等连接类型。

下面的例子将用户表 users 与事件表 events 关联,生成带有用户姓名、城市和行为事件的 enriched 宽表:

events_renamed = events.rename("user_id", "event_user_id")

enriched = (
    users
    .join(
        events_renamed,
        left_on="user_id",
        right_on="event_user_id",
        how="left",
    )
    .select(
        "user_id",
        "name",
        "city",
        "event_id",
        "event",
    )
)

使用 SQL 查询

您可以用 sql 运行 SELECT 查询。SQL 中的表名会自动绑定到当前作用域中同名的 Python DataFrame 变量,无需手动注册。

下面使用 SQL 从按用户汇总的订单表 user_summary 中筛选累计消费金额较高的用户,并与用户画像表 profiles 关联:

top_users = pf.sql("""
    SELECT
        user_summary.user_id,
        profiles.member_level,
        user_summary.order_count,
        user_summary.total_amount
    FROM user_summary
    LEFT JOIN profiles
        ON user_summary.user_id = profiles.user_id
    WHERE user_summary.total_amount >= 200
""")

用户自定义函数

当标准表达式、聚合或 SQL 无法覆盖业务逻辑时,可以使用 udf。DataFrame API 支持同步/异步、逐行/向量化等多种 UDF 形态。UDF 可以通过 with_column / with_columns 添加为新列,也可以通过 map / map_batches 对整行数据做变换。

with_column / with_columns

同步行式 UDF

标量 UDF 接收一列或多列,返回一列。建议使用 Python 类型标注自动推断返回类型;也可以用 return_dtype 显式指定。with_column 添加单列,with_columns 可同时添加多列。在定义 UDF 时,可以使用 concurrency 指定并发度。

@udf
def normalize_status(status: str) -> str:
    if status is None:
        return "UNKNOWN"
    return status.strip().upper()

@udf(concurrency=32)
def order_tag(amount: float, status: str) -> str:
    """Takes multiple columns as input"""
    if status == "PAID" and amount is not None and amount >= 100:
        return "high_value_paid"
    return "normal"

# with_column adds a single column
orders_with_status = orders.with_column(
    "status_norm", normalize_status(col("status"))
)

# with_columns adds multiple columns at once
orders_with_flags = orders.with_columns(
    status_norm=normalize_status(col("status")),
    tag=order_tag(col("amount"), col("status")),
)

异步行式 UDF

异步 UDF 允许多行同时执行 I/O 操作,从而提升吞吐,适合调用外部服务或执行 I/O 密集型逻辑。

import asyncio

@udf(return_dtype=DataType.string(), concurrency=32)
async def query_region(user_id: int) -> str:
    await asyncio.sleep(0.01)  # simulate async I/O
    return f"region_for_{user_id}"

orders_with_region = orders.with_column(
    "region", query_region(col("user_id")),
)

同步向量 UDF

向量化 UDF 按批接收和返回数据,减少 Python 与 Flink 运行时之间的逐行调用开销,适合纯计算逻辑。支持 Pandas(pandas.Series)和 Arrow(pyarrow.Array)两种格式,可以根据 UDF 中的计算逻辑按需选择。可以通过 batch_size 控制每批大小。

Pandas 格式

@udf(return_dtype=DataType.float64(), concurrency=32, batch_size=64)
def scale_amount_pandas(amounts: pd.Series) -> pd.Series:
    return amounts * 100.0

orders_scaled_pandas = orders.with_column(
    "amount_norm", scale_amount_pandas(col("amount")),
)

Arrow 格式

import pyarrow as pa
import pyarrow.compute as pc

@udf(return_dtype=DataType.float64(), concurrency=32, batch_size=64)
def scale_amount_arrow(amounts: pa.Array) -> pa.Array:
    return pc.multiply(amounts, 100.0)

orders_scaled_arrow = orders.with_column(
    "amount_scaled", scale_amount_arrow(col("amount")),
)

异步向量 UDF

异步向量 UDF 兼顾批处理吞吐与异步 I/O 并发。可以通过 batch_size 控制每批大小。

import asyncio

@udf(return_dtype=DataType.string(), concurrency=32, batch_size=64)
async def batch_enrich(statuses: pd.Series) -> pd.Series:
    async def enrich_one(s):
        await asyncio.sleep(0.01)  # simulate async API call
        return f"enriched_{s}"
    tasks = [enrich_one(s) for s in statuses]
    results = await asyncio.gather(*tasks)
    return pd.Series(results)

orders_enriched = orders.with_column(
    "status_enriched", batch_enrich(col("status")),
)

map / map_batches

同步行式 UDF

map 接收整行数据,返回新行。输入行在函数中可按列名访问;返回类型可以用 DataType.struct 显式声明,也可以通过 TypedDict 返回类型标注自动推导。

使用 return_dtype 显式声明

def build_order_feature(row):
    amount = row["amount"] or 0.0
    return {
        "order_id": row["order_id"],
        "user_id": row["user_id"],
        "feature": f"{row['status']}:{'large' if amount >= 100 else 'normal'}",
    }

order_features = orders.map(
    build_order_feature,
    return_dtype=DataType.struct({
        "order_id": DataType.int64(),
        "user_id": DataType.int64(),
        "feature": DataType.string(),
    }),
)

使用 TypedDict 自动推导类型

from typing import TypedDict

class OrderFeature(TypedDict):
    order_id: int
    user_id: int
    feature: str

def build_order_feature_typed(row) -> OrderFeature:
    amount = row["amount"] or 0.0
    return {
        "order_id": row["order_id"],
        "user_id": row["user_id"],
        "feature": f"{row['status']}:{'large' if amount >= 100 else 'normal'}",
    }

order_features_typed = orders.map(build_order_feature_typed)

同步向量 UDF

map_batches 适合向量化计算。batch_format="pandas" 时,函数输入和输出是 dict[str, pandas.Series]batch_format="arrow" 时,输入和输出是 `dict[str, pyarrow.Array]`。

Pandas 格式

def score_batch_pandas(batch: dict[str, pd.Series]) -> dict[str, pd.Series]:
    amount = batch["amount"].fillna(0.0)
    return {
        "order_id": batch["order_id"],
        "score": (amount / 100.0).clip(0.0, 1.0),
    }

order_scores_pandas = orders.map_batches(
    score_batch_pandas,
    batch_format="pandas",
    batch_size=1024,
    return_dtype=DataType.struct(
        {
            "order_id": DataType.int64(),
            "score": DataType.float64(),
        }
    ),
)

Arrow 格式

import pyarrow as pa
import pyarrow.compute as pc

def score_batch_arrow(batch: dict[str, pa.Array]) -> dict[str, pa.Array]:
    amount = pc.if_else(pc.is_null(batch["amount"]), 0.0, batch["amount"])
    raw_score = pc.divide(pc.cast(amount, pa.float64()), 100.0)
    score = pc.if_else(
        pc.less(raw_score, 0.0),
        0.0,
        pc.if_else(pc.greater(raw_score, 1.0), 1.0, raw_score),
    )
    return {
        "order_id": batch["order_id"],
        "score": score,
    }

order_scores_arrow = orders.map_batches(
    score_batch_arrow,
    batch_format="arrow",
    batch_size=1024,
    return_dtype=DataType.struct(
        {
            "order_id": DataType.int64(),
            "score": DataType.float64(),
        }
    ),
)

AI / LLM

DataFrame API 通过 df.llm 访问 AI/LLM 能力。您可以先通过 set_provider 注册模型 Provider,再在 DataFrame 上调用通用预测或内置 AI 函数。

支持的 Provider 包括:

配置 Provider

本例中,需要注册两个模型 Provider,分别用于文本类任务和向量化任务。您可使用不同名称区分多个 Provider。

API_KEY="sk-..."

pf.set_provider(
    "chat",
    pf.OpenAICompatProvider(
        endpoint="https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions",
        api_key=API_KEY,
        model="qwen-plus",
        system_prompt="You are a helpful assistant.",
        temperature=0.2,
    ),
)

pf.set_provider(
    "embedding",
    pf.OpenAICompatProvider(
        endpoint="https://dashscope.aliyuncs.com/compatible-mode/v1/embeddings",
        api_key=API_KEY,
        model="text-embedding-v4",
    ),
)

通用调用

predict 用于 LLM 通用调用,支持单列输入,输出字段可用 output_type 指定。

例如,下面的 questions 表示客服工单问题文本,answers 是调用模型后追加回答字段的结果表。

questions = pf.from_records(
    [
        (1, "Why hasn't my order been shipped yet? My order number is 1. Charlie"),
        (2, "How do I apply for a refund? My email is bob@example.com. Order number: 2."),
    ],
    schema=["ticket_id", "question"],
)

answers = questions.llm.predict(
    "question",
    provider="chat",
    output_type={"answer": DataType.string()},
    config={
        "user-prompt": "Answer the customer support question concisely. Return only the answer.",
    },
)

内置 AI 函数

您可使用内置 AI 函数实现常见任务。内置 AI 函数会在原 DataFrame 后追加结果列,具体格式请参见AI / LLM 函数

# classify
classified = questions.llm.ai_classify(
    "question",
    labels=["logistics", "refund", "account", "other"],
    provider="chat",
)

# sentiment analysis
sentiment = questions.llm.ai_sentiment("question", provider="chat")

# information extraction, schema described as JSON string
extracted = questions.llm.ai_extract(
    "question",
    '{"order_id":"STRING","intent":"STRING"}',
    provider="chat",
)

# summarize
summaries = questions.llm.ai_summarize(
    "question",
    max_length=50,
    provider="chat",
)

# mask sensitive data
masked = questions.llm.ai_mask(
    "question",
    entities=["PERSON", "PHONE", "EMAIL"],
    provider="chat",
)

# embedding
embeddings = questions.llm.ai_embed(
    "question",
    dimension=1024,
    provider="embedding",
)

数据输出

DataFrame 的写出方法会把当前 DataFrame 写入目标系统。

使用内置输出函数

您可以用 DataFrame API 内置的输出函数将数据输出到常见系统。

enriched.write_kafka(
    "broker-1:9092,broker-2:9092",
    topic="user_summary",
    format="json",
    key_format="json",
    key_fields=["user_id"],
    delivery_guarantee="at-least-once",
)

更多数据输出函数请参考数据输出

使用自定义连接器

您可以使用write_custom函数通过其他已支持的连接器自定义连接器输出数据。

connector 参数为您使用连接器的 identifier,options 用于传递连接器的 WITH 参数。

enriched.write_custom(
    "blackhole",
    options={
        "sink.parallelism": "4",
    },
)

如果连接器需要主键约束,可以传入 primary_key

enriched.write_custom(
    "my-custom-sink",
    primary_key="user_id",
    options={
        "endpoint": "example.com:9000",
        "format": "json",
    },
)