CPU/GPU 异构算子编排和调度

更新时间:
复制为 MD 格式

多模态AI数据处理通常会在同一条流水线中混合CPU算子(下载、解码、抽帧、缩放、清洗)和GPU算子(Embedding、分类、推理)。Daft on Ray支持在单个算子级别声明资源需求,使CPU预处理和GPU推理能够共存于同一条DataFrame流水线中,无需进行粗粒度的资源分配。

粗粒度调度的问题

如果把整条多模态流水线都当成一个GPU作业来提交,会出现两个常见问题:

  1. GPUCPU密集型任务浪费。download()decode_image()resize()这类步骤本身不需要GPU,但当资源声明粒度过粗时,它们会被连同GPU资源一起调度。这意味着昂贵的GPU时间被消耗在完全运行于CPU上的I/O和像素操作上。

  2. CPUGPU互相阻塞。真正需要GPU的模型推理通常只占流水线的一小段。当资源边界过于宽泛时,CPUGPU很容易互相等待,导致双方利用率都偏低。

在传统批处理框架中,开发者通常的解决方式是将流水线拆分为两个独立程序:一个CPU作业负责预处理,一个GPU作业负责推理。这种做法引入了中间存储、同步开销和重复的调度逻辑。

Daft on Ray的做法不是把任务拆成“CPU版代码”和“GPU版代码”,而是让资源声明跟着具体算子走。在同一条DataFrame流水线中,CPU预处理继续运行在CPU上,只有真正需要加速的类UDF才请求GPU。Swordfish执行引擎会将每个算子调度到合适的资源池上,以流式morsel驱动的方式重叠CPUGPU工作。

资源标注API

Daft提供了Python装饰器,将资源需求直接绑定到单个算子上。用于异构调度的两个核心装饰器是@daft.cls@daft.method.batch

@daft.cls:将GPU资源绑定到有状态类

@daft.cls(gpus=N, max_concurrency=M)
class MyModel:
    def __init__(self):
        # 加载模型权重;每个执行实例只运行一次
        ...

    @daft.method.batch(return_dtype=DataType.float32(), batch_size=32)
    def predict(self, inputs: Series):
        # 处理一个批次的行
        ...

关键参数

  • gpus=N:该算子每个执行实例需要的GPU数量。

  • max_concurrency=M:限制单个实例最多并发处理的批次数量。该参数用于控制吞吐量和GPU显存占用。数值越大吞吐越高,但也会增加显存压力。

__init__方法在每个执行实例中只运行一次,应在此处加载模型权重并将其移动到GPU上。Daft不会在类UDF注册时调用__init__,初始化会延迟到查询真正执行时才进行。

@daft.method.batch:配置批处理

@daft.method.batch(
    return_dtype=DataType.embedding(DataType.float32(), 3),
    batch_size=32,
)
def embed(self, images: Series):
    ...

关键参数

  • return_dtype:输出列的Daft数据类型。必须与实际返回值匹配,不匹配会导致运行时错误。

  • batch_size:每次方法调用传入的行数。较大的批次可以通过分摊内核启动开销来提升GPU吞吐,但需要更多GPU显存。建议从32开始,根据模型和可用显存进行调整。

CPU算子

Daft内置表达式如download()decode_image()resize()都是原生CPU算子。它们不携带任何GPU资源标注,会被自动调度到CPU Worker上执行。您也可以使用@daft.func(不带gpus参数)编写纯CPU的自定义UDF,将自定义CPU逻辑加入同一条流水线。

可运行的完整样例

以下样例演示了从下载图片、在CPU上预处理到在GPU上计算平均颜色Embedding的完整流程。这个样例的目的是展示异构算子的声明和执行方式,而非模型本身。

准备环境

准备一个安装了DaftPyTorchRay环境:

pip install "daft[ray]" torch
ray start --head

完整脚本

将以下脚本保存为app.py并运行:

import os

import daft
from daft import DataType, Series, col


daft.set_runner_ray(os.getenv("DAFT_RAY_ADDRESS"))

GPU_PER_WORKER = int(os.getenv("DAFT_GPU_PER_WORKER", "0"))


