自定义UDF开发指南

更新时间:
复制为 MD 格式

Daft已内置丰富的表达式和多模态算子,可以覆盖大多数常见处理需求。当内置算子无法满足业务逻辑时,您可以编写UDF(自定义函数),将自定义的逐行处理、批量处理或有状态处理逻辑接入DataFrame流水线。

什么时候需要自定义UDF

Daft内置了大量表达式和多模态算子。建议仅在以下场景编写自定义UDF:

  • 内置算子无法覆盖您的业务逻辑。

  • 需要访问本地规则文件、字典、轻量模型或sidecar元数据。

  • 需要将一行输入数据展开为多行输出。

  • 需要将NumPy、PyArrow或自定义推理逻辑接入DataFrame流水线。

开发时建议先用Daft原生表达式完成公共链路,再将最后一段业务特化逻辑编写为UDF。这种方式更容易保持类型清晰,也便于后续切换到分布式模式(RayRunner)。

UDF类型概览

下表列出了本文涵盖的UDF模式。

模式

输入粒度

输出粒度

典型用途

@daft.func

单行

单行

逐行解析、格式清洗、轻量规则判断

@daft.func+async def

单行

单行

I/O密集型逐行数据补充

@daft.func+yield

单行

多行

标签展开、切词、事件拆分

@daft.func.batch

