单机与分布式双模切换

更新时间:
复制为 MD 格式

Daft允许您在单机上使用单机模式(Native Runner)开发和校验DataFrame逻辑,再切到Ray集群使用分布式模式(Ray Runner)进行分布式执行,整个过程不需要修改业务逻辑代码。

核心结论

Daft的一个关键工程优势,是同一份DataFrame业务逻辑可以先在单机模式(Native Runner)下开发和校验,再切到分布式模式(Ray Runner)放大到分布式集群执行。真正需要切换的,通常只有运行时配置,而不是DataFrame主体逻辑。

推荐的开发流程分为两步:

  1. 先在本机用小样本跑通表达式、列类型、过滤条件和聚合逻辑。

  2. 再把同一份代码接到Ray集群,让同样的业务流水线在更多CPUGPU节点上运行。

说明

Runner的选择应该在进程启动时完成。工程上不要假设一个已经初始化过RunnerPython进程还能在NativeRay之间来回切换。

准备工作

建议使用DaftRay都支持的Python版本,例如Python3.103.11。下面的命令统一写成Python3.11,如果您的环境里Python已经指向兼容版本,可以自行替换。

最小依赖如下:

python3.11 -m pip install "daft[ray]"

如果您已经有远端Ray集群,只需要拿到Ray Client地址并设置环境变量:

export DAFT_RAY_ADDRESS="ray://<head-host>:10001"

如果您只是想先在当前机器验证分布式模式(Ray Runner),也可以不设置DAFT_RAY_ADDRESS,让Daft直接连接或启动本机Ray运行环境。

一份业务代码,两种运行方式

下面这段代码模拟了一个很常见的开发场景:从订单流水中筛出已支付订单,计算每个城市、每个渠道的成交额和订单数。它是完整可执行的代码,不依赖对象存储、数据库或外网下载,适合先在单机上验证,再切到分布式环境。

import os

import daft
from daft import col


def configure_runner():
    runner = os.getenv("DAFT_RUNNER", "native").lower()
    ray_address = os.getenv("DAFT_RAY_ADDRESS")

    if runner == "ray":
        daft.set_runner_ray(address=ray_address or None)
    else:
        daft.set_runner_native()

    print(f"runner={runner}, ray_address={ray_address or 'local'}")


def build_source_df():
    return daft.from_pydict(
        {
            "order_id": [1001, 1002, 1003, 1004, 1005, 1006],
            "city": [
                "Hangzhou",
                "Hangzhou",
                "Beijing",
                "Shanghai",
                "Beijing",
                "Shanghai",
            ],
            "channel": ["app", "web", "app", "app", "web", "web"],
            "status": ["paid", "paid", "paid", "cancelled", "paid", "paid"],
            "quantity": [2, 1, 4, 3, 2, 5],
            "unit_price": [128.0, 256.0, 64.0, 88.0, 320.0, 40.0],
        }
    )


def build_pipeline(df):
    return (
        df.with_column("gmv", col("quantity") * col("unit_price"))
        .where(col("status") == "paid")
        .groupby("city", "channel")
        .agg(
            col("gmv").sum().alias("paid_gmv"),
            col("order_id").count().alias("paid_orders"),
        )
        .sort(["paid_gmv", "city"], desc=[True, False])
    )


def main():
    configure_runner()
    source_df = build_source_df()
    result_df = build_pipeline(source_df)
    result_df.show()


if __name__ == "__main__":
    main()

这段代码里每个部分的实际用途如下:

  • configure_runner():在进程启动时决定本次执行使用NativeRay。这一步是双模运行的入口,也是单机和分布式共用同一份业务代码的前提。

  • build_source_df():构造一个可重复、可预测的小样本输入。开发阶段先把输入固定下来,最容易定位表达式和Schema问题。

  • build_pipeline(df):承载真正的业务逻辑。这里的DataFrame变换链本身与Runner解耦。

  • main():把运行模式选择、输入构造和业务流水线串起来,并通过show()触发真正执行。

单机模式如何运行

开发阶段最稳妥的做法,是先在本机跑单机模式(Native Runner)。这样启动成本最低,定位问题也最快。

DAFT_RUNNER=native python3.11 your_script.py

在单机上运行可以验证三件事:

  • 输入样本是否符合您的预期。

  • 列表达式和过滤条件是否正确。

  • 聚合结果和排序结果是否与业务预期一致。

如果这里跑不通,不要急着切分布式。因为这类问题通常与集群规模无关,而是代码本身的逻辑问题。

分布式模式如何运行

当单机逻辑确认没有问题后,就可以切到分布式模式(Ray Runner)。只要Ray集群地址可达,DataFrame主体逻辑本身不需要改写。

DAFT_RUNNER=ray \
DAFT_RAY_ADDRESS="ray://<head-host>:10001" \
python3.11 your_script.py

切到分布式模式主要验证两件事:

  • 您的Daft业务逻辑在接入Ray后还能保持相同结果。

  • 您的运行环境已经具备连到远端Ray集群并提交执行任务的条件。

在真实业务里,build_source_df()往往会替换成读取共享存储上的Parquet、Lance、Iceberg或其他数据源;但build_pipeline(df)这种业务变换链应该尽量保持不变。

使用建议

  • 开发阶段默认先用单机模式(Native Runner)。先把输入样本、输出Schema、表达式逻辑和UDF行为固定下来,能显著降低排障成本。

  • 只有当您确实需要更多CPU、GPU,或者需要把任务发到远端统一资源池时,再切分布式模式(Ray Runner)。

  • 双模运行的关键不是“同一进程里切换Runner”,而是“同一份业务流水线在不同进程、不同运行环境里复用”。

  • 如果您的数据源和结果输出位于共享存储中,先确认所有Ray工作节点都能访问同一份路径,否则单机能跑通,分布式不一定能真正落地。

  • Ray Client、Python minor版本和Daft版本最好保持一致,否则问题往往出在运行时兼容性,而不是DataFrame逻辑本身。