本文以一个电商订单分析和客服问题处理的完整场景为例,为您介绍 DataFrame API 的功能特性。
构建 DataFrame 对象
使用本地数据
在进行开发调试时,可以用 Python 字典、记录列表或 Pandas DataFrame 快速构建 DataFrame。下面构建本文用到的 DataFrame:用户表 users、订单表 orders、事件表 events、用户画像表 profiles。
import pandas as pd
import pyflink.dataframe as pf
from pyflink.dataframe import col, lit, udf, DataType
# User table
users = pf.from_dict({
"user_id": [1, 2, 3],
"name": ["Alice", "Bob", "Charlie"],
"city": ["Hangzhou", "Shanghai", "Beijing"],
})
# Order fact table
orders = pf.from_records(
[
(1001, 1, 99.9, "PAID"),
(1002, 2, 35.5, "CREATED"),
(1003, 1, 188.0, "PAID"),
(1004, 3, 88.8, None),
],
schema=["order_id", "user_id", "amount", "status"],
)
# User behavior event table
events = pf.from_records(
[
{"event_id": "e1", "user_id": 1, "event": "login"},
{"event_id": "e2", "user_id": 2, "event": "pay"},
],
schema=["event_id", "user_id", "event"],
)
# User profile table
profiles = pf.from_records(
[
(1, "gold"),
(2, "silver"),
(3, "gold"),
],
schema=["user_id", "member_level"],
)
# You can also build from pandas DataFrame
pdf = pd.DataFrame({"id": [1, 2], "score": [0.8, 0.95]})
df_from_pandas = pf.from_pandas(pdf)读取外部数据
使用内置读取函数
在生产环境中,您可以用 DataFrame API 内置的读取函数从常见数据源中读取数据。
kafka_events = pf.read_kafka(
"broker-1:9092,broker-2:9092",
topic="user_events",
group_id="df-api-demo",
startup_mode="earliest-offset",
format="json",
schema={
"user_id": DataType.int64(),
"event": DataType.string(),
"event_time": DataType.timestamp(3),
"payload": DataType.string(),
},
)更多读取函数请参见数据读入。
读取外部系统时通常需要显式声明字段类型。DataType提供了常用类型,包括整数、浮点数、字符串、布尔值、日期时间、复合类型等。
使用自定义连接器
您可以使用read_custom函数从其他已支持的连接器或自定义连接器中读取数据。
connector 参数为您使用连接器的 identifier,options 用于传递连接器的 WITH 参数。
source = pf.read_custom(
"datagen",
schema={
"id": DataType.int64(),
"name": DataType.string(),
},
options={
"number-of-rows": "1000",
"fields.id.kind": "sequence",
"fields.id.start": "1",
"fields.id.end": "1000",
},
)数据处理
DataFrame 的大多数变换都会返回一个新的 DataFrame,因此适合使用链式写法组合多个操作。常见操作包括投影、过滤、派生列、删除列、重命名、聚合、连接和缺失值处理。
投影、过滤和派生列
下面的 paid_orders 从订单表 orders 中筛选已支付订单,并按照订单金额分层:
paid_orders = (
orders
.select("order_id", "user_id", "amount", "status")
.filter("status = 'PAID' AND amount > 0")
.with_columns(
amount_yuan=col("amount"),
amount_level=(
(col("amount") >= 100).then("high", "normal")
),
)
.drop("status", "amount")
.rename({"amount_yuan": "amount"})
)也可以用下标语法引用列或选择列:
amount_expr = paid_orders["amount"]
simple_view = paid_orders[["order_id", "user_id", "amount"]]
large_orders = paid_orders[col("amount") > 100]缺失值和 NaN 处理
drop_null / fill_null 处理 SQL NULL,drop_nan / fill_nan 处理浮点列中的 NaN。
下面的 clean_orders 表示完成质量清洗后的订单表:
clean_orders = (
orders
.drop_null(subset=["order_id", "user_id"])
.fill_null("UNKNOWN", subset=["status"])
.fill_nan(0.0, subset=["amount"])
)聚合
使用 group_by 和 agg 做分组聚合。聚合结果中通常需要显式保留分组列。
下面的 user_summary 是按用户汇总后的订单指标表,包含订单数、总金额和平均金额:
user_summary = (
clean_orders
.filter("status = 'PAID'")
.group_by("user_id")
.agg(
col("user_id"),
col("order_id").count.alias("order_count"),
col("amount").sum.alias("total_amount"),
col("amount").avg.alias("avg_amount"),
)
)如果需要对全表聚合,可直接使用 select:
overall = clean_orders.select(
col("order_id").count.alias("order_count"),
col("amount").sum.alias("total_amount"),
)连接
join 支持 inner、left、right、full、outer 等连接类型。
下面的例子将用户表 users 与事件表 events 关联,生成带有用户姓名、城市和行为事件的 enriched 宽表:
events_renamed = events.rename("user_id", "event_user_id")
enriched = (
users
.join(
events_renamed,
left_on="user_id",
right_on="event_user_id",
how="left",
)
.select(
"user_id",
"name",
"city",
"event_id",
"event",
)
)使用 SQL 查询
您可以用 sql 运行 SELECT 查询。SQL 中的表名会自动绑定到当前作用域中同名的 Python DataFrame 变量,无需手动注册。
下面使用 SQL 从按用户汇总的订单表 user_summary 中筛选累计消费金额较高的用户,并与用户画像表 profiles 关联:
top_users = pf.sql("""
SELECT
user_summary.user_id,
profiles.member_level,
user_summary.order_count,
user_summary.total_amount
FROM user_summary
LEFT JOIN profiles
ON user_summary.user_id = profiles.user_id
WHERE user_summary.total_amount >= 200
""")用户自定义函数
当标准表达式、聚合或 SQL 无法覆盖业务逻辑时,可以使用 udf。DataFrame API 支持同步/异步、逐行/向量化等多种 UDF 形态。UDF 可以通过 with_column / with_columns 添加为新列,也可以通过 map / map_batches 对整行数据做变换。
with_column / with_columns
同步行式 UDF
标量 UDF 接收一列或多列,返回一列。建议使用 Python 类型标注自动推断返回类型;也可以用 return_dtype 显式指定。with_column 添加单列,with_columns 可同时添加多列。在定义 UDF 时,可以使用 concurrency 指定并发度。
@udf
def normalize_status(status: str) -> str:
if status is None:
return "UNKNOWN"
return status.strip().upper()
@udf(concurrency=32)
def order_tag(amount: float, status: str) -> str:
"""Takes multiple columns as input"""
if status == "PAID" and amount is not None and amount >= 100:
return "high_value_paid"
return "normal"
# with_column adds a single column
orders_with_status = orders.with_column(
"status_norm", normalize_status(col("status"))
)
# with_columns adds multiple columns at once
orders_with_flags = orders.with_columns(
status_norm=normalize_status(col("status")),
tag=order_tag(col("amount"), col("status")),
)异步行式 UDF
异步 UDF 允许多行同时执行 I/O 操作,从而提升吞吐,适合调用外部服务或执行 I/O 密集型逻辑。
import asyncio
@udf(return_dtype=DataType.string(), concurrency=32)
async def query_region(user_id: int) -> str:
await asyncio.sleep(0.01) # simulate async I/O
return f"region_for_{user_id}"
orders_with_region = orders.with_column(
"region", query_region(col("user_id")),
)同步向量 UDF
向量化 UDF 按批接收和返回数据,减少 Python 与 Flink 运行时之间的逐行调用开销,适合纯计算逻辑。支持 Pandas(pandas.Series)和 Arrow(pyarrow.Array)两种格式,可以根据 UDF 中的计算逻辑按需选择。可以通过 batch_size 控制每批大小。
Pandas 格式
@udf(return_dtype=DataType.float64(), concurrency=32, batch_size=64)
def scale_amount_pandas(amounts: pd.Series) -> pd.Series:
return amounts * 100.0
orders_scaled_pandas = orders.with_column(
"amount_norm", scale_amount_pandas(col("amount")),
)Arrow 格式
import pyarrow as pa
import pyarrow.compute as pc
@udf(return_dtype=DataType.float64(), concurrency=32, batch_size=64)
def scale_amount_arrow(amounts: pa.Array) -> pa.Array:
return pc.multiply(amounts, 100.0)
orders_scaled_arrow = orders.with_column(
"amount_scaled", scale_amount_arrow(col("amount")),
)异步向量 UDF
异步向量 UDF 兼顾批处理吞吐与异步 I/O 并发。可以通过 batch_size 控制每批大小。
import asyncio
@udf(return_dtype=DataType.string(), concurrency=32, batch_size=64)
async def batch_enrich(statuses: pd.Series) -> pd.Series:
async def enrich_one(s):
await asyncio.sleep(0.01) # simulate async API call
return f"enriched_{s}"
tasks = [enrich_one(s) for s in statuses]
results = await asyncio.gather(*tasks)
return pd.Series(results)
orders_enriched = orders.with_column(
"status_enriched", batch_enrich(col("status")),
)map / map_batches
同步行式 UDF
map 接收整行数据,返回新行。输入行在函数中可按列名访问;返回类型可以用 DataType.struct 显式声明,也可以通过 TypedDict 返回类型标注自动推导。
使用 return_dtype 显式声明
def build_order_feature(row):
amount = row["amount"] or 0.0
return {
"order_id": row["order_id"],
"user_id": row["user_id"],
"feature": f"{row['status']}:{'large' if amount >= 100 else 'normal'}",
}
order_features = orders.map(
build_order_feature,
return_dtype=DataType.struct({
"order_id": DataType.int64(),
"user_id": DataType.int64(),
"feature": DataType.string(),
}),
)使用 TypedDict 自动推导类型
from typing import TypedDict
class OrderFeature(TypedDict):
order_id: int
user_id: int
feature: str
def build_order_feature_typed(row) -> OrderFeature:
amount = row["amount"] or 0.0
return {
"order_id": row["order_id"],
"user_id": row["user_id"],
"feature": f"{row['status']}:{'large' if amount >= 100 else 'normal'}",
}
order_features_typed = orders.map(build_order_feature_typed)同步向量 UDF
map_batches 适合向量化计算。batch_format="pandas" 时,函数输入和输出是 dict[str, pandas.Series];batch_format="arrow" 时,输入和输出是 `dict[str, pyarrow.Array]`。
Pandas 格式
def score_batch_pandas(batch: dict[str, pd.Series]) -> dict[str, pd.Series]:
amount = batch["amount"].fillna(0.0)
return {
"order_id": batch["order_id"],
"score": (amount / 100.0).clip(0.0, 1.0),
}
order_scores_pandas = orders.map_batches(
score_batch_pandas,
batch_format="pandas",
batch_size=1024,
return_dtype=DataType.struct(
{
"order_id": DataType.int64(),
"score": DataType.float64(),
}
),
)Arrow 格式
import pyarrow as pa
import pyarrow.compute as pc
def score_batch_arrow(batch: dict[str, pa.Array]) -> dict[str, pa.Array]:
amount = pc.if_else(pc.is_null(batch["amount"]), 0.0, batch["amount"])
raw_score = pc.divide(pc.cast(amount, pa.float64()), 100.0)
score = pc.if_else(
pc.less(raw_score, 0.0),
0.0,
pc.if_else(pc.greater(raw_score, 1.0), 1.0, raw_score),
)
return {
"order_id": batch["order_id"],
"score": score,
}
order_scores_arrow = orders.map_batches(
score_batch_arrow,
batch_format="arrow",
batch_size=1024,
return_dtype=DataType.struct(
{
"order_id": DataType.int64(),
"score": DataType.float64(),
}
),
)AI / LLM
DataFrame API 通过 df.llm 访问 AI/LLM 能力。您可以先通过 set_provider 注册模型 Provider,再在 DataFrame 上调用通用预测或内置 AI 函数。
支持的 Provider 包括:
OpenAICompatProvider:适用于 OpenAI 兼容接口。DashScopeProvider:适用于阿里云百炼,在 OpenAI 兼容接口的基础上增加了多模态向量化能力。TritonProvider:适用于 NVIDIA Triton Inference Server。GenericProvider:适用于自定义模型服务。
配置 Provider
本例中,需要注册两个模型 Provider,分别用于文本类任务和向量化任务。您可使用不同名称区分多个 Provider。
API_KEY="sk-..."
pf.set_provider(
"chat",
pf.OpenAICompatProvider(
endpoint="https://dashscope.aliyuncs.com/compatible-mode/v1/chat/completions",
api_key=API_KEY,
model="qwen-plus",
system_prompt="You are a helpful assistant.",
temperature=0.2,
),
)
pf.set_provider(
"embedding",
pf.OpenAICompatProvider(
endpoint="https://dashscope.aliyuncs.com/compatible-mode/v1/embeddings",
api_key=API_KEY,
model="text-embedding-v4",
),
)通用调用
predict 用于 LLM 通用调用,支持单列输入,输出字段可用 output_type 指定。
例如,下面的 questions 表示客服工单问题文本,answers 是调用模型后追加回答字段的结果表。
questions = pf.from_records(
[
(1, "Why hasn't my order been shipped yet? My order number is 1. Charlie"),
(2, "How do I apply for a refund? My email is bob@example.com. Order number: 2."),
],
schema=["ticket_id", "question"],
)
answers = questions.llm.predict(
"question",
provider="chat",
output_type={"answer": DataType.string()},
config={
"user-prompt": "Answer the customer support question concisely. Return only the answer.",
},
)内置 AI 函数
您可使用内置 AI 函数实现常见任务。内置 AI 函数会在原 DataFrame 后追加结果列,具体格式请参见AI / LLM 函数。
# classify
classified = questions.llm.ai_classify(
"question",
labels=["logistics", "refund", "account", "other"],
provider="chat",
)
# sentiment analysis
sentiment = questions.llm.ai_sentiment("question", provider="chat")
# information extraction, schema described as JSON string
extracted = questions.llm.ai_extract(
"question",
'{"order_id":"STRING","intent":"STRING"}',
provider="chat",
)
# summarize
summaries = questions.llm.ai_summarize(
"question",
max_length=50,
provider="chat",
)
# mask sensitive data
masked = questions.llm.ai_mask(
"question",
entities=["PERSON", "PHONE", "EMAIL"],
provider="chat",
)
# embedding
embeddings = questions.llm.ai_embed(
"question",
dimension=1024,
provider="embedding",
)数据输出
DataFrame 的写出方法会把当前 DataFrame 写入目标系统。
使用内置输出函数
您可以用 DataFrame API 内置的输出函数将数据输出到常见系统。
enriched.write_kafka(
"broker-1:9092,broker-2:9092",
topic="user_summary",
format="json",
key_format="json",
key_fields=["user_id"],
delivery_guarantee="at-least-once",
)更多数据输出函数请参考数据输出。
使用自定义连接器
您可以使用write_custom函数通过其他已支持的连接器或自定义连接器输出数据。
connector 参数为您使用连接器的 identifier,options 用于传递连接器的 WITH 参数。
enriched.write_custom(
"blackhole",
options={
"sink.parallelism": "4",
},
)如果连接器需要主键约束,可以传入 primary_key:
enriched.write_custom(
"my-custom-sink",
primary_key="user_id",
options={
"endpoint": "example.com:9000",
"format": "json",
},
)