@daft.cls(gpus=GPU_PER_WORKER, max_concurrency=2)
class MeanColorEmbedder:
    def __init__(self):
        import torch

        self.torch = torch
        use_gpu = GPU_PER_WORKER > 0 and torch.cuda.is_available()
        self.device = torch.device("cuda" if use_gpu else "cpu")

    @daft.method.batch(
        return_dtype=DataType.embedding(DataType.float32(), 3),
        batch_size=32,
    )
    def embed(self, images: Series):
        outputs = []
        for image in images.to_pylist():
            if image is None:
                outputs.append(None)
                continue

            tensor = self.torch.tensor(
                image, dtype=self.torch.float32, device=self.device
            )
            mean_rgb = tensor.mean(dim=(0, 1)) / 255.0
            outputs.append(mean_rgb.cpu().tolist())
        return outputs


urls = [
    "https://xxx.com/65535/53671838774_xxx_o.jpg",
    "https://xxx.com/65535/53671700073_xxx_o.jpg",
]

embedder = MeanColorEmbedder()

df = (
    daft.from_pydict({"url": urls})
    .with_column(
        "image",
        col("url").download(on_error="null").decode_image().resize(224, 224),
    )
    .with_column("embedding", embedder.embed(col("image")))
    .select("url", "embedding")
)

df.show()

运行样例

在没有GPU的机器上,先走CPU验证路径:

DAFT_GPU_PER_WORKER=0 python app.py

如果当前Ray节点可以分配GPU,再打开GPU资源请求:

DAFT_GPU_PER_WORKER=1 python app.py

同一份代码在两种模式下都可以运行。这种设计让您可以在纯CPU机器上先验证Schema、空值处理和返回类型,然后再部署到GPU集群上。

执行流程

df.show()触发执行后,流水线将经历以下四个阶段。理解这一流程对调试和性能调优至关重要。

步骤一:选择执行后端

daft.set_runner_ray(...)的作用是把这份DataFrame流水线交给Ray Runner执行。此时还没有开始下载图片,也没有启动推理。后面的with_column()select()仍然只是在构建表达式和逻辑计划。

如果没有显式传入DAFT_RAY_ADDRESS,Daft会连接到可用的本地Ray实例。这让同一份代码既能在开发机上验证,也能切到已有的Ray集群上运行。

步骤二:将GPU需求绑定到具体算子

MeanColorEmbedder不是普通的Python helper class,而是一个DaftUDF:

@daft.cls(gpus=GPU_PER_WORKER, max_concurrency=2)
class MeanColorEmbedder:
    ...

这里有两个控制资源分配的关键参数:

  • gpus=GPU_PER_WORKER:指定该算子每个执行实例需要的GPU数量。

  • max_concurrency=2:限制单个实例最多并发处理的批次数量。该参数控制吞吐和GPU显存占用。增大该值可以让GPU并行处理更多批次,但也会增加峰值显存消耗。

embedder = MeanColorEmbedder()这一步不会立刻加载模型或占用GPU资源。Daft会先记录类UDF的配置,等查询真正执行时,再在对应的执行实例中完成初始化。

步骤三:在同一条流水线中混合CPUGPU算子

以下代码将CPU侧预处理和GPUUDF组合在同一条流水线中:

df = (
    daft.from_pydict({"url": urls})
    .with_column(
        "image",
        col("url").download(on_error="null").decode_image().resize(224, 224),
    )
    .with_column("embedding", embedder.embed(col("image")))
    .select("url", "embedding")
)

这段流水线可以按任务流拆解为五个逻辑步骤:

  1. from_pydict():创建输入表,将图片URL放入url列。

  2. download(on_error="null"):在CPU侧拉取图片内容。下载失败时生成null值,而非抛出异常。

  3. decode_image().resize(224, 224):在CPU侧完成解码和尺寸规范化,生成image列。

  4. embedder.embed(col("image")):将image列按批次送入类UDF。这是流水线中唯一需要GPU资源的步骤。

  5. UDF返回embedding列,select()只保留需要输出的列。

关键点在于,CPU算子和GPU算子不需要拆成两段完全独立的程序。它们可以继续留在同一条DataFrame流水线中,各自声明不同的资源需求,并以流水线方式执行。Swordfish执行引擎会自动处理CPU算子与GPU算子之间的调度和数据传输。

步骤四:df.show()触发执行

DaftDataFrame API默认是惰性的。前面定义的流水线只有在调用df.show()collect()等终端操作时才会真正执行。

