多模态AI数据处理通常会在同一条流水线中混合CPU算子(下载、解码、抽帧、缩放、清洗)和GPU算子(Embedding、分类、推理)。Daft on Ray支持在单个算子级别声明资源需求,使CPU预处理和GPU推理能够共存于同一条DataFrame流水线中,无需进行粗粒度的资源分配。
粗粒度调度的问题
如果把整条多模态流水线都当成一个GPU作业来提交,会出现两个常见问题:
GPU被CPU密集型任务浪费。
download()、decode_image()、resize()这类步骤本身不需要GPU,但当资源声明粒度过粗时,它们会被连同GPU资源一起调度。这意味着昂贵的GPU时间被消耗在完全运行于CPU上的I/O和像素操作上。CPU和GPU互相阻塞。真正需要GPU的模型推理通常只占流水线的一小段。当资源边界过于宽泛时,CPU和GPU很容易互相等待,导致双方利用率都偏低。
在传统批处理框架中,开发者通常的解决方式是将流水线拆分为两个独立程序:一个CPU作业负责预处理,一个GPU作业负责推理。这种做法引入了中间存储、同步开销和重复的调度逻辑。
Daft on Ray的做法不是把任务拆成“CPU版代码”和“GPU版代码”,而是让资源声明跟着具体算子走。在同一条DataFrame流水线中,CPU预处理继续运行在CPU上,只有真正需要加速的类UDF才请求GPU。Swordfish执行引擎会将每个算子调度到合适的资源池上,以流式morsel驱动的方式重叠CPU和GPU工作。
资源标注API
Daft提供了Python装饰器,将资源需求直接绑定到单个算子上。用于异构调度的两个核心装饰器是@daft.cls和@daft.method.batch。
@daft.cls:将GPU资源绑定到有状态类
@daft.cls(gpus=N, max_concurrency=M)
class MyModel:
def __init__(self):
# 加载模型权重;每个执行实例只运行一次
...
@daft.method.batch(return_dtype=DataType.float32(), batch_size=32)
def predict(self, inputs: Series):
# 处理一个批次的行
...关键参数
gpus=N:该算子每个执行实例需要的GPU数量。max_concurrency=M:限制单个实例最多并发处理的批次数量。该参数用于控制吞吐量和GPU显存占用。数值越大吞吐越高,但也会增加显存压力。
__init__方法在每个执行实例中只运行一次,应在此处加载模型权重并将其移动到GPU上。Daft不会在类UDF注册时调用__init__,初始化会延迟到查询真正执行时才进行。
@daft.method.batch:配置批处理
@daft.method.batch(
return_dtype=DataType.embedding(DataType.float32(), 3),
batch_size=32,
)
def embed(self, images: Series):
...关键参数
return_dtype:输出列的Daft数据类型。必须与实际返回值匹配,不匹配会导致运行时错误。batch_size:每次方法调用传入的行数。较大的批次可以通过分摊内核启动开销来提升GPU吞吐,但需要更多GPU显存。建议从32开始,根据模型和可用显存进行调整。
纯CPU算子
Daft内置表达式如download()、decode_image()和resize()都是原生CPU算子。它们不携带任何GPU资源标注,会被自动调度到CPU Worker上执行。您也可以使用@daft.func(不带gpus参数)编写纯CPU的自定义UDF,将自定义CPU逻辑加入同一条流水线。
可运行的完整样例
以下样例演示了从下载图片、在CPU上预处理到在GPU上计算平均颜色Embedding的完整流程。这个样例的目的是展示异构算子的声明和执行方式,而非模型本身。
准备环境
准备一个安装了Daft和PyTorch的Ray环境:
pip install "daft[ray]" torch
ray start --head完整脚本
将以下脚本保存为app.py并运行:
import os
import daft
from daft import DataType, Series, col
daft.set_runner_ray(os.getenv("DAFT_RAY_ADDRESS"))
GPU_PER_WORKER = int(os.getenv("DAFT_GPU_PER_WORKER", "0"))
@daft.cls(gpus=GPU_PER_WORKER, max_concurrency=2)
class MeanColorEmbedder:
def __init__(self):
import torch
self.torch = torch
use_gpu = GPU_PER_WORKER > 0 and torch.cuda.is_available()
self.device = torch.device("cuda" if use_gpu else "cpu")
@daft.method.batch(
return_dtype=DataType.embedding(DataType.float32(), 3),
batch_size=32,
)
def embed(self, images: Series):
outputs = []
for image in images.to_pylist():
if image is None:
outputs.append(None)
continue
tensor = self.torch.tensor(
image, dtype=self.torch.float32, device=self.device
)
mean_rgb = tensor.mean(dim=(0, 1)) / 255.0
outputs.append(mean_rgb.cpu().tolist())
return outputs
urls = [
"https://xxx.com/65535/53671838774_xxx_o.jpg",
"https://xxx.com/65535/53671700073_xxx_o.jpg",
]
embedder = MeanColorEmbedder()
df = (
daft.from_pydict({"url": urls})
.with_column(
"image",
col("url").download(on_error="null").decode_image().resize(224, 224),
)
.with_column("embedding", embedder.embed(col("image")))
.select("url", "embedding")
)
df.show()运行样例
在没有GPU的机器上,先走CPU验证路径:
DAFT_GPU_PER_WORKER=0 python app.py如果当前Ray节点可以分配GPU,再打开GPU资源请求:
DAFT_GPU_PER_WORKER=1 python app.py同一份代码在两种模式下都可以运行。这种设计让您可以在纯CPU机器上先验证Schema、空值处理和返回类型,然后再部署到GPU集群上。
执行流程
当df.show()触发执行后,流水线将经历以下四个阶段。理解这一流程对调试和性能调优至关重要。
步骤一:选择执行后端
daft.set_runner_ray(...)的作用是把这份DataFrame流水线交给Ray Runner执行。此时还没有开始下载图片,也没有启动推理。后面的with_column()和select()仍然只是在构建表达式和逻辑计划。
如果没有显式传入DAFT_RAY_ADDRESS,Daft会连接到可用的本地Ray实例。这让同一份代码既能在开发机上验证,也能切到已有的Ray集群上运行。
步骤二:将GPU需求绑定到具体算子
MeanColorEmbedder不是普通的Python helper class,而是一个Daft类UDF:
@daft.cls(gpus=GPU_PER_WORKER, max_concurrency=2)
class MeanColorEmbedder:
...这里有两个控制资源分配的关键参数:
gpus=GPU_PER_WORKER:指定该算子每个执行实例需要的GPU数量。max_concurrency=2:限制单个实例最多并发处理的批次数量。该参数控制吞吐和GPU显存占用。增大该值可以让GPU并行处理更多批次,但也会增加峰值显存消耗。
embedder = MeanColorEmbedder()这一步不会立刻加载模型或占用GPU资源。Daft会先记录类UDF的配置,等查询真正执行时,再在对应的执行实例中完成初始化。
步骤三:在同一条流水线中混合CPU和GPU算子
以下代码将CPU侧预处理和GPU类UDF组合在同一条流水线中:
df = (
daft.from_pydict({"url": urls})
.with_column(
"image",
col("url").download(on_error="null").decode_image().resize(224, 224),
)
.with_column("embedding", embedder.embed(col("image")))
.select("url", "embedding")
)这段流水线可以按任务流拆解为五个逻辑步骤:
from_pydict():创建输入表,将图片URL放入url列。download(on_error="null"):在CPU侧拉取图片内容。下载失败时生成null值,而非抛出异常。decode_image().resize(224, 224):在CPU侧完成解码和尺寸规范化,生成image列。embedder.embed(col("image")):将image列按批次送入类UDF。这是流水线中唯一需要GPU资源的步骤。类UDF返回
embedding列,select()只保留需要输出的列。
关键点在于,CPU算子和GPU算子不需要拆成两段完全独立的程序。它们可以继续留在同一条DataFrame流水线中,各自声明不同的资源需求,并以流水线方式执行。Swordfish执行引擎会自动处理CPU算子与GPU算子之间的调度和数据传输。
步骤四:df.show()触发执行
Daft的DataFrame API默认是惰性的。前面定义的流水线只有在调用df.show()或collect()等终端操作时才会真正执行。
当df.show()触发执行后,流水线按以下顺序进行:
Ray Runner读取
url列中的输入数据。CPU侧先完成图片下载、解码和缩放。
处理好的图片按
batch_size=32组成批次。embed()方法在满足GPU资源声明的执行实例上处理这些批次。如果
DAFT_GPU_PER_WORKER=1,类UDF会请求 GPU;如果设为0,同一段逻辑仍然可以在CPU上验证。show()输出最终的url和embedding结果。
对开发者来说,这个执行模型非常重要:先用DAFT_GPU_PER_WORKER=0验证Schema、空值处理和返回类型,再打开GPU资源请求检查异构调度是否符合预期。这种分阶段的方法可以显著降低调试成本,因为数据类型和逻辑错误在廉价的CPU资源上就能被发现,无需消耗GPU时间。
与Spark的对比
以下对比只讨Daft on Ray与Spark在资源粒度和任务组织方式上的差异,不评价整体平台能力。
维度 | Daft on Ray | Spark |
资源声明粒度 | 算子级别。通过 | 作业级别或Stage级别。 |
GPU绑定机制 | 原生装饰器( | 通过 |
流水线执行模型 | 流式morsel驱动执行。CPU和GPU算子在同一条流水线中重叠运行,各阶段之间支持背压传播。 | 批处理Stage执行。Stage边界由Shuffle依赖决定,CPU和GPU Stage通常按顺序运行。 |
代码组织方式 | 一份DataFrame代码中同时表达CPU预处理和GPU推理。 | 更常见的做法是将CPU ETL、特征处理和GPU推理拆分为不同Stage,甚至不同的作业。 |
这并不是说Spark不能使用GPU。Spark也支持通过ResourceProfile为Stage申请GPU资源。但两者的核心区别在于资源绑定点不同:
在Daft中,GPU请求可以直接绑定到
embedder.embed(...)这样的具体类UDF算子上。同一流水线中的其他算子继续在CPU Worker上运行,不会请求GPU资源。在Spark中,资源边界通常更靠近Stage级别,而非DataFrame表达式中的单个算子。GPU Stage中的所有任务都获得相同的资源分配,即使其中一些任务只执行CPU工作。
因此,当一条多模态流水线中只有少数算子需要GPU时,Daft更容易把CPU预处理和GPU推理写在同一份DataFrame代码中,并将调度粒度落到真正需要加速的那一段。
开发建议
先在CPU上验证:先用
DAFT_GPU_PER_WORKER=0运行样例,确认return_dtype、空值处理和输出Schema都正确。这样可以在不消耗GPU资源的情况下发现数据类型和逻辑错误。启用GPU并验证调度:然后将
DAFT_GPU_PER_WORKER设为1,验证类UDF的资源请求与可用节点资源是否匹配。检查Ray Runner是否正确地将带GPU标注的算子路由到GPU节点。调优
batch_size和max_concurrency:这两个参数直接影响GPU利用率和显存消耗。建议从batch_size=32和max_concurrency=2开始,然后增大batch_size以提升吞吐,或减小max_concurrency以降低峰值GPU显存占用。最佳值取决于输入大小、模型大小和可用GPU显存。保持算子级别的资源声明:如果同一条流水线中还有其他Python类UDF,应继续在算子级别声明资源,而不是将整条流水线统一视为GPU作业。这样可以确保纯CPU算子不会被强制调度到GPU Worker 上。
结合Ray Runner使用多节点GPU集群:在生产工作负载中,使用
daft.set_runner_ray(address)连接到多节点Ray集群。Ray Runner会将带GPU标注的算子分发到所有可用的GPU节点上,而CPU算子则在任何有可用CPU资源的节点上运行。监控 GPU 利用率:使用Ray Dashboard监控各节点的GPU利用率。如果GPU利用率持续偏低,可以考虑增大
batch_size或max_concurrency。如果出现OOM错误,则应降低这些值,或减小gpus参数以使用分数GPU资源。