Daft已内置丰富的表达式和多模态算子,可以覆盖大多数常见处理需求。当内置算子无法满足业务逻辑时,您可以编写UDF(自定义函数),将自定义的逐行处理、批量处理或有状态处理逻辑接入DataFrame流水线。
什么时候需要自定义UDF
Daft内置了大量表达式和多模态算子。建议仅在以下场景编写自定义UDF:
内置算子无法覆盖您的业务逻辑。
需要访问本地规则文件、字典、轻量模型或sidecar元数据。
需要将一行输入数据展开为多行输出。
需要将NumPy、PyArrow或自定义推理逻辑接入DataFrame流水线。
开发时建议先用Daft原生表达式完成公共链路,再将最后一段业务特化逻辑编写为UDF。这种方式更容易保持类型清晰,也便于后续切换到分布式模式(RayRunner)。
UDF类型概览
下表列出了本文涵盖的UDF模式。
模式 | 输入粒度 | 输出粒度 | 典型用途 |
| 单行 | 单行 | 逐行解析、格式清洗、轻量规则判断 |
| 单行 | 单行 | I/O密集型逐行数据补充 |
| 单行 | 多行 | 标签展开、切词、事件拆分 |
| 一批( | 一批( | 向量化计算、批量推理、数值处理 |
| 一批( | 一批( | 复用昂贵初始化状态(模型、Tokenizer、规则文件) |
准备工作
安装所需依赖:
pip install "daft[ray]" numpy pyarrow如果只需在本机验证逻辑,直接使用默认的单机模式(Native Runner)即可。如需在Ray集群上运行,请设置以下环境变量:
export DAFT_RUNNER_MODE=ray
export DAFT_RAY_ADDRESS="ray://<head-host>:10001"本文所有脚本均以惰性方式构建DataFrame表达式,只有在调用show()或collect()时才会触发执行。对于@daft.cls UDF,__init__中的昂贵初始化会在查询执行时才运行,而非对象实例化时。
逐行(Row-wise)UDF:逐行解析半结构化事件
以下示例将日志系统中的一列JSON字符串解析为统一的告警级别。适用于单行输入产生单行输出的轻量逻辑。
import json
import os
import daft
from daft import col
def configure_runner():
mode = os.getenv("DAFT_RUNNER_MODE", "native")
if mode == "ray":
daft.set_runner_ray(os.getenv("DAFT_RAY_ADDRESS"))
else:
daft.set_runner_native()
configure_runner()
@daft.func
def classify_alert(payload: str) -> str:
event = json.loads(payload)
latency_ms = event["latency_ms"]
if latency_ms >= 800:
return "critical"
if latency_ms >= 300:
return "warning"
return "normal"
df = daft.from_pydict(
{
"request_id": ["req-1", "req-2", "req-3"],
"payload": [
'{"latency_ms": 120, "path": "/health"}',
'{"latency_ms": 460, "path": "/embed"}',
'{"latency_ms": 980, "path": "/classify"}',
],
}
)
result = (
df.with_column("alert_level", classify_alert(col("payload")))
.select("request_id", "alert_level")
)
result.show()应用场景:将原始事件流标准化为结构稳定的派生列,便于后续通过where()、groupby()或告警聚合表达式继续处理。
选择逐行模式的原因:逻辑轻量,每行只依赖当前输入,无需引入批处理或状态复用。
异步逐行(Async Row-wise)UDF:并发读取sidecar元数据
以下示例模拟主表与sidecar元数据分离存放的场景。每行只需读取一个本地JSON文件,但整体属于I/O密集型任务,使用异步UDF可以显著提升并发吞吐量。
import asyncio
import json
import os
import tempfile
from pathlib import Path
import daft
from daft import col
def configure_runner():
mode = os.getenv("DAFT_RUNNER_MODE", "native")
if mode == "ray":
daft.set_runner_ray(os.getenv("DAFT_RAY_ADDRESS"))
else:
daft.set_runner_native()
configure_runner()
workspace = Path(tempfile.mkdtemp(prefix="daft_async_udf_"))
sidecar = {
"doc-1": {"language": "zh", "pages": 12},
"doc-2": {"language": "en", "pages": 4},
"doc-3": {"language": "ja", "pages": 20},
}
for doc_id, payload in sidecar.items():
(workspace / f"{doc_id}.json").write_text(
json.dumps(payload), encoding="utf-8"
)
@daft.func(max_concurrency=8)
async def read_language(doc_id: str) -> str:
path = workspace / f"{doc_id}.json"
raw = await asyncio.to_thread(path.read_text, encoding="utf-8")
return json.loads(raw)["language"]
df = daft.from_pydict({"doc_id": ["doc-1", "doc-2", "doc-3"]})
result = df.with_column("language", read_language(col("doc_id"))).select(
"doc_id", "language"
)
result.show()应用场景:批量补齐OCR结果、元数据标签、外部配置内容,或调用限流可控的内部服务接口。
选择异步逐行模式的原因:每行执行独立I/O,请求之间可以并发。如果使用同步逐行UDF,整体吞吐量会受单次等待时间的限制。
生成器(Generator)UDF:把一行标签展开成多行
以下示例将逗号分隔的标签列展开为每行一个标签,适用于构建倒排索引、统计标签分布或执行下游JSON操作。
import os
from typing import Iterator
import daft
from daft import col
def configure_runner():
mode = os.getenv("DAFT_RUNNER_MODE", "native")
if mode == "ray":
daft.set_runner_ray(os.getenv("DAFT_RAY_ADDRESS"))
else:
daft.set_runner_native()
configure_runner()
@daft.func
def explode_tags(tags: str) -> Iterator[str]:
for tag in tags.split(","):
value = tag.strip()
if value:
yield value
df = daft.from_pydict(
{
"asset_id": ["img-1", "img-2"],
"tags": ["portrait,indoor,marketing", "video,short-form,highlight"],
}
)
result = df.select("asset_id", explode_tags(col("tags")).alias("tag"))
result.show()应用场景:将“列表塞进一列”的原始数据展开为规范化的明细表,便于后续聚合或JOIN操作。
选择生成器模式的原因:生成器天然适合表达一行变多行的逻辑。需要注意,同一个select()或with_column()操作中只能使用一个生成器UDF。
批量(Batch)UDF:用向量化库批量计算质量分数
以下示例将两个基础指标(brightness和blur)合成为一个综合质量分数。计算逻辑本身不复杂,但批处理模式可以直接调用PyArrow的向量化算子。
import os
import daft
import pyarrow.compute as pc
from daft import DataType, Series, col
def configure_runner():
mode = os.getenv("DAFT_RUNNER_MODE", "native")
if mode == "ray":
daft.set_runner_ray(os.getenv("DAFT_RAY_ADDRESS"))
else:
daft.set_runner_native()
configure_runner()
@daft.func.batch(return_dtype=DataType.float64(), batch_size=256)
def quality_score(brightness: Series, blur: Series) -> Series:
brightness_arrow = brightness.to_arrow()
blur_arrow = blur.to_arrow()
score = pc.subtract(pc.multiply(brightness_arrow, 0.7), pc.multiply(blur_arrow, 0.3))
return pc.round(score, ndigits=4)
df = daft.from_pydict(
{
"clip_id": ["clip-1", "clip-2", "clip-3"],
"brightness": [0.92, 0.64, 0.48],
"blur": [0.10, 0.22, 0.33],
}
)
result = (
df.with_column(
"quality_score",
quality_score(col("brightness"), col("blur")),
)
.select("clip_id", "quality_score")
)
result.show()应用场景:将整批数值列送入NumPy、PyArrow、向量数据库SDK或批量推理接口。
选择批量模式的原因:一批Series对象可以直接传给向量化库,通常比逐行Python调用更稳定、性能更高。
有状态类(Stateful Class)UDF:复用规则和模型状态,避免重复初始化
以下示例模拟文本审核场景,规则文件在__init__中加载一次,后续每个批次复用同一实例,避免重复读取文件。
import json
import os
import tempfile
from pathlib import Path
import daft
from daft import DataType, Series, col
def configure_runner():
mode = os.getenv("DAFT_RUNNER_MODE", "native")
if mode == "ray":
daft.set_runner_ray(os.getenv("DAFT_RAY_ADDRESS"))
else:
daft.set_runner_native()
configure_runner()
workspace = Path(tempfile.mkdtemp(prefix="daft_cls_udf_"))
rules_path = workspace / "rules.json"
rules_path.write_text(
json.dumps(
{
"risk": ["password leaked", "credit card", "secret token"],
"review": ["manual review", "appeal", "chargeback"],
}
),
encoding="utf-8",
)
@daft.cls(max_concurrency=2)
class RuleBasedModerator:
def __init__(self, rules_file: str):
self.rules = json.loads(Path(rules_file).read_text(encoding="utf-8"))
@daft.method.batch(return_dtype=DataType.string(), batch_size=64)
def classify(self, texts: Series):
labels = []
for text in texts.to_pylist():
lowered = text.lower()
if any(token in lowered for token in self.rules["risk"]):
labels.append("risk")
elif any(token in lowered for token in self.rules["review"]):
labels.append("review")
else:
labels.append("pass")
return labels
moderator = RuleBasedModerator(str(rules_path))
df = daft.from_pydict(
{
"ticket_id": ["t-1", "t-2", "t-3"],
"content": [
"Customer asks for manual review of a refund.",
"Possible credit card leak found in logs.",
"Normal feedback with no action needed.",
],
}
)
result = (
df.with_column("label", moderator.classify(col("content")))
.select("ticket_id", "label")
)
result.show()应用场景:加载并复用本地模型、Tokenizer、规则表、连接池、词典索引等高初始化成本的状态。
选择@daft.cls的原因:Daft会在执行阶段初始化类UDF,并在每个工作节点上跨批次复用实例。这种模式可以有效避免重复加载模型或配置文件。
多模态UDF:视频抽帧之后做自定义帧质量打分
在生产流水线中,通常先使用daft.read_video_frames(...)完成抽帧,再对每帧应用自定义UDF进行评分。为便于直接运行,以下示例直接构造了帧数据列。
import os
import daft
import numpy as np
from daft import DataType, Series, col
def configure_runner():
mode = os.getenv("DAFT_RUNNER_MODE", "native")
if mode == "ray":
daft.set_runner_ray(os.getenv("DAFT_RAY_ADDRESS"))
else:
daft.set_runner_native()
configure_runner()
frames = [
np.full((4, 4, 3), 32, dtype=np.uint8).tolist(),
np.full((4, 4, 3), 180, dtype=np.uint8).tolist(),
np.full((4, 4, 3), 245, dtype=np.uint8).tolist(),
]
@daft.func.batch(return_dtype=DataType.float32(), batch_size=8)
def frame_brightness(frames: Series):
scores = []
for frame in frames.to_pylist():
score = float(np.asarray(frame, dtype=np.float32).mean() / 255.0)
scores.append(score)
return scores
df = daft.from_pydict(
{
"clip_id": ["clip-1", "clip-1", "clip-1"],
"frame_index": [0, 1, 2],
"data": frames,
}
)
result = (
df.with_column("brightness", frame_brightness(col("data")))
.select("clip_id", "frame_index", "brightness")
)
result.show()应用场景:在视频抽帧后计算亮度、模糊度、质量分数或候选关键帧分数,以决定哪些帧进入后续Embedding或分类流程。
集成方式:如果您的流水线已使用daft.read_video_frames(...),只需将示例中的from_pydict(...)替换为真实帧表,UDF主体无需修改。
多模态 UDF:对音频文件做轻量质检
如果您的需求仅是建立音频索引、提取元数据或统一采样率,建议优先使用Daft原生音频函数。当需要业务侧质检、规则打分或自定义特征提取时,再通过UDF补充最后一段逻辑。以下脚本会先生成两个本地WAV文件,再批量计算RMS能量。
import math
import os
import tempfile
import wave
from pathlib import Path
import daft
import numpy as np
from daft import DataType, Series, col
def configure_runner():
mode = os.getenv("DAFT_RUNNER_MODE", "native")
if mode == "ray":
daft.set_runner_ray(os.getenv("DAFT_RAY_ADDRESS"))
else:
daft.set_runner_native()
configure_runner()
workspace = Path(tempfile.mkdtemp(prefix="daft_audio_udf_"))
def write_tone(path: Path, freq_hz: float, duration_sec: float):
sample_rate = 16000
t = np.linspace(0, duration_sec, int(sample_rate * duration_sec), endpoint=False)
samples = (0.3 * np.sin(2 * math.pi * freq_hz * t) * 32767).astype(np.int16)
with wave.open(str(path), "wb") as wav_file:
wav_file.setnchannels(1)
wav_file.setsampwidth(2)
wav_file.setframerate(sample_rate)
wav_file.writeframes(samples.tobytes())
audio_a = workspace / "tone_a.wav"
audio_b = workspace / "tone_b.wav"
write_tone(audio_a, freq_hz=220.0, duration_sec=0.5)
write_tone(audio_b, freq_hz=880.0, duration_sec=0.5)
@daft.func.batch(return_dtype=DataType.float64(), batch_size=16)
def rms_energy(paths: Series):
outputs = []
for path in paths.to_pylist():
with wave.open(path, "rb") as wav_file:
samples = np.frombuffer(wav_file.readframes(wav_file.getnframes()), dtype=np.int16)
outputs.append(float(np.sqrt(np.mean(np.square(samples.astype(np.float64))))))
return outputs
df = daft.from_pydict(
{
"audio_id": ["audio-a", "audio-b"],
"path": [str(audio_a), str(audio_b)],
}
)
result = (
df.with_column("rms_energy", rms_energy(col("path")))
.select("audio_id", "rms_energy")
)
result.show()应用场景:筛除静音音频、检测录音异常,或在语音识别和说话人分离之前添加轻量质检环节。
设计思路:对于音频文件处理,将文件路径纳入DataFrame,再通过UDF实现业务侧特征提取,是一种实用且稳定的方式。
最佳实践
优先使用Daft原生表达式和多模态算子完成公共链路,仅对业务特化逻辑编写UDF。
轻量单行逻辑优先选择逐行UDF,I/O密集型逐行数据补充优先选择异步逐行UDF。
需要将一行展开为多行时使用生成器UDF。同一个
select()或with_column()操作中只能使用一个生成器UDF。需要接入NumPy、PyArrow、批量推理或向量化计算时,优先选择
@daft.func.batch。需要复用模型、Tokenizer、规则文件、连接池等昂贵状态时,优先选择
@daft.cls。对于音视频等大对象列,不要盲目增大
batch_size。应根据可用内存、GPU显存和输入数据大小进行实测调整,避免OOM(内存溢出)。先在单机模式(Native Runner)下验证返回类型、空值处理和Schema,再切换到分布式模式(Ray Runner)进行横向扩展。
在Ray集群上运行时,确保UDF访问的所有本地文件、规则文件或sidecar路径对工作节点可见。