df.show()触发执行后,流水线按以下顺序进行:

  1. Ray Runner读取url列中的输入数据。

  2. CPU侧先完成图片下载、解码和缩放。

  3. 处理好的图片按batch_size=32组成批次。

  4. embed()方法在满足GPU资源声明的执行实例上处理这些批次。

  5. 如果DAFT_GPU_PER_WORKER=1,类UDF会请求 GPU;如果设为0,同一段逻辑仍然可以在CPU上验证。

  6. show()输出最终的urlembedding结果。

对开发者来说,这个执行模型非常重要:先用DAFT_GPU_PER_WORKER=0验证Schema、空值处理和返回类型,再打开GPU资源请求检查异构调度是否符合预期。这种分阶段的方法可以显著降低调试成本,因为数据类型和逻辑错误在廉价的CPU资源上就能被发现,无需消耗GPU时间。

Spark的对比

以下对比只讨Daft on RaySpark在资源粒度和任务组织方式上的差异,不评价整体平台能力。

维度

Daft on Ray

Spark

资源声明粒度

算子级别。通过@daft.cls可以直接在单个算子上声明GPU需求。

作业级别或Stage级别。ResourceProfileStage生效,而非Stage内的单个算子。

GPU绑定机制

原生装饰器(@daft.cls(gpus=N)),无需外部插件。

通过ResourceProfile或第三方插件(如 RAPIDS Accelerator)进行外部配置。

流水线执行模型

流式morsel驱动执行。CPUGPU算子在同一条流水线中重叠运行,各阶段之间支持背压传播。

批处理Stage执行。Stage边界由Shuffle依赖决定,CPUGPU Stage通常按顺序运行。

代码组织方式

一份DataFrame代码中同时表达CPU预处理和GPU推理。

更常见的做法是将CPU ETL、特征处理和GPU推理拆分为不同Stage,甚至不同的作业。

这并不是说Spark不能使用GPU。Spark也支持通过ResourceProfileStage申请GPU资源。但两者的核心区别在于资源绑定点不同:

  • Daft中,GPU请求可以直接绑定到embedder.embed(...)这样的具体类UDF算子上。同一流水线中的其他算子继续在CPU Worker上运行,不会请求GPU资源。

  • Spark中,资源边界通常更靠近Stage级别,而非DataFrame表达式中的单个算子。GPU Stage中的所有任务都获得相同的资源分配,即使其中一些任务只执行CPU工作。

因此,当一条多模态流水线中只有少数算子需要GPU时,Daft更容易把CPU预处理和GPU推理写在同一份DataFrame代码中,并将调度粒度落到真正需要加速的那一段。

开发建议

  • 先在CPU上验证:先用DAFT_GPU_PER_WORKER=0运行样例,确认return_dtype、空值处理和输出Schema都正确。这样可以在不消耗GPU资源的情况下发现数据类型和逻辑错误。

  • 启用GPU并验证调度:然后将DAFT_GPU_PER_WORKER设为1,验证类UDF的资源请求与可用节点资源是否匹配。检查Ray Runner是否正确地将带GPU标注的算子路由到GPU节点。

  • 调优batch_sizemax_concurrency这两个参数直接影响GPU利用率和显存消耗。建议从batch_size=32max_concurrency=2开始,然后增大batch_size以提升吞吐,或减小max_concurrency以降低峰值GPU显存占用。最佳值取决于输入大小、模型大小和可用GPU显存。

  • 保持算子级别的资源声明:如果同一条流水线中还有其他PythonUDF,应继续在算子级别声明资源,而不是将整条流水线统一视为GPU作业。这样可以确保纯CPU算子不会被强制调度到GPU Worker 上。

  • 结合Ray Runner使用多节点GPU集群:在生产工作负载中,使用daft.set_runner_ray(address)连接到多节点Ray集群。Ray Runner会将带GPU标注的算子分发到所有可用的GPU节点上,而CPU算子则在任何有可用CPU资源的节点上运行。

  • 监控 GPU 利用率:使用Ray Dashboard监控各节点的GPU利用率。如果GPU利用率持续偏低,可以考虑增大batch_sizemax_concurrency。如果出现OOM错误,则应降低这些值,或减小gpus参数以使用分数GPU资源。