一批(Series

一批(Series

向量化计算、批量推理、数值处理

@daft.cls+@daft.method.batch

一批(Series

一批(Series

复用昂贵初始化状态(模型、Tokenizer、规则文件)

准备工作

安装所需依赖:

pip install "daft[ray]" numpy pyarrow

如果只需在本机验证逻辑,直接使用默认的单机模式(Native Runner)即可。如需在Ray集群上运行,请设置以下环境变量:

export DAFT_RUNNER_MODE=ray
export DAFT_RAY_ADDRESS="ray://<head-host>:10001"

本文所有脚本均以惰性方式构建DataFrame表达式,只有在调用show()collect()时才会触发执行。对于@daft.cls UDF,__init__中的昂贵初始化会在查询执行时才运行,而非对象实例化时。

逐行(Row-wise)UDF:逐行解析半结构化事件

以下示例将日志系统中的一列JSON字符串解析为统一的告警级别。适用于单行输入产生单行输出的轻量逻辑。

import json
import os

import daft
from daft import col


def configure_runner():
    mode = os.getenv("DAFT_RUNNER_MODE", "native")
    if mode == "ray":
        daft.set_runner_ray(os.getenv("DAFT_RAY_ADDRESS"))
    else:
        daft.set_runner_native()


configure_runner()


@daft.func
def classify_alert(payload: str) -> str:
    event = json.loads(payload)
    latency_ms = event["latency_ms"]
    if latency_ms >= 800:
        return "critical"
    if latency_ms >= 300:
        return "warning"
    return "normal"


df = daft.from_pydict(
    {
        "request_id": ["req-1", "req-2", "req-3"],
        "payload": [
            '{"latency_ms": 120, "path": "/health"}',
            '{"latency_ms": 460, "path": "/embed"}',
            '{"latency_ms": 980, "path": "/classify"}',
        ],
    }
)

result = (
    df.with_column("alert_level", classify_alert(col("payload")))
    .select("request_id", "alert_level")
)

result.show()

应用场景:将原始事件流标准化为结构稳定的派生列,便于后续通过where()groupby()或告警聚合表达式继续处理。

选择逐行模式的原因:逻辑轻量,每行只依赖当前输入,无需引入批处理或状态复用。

异步逐行(Async Row-wise)UDF:并发读取sidecar元数据

以下示例模拟主表与sidecar元数据分离存放的场景。每行只需读取一个本地JSON文件,但整体属于I/O密集型任务,使用异步UDF可以显著提升并发吞吐量。

import asyncio
import json
import os
import tempfile
from pathlib import Path

import daft
from daft import col


def configure_runner():
    mode = os.getenv("DAFT_RUNNER_MODE", "native")
    if mode == "ray":
        daft.set_runner_ray(os.getenv("DAFT_RAY_ADDRESS"))
    else:
        daft.set_runner_native()


configure_runner()

workspace = Path(tempfile.mkdtemp(prefix="daft_async_udf_"))
sidecar = {
    "doc-1": {"language": "zh", "pages": 12},
    "doc-2": {"language": "en", "pages": 4},
    "doc-3": {"language": "ja", "pages": 20},
}

for doc_id, payload in sidecar.items():
    (workspace / f"{doc_id}.json").write_text(
        json.dumps(payload), encoding="utf-8"
    )


@daft.func(max_concurrency=8)
async def read_language(doc_id: str) -> str:
    path = workspace / f"{doc_id}.json"
    raw = await asyncio.to_thread(path.read_text, encoding="utf-8")
    return json.loads(raw)["language"]


df = daft.from_pydict({"doc_id": ["doc-1", "doc-2", "doc-3"]})

result = df.with_column("language", read_language(col("doc_id"))).select(
    "doc_id", "language"
)

result.show()

应用场景:批量补齐OCR结果、元数据标签、外部配置内容,或调用限流可控的内部服务接口。

选择异步逐行模式的原因:每行执行独立I/O,请求之间可以并发。如果使用同步逐行UDF,整体吞吐量会受单次等待时间的限制。

生成器(Generator)UDF:把一行标签展开成多行

以下示例将逗号分隔的标签列展开为每行一个标签,适用于构建倒排索引、统计标签分布或执行下游JSON操作。

import os
from typing import Iterator

import daft
from daft import col


def configure_runner():
    mode = os.getenv("DAFT_RUNNER_MODE", "native")
    if mode == "ray":
        daft.set_runner_ray(os.getenv("DAFT_RAY_ADDRESS"))
    else:
        daft.set_runner_native()


configure_runner()


@daft.func
def explode_tags(tags: str) -> Iterator[str]:
    for tag in tags.split(","):
        value = tag.strip()
        if value:
            yield value


df = daft.from_pydict(
    {
        "asset_id": ["img-1", "img-2"],
        "tags": ["portrait,indoor,marketing", "video,short-form,highlight"],
    }
)

result = df.select("asset_id", explode_tags(col("tags")).alias("tag"))

result.show()

应用场景:将“列表塞进一列”的原始数据展开为规范化的明细表,便于后续聚合或JOIN操作。

选择生成器模式的原因:生成器天然适合表达一行变多行的逻辑。需要注意,同一个select()with_column()操作中只能使用一个生成器UDF。

批量(Batch)UDF:用向量化库批量计算质量分数

以下示例将两个基础指标(brightnessblur)合成为一个综合质量分数。计算逻辑本身不复杂,但批处理模式可以直接调用PyArrow的向量化算子。

import os

import daft
import pyarrow.compute as pc
from daft import DataType, Series, col


def configure_runner():
    mode = os.getenv("DAFT_RUNNER_MODE", "native")
    if mode == "ray":
        daft.set_runner_ray(os.getenv("DAFT_RAY_ADDRESS"))
    else:
        daft.set_runner_native()


configure_runner()


@daft.func.batch(return_dtype=DataType.float64(), batch_size=256)
def quality_score(brightness: Series, blur: Series) -> Series:
    brightness_arrow = brightness.to_arrow()
    blur_arrow = blur.to_arrow()
    score = pc.subtract(pc.multiply(brightness_arrow, 0.7), pc.multiply(blur_arrow, 0.3))
    return pc.round(score, ndigits=4)


df = daft.from_pydict(
    {
        "clip_id": ["clip-1", "clip-2", "clip-3"],
        "brightness": [0.92, 0.64, 0.48],
        "blur": [0.10, 0.22, 0.33],
    }
)

result = (
    df.with_column(
        "quality_score",
        quality_score(col("brightness"), col("blur")),
    )
    .select("clip_id", "quality_score")
)

result.show()

应用场景:将整批数值列送入NumPy、PyArrow、向量数据库SDK或批量推理接口。

选择批量模式的原因:一批Series对象可以直接传给向量化库,通常比逐行Python调用更稳定、性能更高。

有状态类(Stateful Class)UDF:复用规则和模型状态,避免重复初始化

以下示例模拟文本审核场景,规则文件在__init__中加载一次,后续每个批次复用同一实例,避免重复读取文件。

import json
import os
import tempfile
from pathlib import Path

import daft
from daft import DataType, Series, col


def configure_runner():
    mode = os.getenv("DAFT_RUNNER_MODE", "native")
    if mode == "ray":
        daft.set_runner_ray(os.getenv("DAFT_RAY_ADDRESS"))
    else:
        daft.set_runner_native()


configure_runner()

workspace = Path(tempfile.mkdtemp(prefix="daft_cls_udf_"))
rules_path = workspace / "rules.json"
rules_path.write_text(
    json.dumps(
        {
            "risk": ["password leaked", "credit card", "secret token"],
            "review": ["manual review", "appeal", "chargeback"],
        }
    ),
    encoding="utf-8",
)


@daft.cls(max_concurrency=2)
class RuleBasedModerator:
    def __init__(self, rules_file: str):
        self.rules = json.loads(Path(rules_file).read_text(encoding="utf-8"))

    @daft.method.batch(return_dtype=DataType.string(), batch_size=64)
    def classify(self, texts: Series):
        labels = []
        for text in texts.to_pylist():
            lowered = text.lower()
            if any(token in lowered for token in self.rules["risk"]):
                labels.append("risk")
            elif any(token in lowered for token in self.rules["review"]):
                labels.append("review")
            else:
                labels.append("pass")
        return labels


moderator = RuleBasedModerator(str(rules_path))

df = daft.from_pydict(
    {
        "ticket_id": ["t-1", "t-2", "t-3"],
        "content": [
            "Customer asks for manual review of a refund.",
            "Possible credit card leak found in logs.",
            "Normal feedback with no action needed.",
        ],
    }
)

result = (
    df.with_column("label", moderator.classify(col("content")))
    .select("ticket_id", "label")
)

result.show()

应用场景:加载并复用本地模型、Tokenizer、规则表、连接池、词典索引等高初始化成本的状态。

选择@daft.cls的原因:Daft会在执行阶段初始化类UDF,并在每个工作节点上跨批次复用实例。这种模式可以有效避免重复加载模型或配置文件。

多模态UDF:视频抽帧之后做自定义帧质量打分

在生产流水线中,通常先使用daft.read_video_frames(...)完成抽帧,再对每帧应用自定义UDF进行评分。为便于直接运行,以下示例直接构造了帧数据列。

import os

import daft
import numpy as np
from daft import DataType, Series, col


def configure_runner():
    mode = os.getenv("DAFT_RUNNER_MODE", "native")
    if mode == "ray":
        daft.set_runner_ray(os.getenv("DAFT_RAY_ADDRESS"))
    else:
        daft.set_runner_native()


configure_runner()

frames = [
    np.full((4, 4, 3), 32, dtype=np.uint8).tolist(),
    np.full((4, 4, 3), 180, dtype=np.uint8).tolist(),
    np.full((4, 4, 3), 245, dtype=np.uint8).tolist(),
]


@daft.func.batch(return_dtype=DataType.float32(), batch_size=8)
def frame_brightness(frames: Series):
    scores = []
    for frame in frames.to_pylist():
        score = float(np.asarray(frame, dtype=np.float32).mean() / 255.0)
        scores.append(score)
    return scores


df = daft.from_pydict(
    {
        "clip_id": ["clip-1", "clip-1", "clip-1"],
        "frame_index": [0, 1, 2],
        "data": frames,
    }
)

result = (
    df.with_column("brightness", frame_brightness(col("data")))
    .select("clip_id", "frame_index", "brightness")
)

result.show()

应用场景:在视频抽帧后计算亮度、模糊度、质量分数或候选关键帧分数,以决定哪些帧进入后续Embedding或分类流程。

集成方式:如果您的流水线已使用daft.read_video_frames(...),只需将示例中的from_pydict(...)替换为真实帧表,UDF主体无需修改。

多模态 UDF:对音频文件做轻量质检

如果您的需求仅是建立音频索引、提取元数据或统一采样率,建议优先使用Daft原生音频函数。当需要业务侧质检、规则打分或自定义特征提取时,再通过UDF补充最后一段逻辑。以下脚本会先生成两个本地WAV文件,再批量计算RMS能量。

import math
import os
import tempfile
import wave
from pathlib import Path

import daft
import numpy as np
from daft import DataType, Series, col


def configure_runner():
    mode = os.getenv("DAFT_RUNNER_MODE", "native")
    if mode == "ray":
        daft.set_runner_ray(os.getenv("DAFT_RAY_ADDRESS"))
    else:
        daft.set_runner_native()


configure_runner()

workspace = Path(tempfile.mkdtemp(prefix="daft_audio_udf_"))


def write_tone(path: Path, freq_hz: float, duration_sec: float):
    sample_rate = 16000
    t = np.linspace(0, duration_sec, int(sample_rate * duration_sec), endpoint=False)
    samples = (0.3 * np.sin(2 * math.pi * freq_hz * t) * 32767).astype(np.int16)
    with wave.open(str(path), "wb") as wav_file:
        wav_file.setnchannels(1)
        wav_file.setsampwidth(2)
        wav_file.setframerate(sample_rate)
        wav_file.writeframes(samples.tobytes())


audio_a = workspace / "tone_a.wav"
audio_b = workspace / "tone_b.wav"
write_tone(audio_a, freq_hz=220.0, duration_sec=0.5)
write_tone(audio_b, freq_hz=880.0, duration_sec=0.5)


@daft.func.batch(return_dtype=DataType.float64(), batch_size=16)
def rms_energy(paths: Series):
    outputs = []
    for path in paths.to_pylist():
        with wave.open(path, "rb") as wav_file:
            samples = np.frombuffer(wav_file.readframes(wav_file.getnframes()), dtype=np.int16)
        outputs.append(float(np.sqrt(np.mean(np.square(samples.astype(np.float64))))))
    return outputs


df = daft.from_pydict(
    {
        "audio_id": ["audio-a", "audio-b"],
        "path": [str(audio_a), str(audio_b)],
    }
)

result = (
    df.with_column("rms_energy", rms_energy(col("path")))
    .select("audio_id", "rms_energy")
)

result.show()

应用场景:筛除静音音频、检测录音异常,或在语音识别和说话人分离之前添加轻量质检环节。

设计思路:对于音频文件处理,将文件路径纳入DataFrame,再通过UDF实现业务侧特征提取,是一种实用且稳定的方式。

最佳实践

  • 优先使用Daft原生表达式和多模态算子完成公共链路,仅对业务特化逻辑编写UDF。

  • 轻量单行逻辑优先选择逐行UDF,I/O密集型逐行数据补充优先选择异步逐行UDF。

  • 需要将一行展开为多行时使用生成器UDF。同一个select()with_column()操作中只能使用一个生成器UDF。

  • 需要接入NumPy、PyArrow、批量推理或向量化计算时,优先选择@daft.func.batch

  • 需要复用模型、Tokenizer、规则文件、连接池等昂贵状态时,优先选择@daft.cls

  • 对于音视频等大对象列,不要盲目增大batch_size。应根据可用内存、GPU显存和输入数据大小进行实测调整,避免OOM(内存溢出)。

  • 先在单机模式(Native Runner)下验证返回类型、空值处理和Schema,再切换到分布式模式(Ray Runner)进行横向扩展。

  • Ray集群上运行时,确保UDF访问的所有本地文件、规则文件或sidecar路径对工作节点可见。