Daft允许您在单机上使用单机模式(Native Runner)开发和校验DataFrame逻辑,再切到Ray集群使用分布式模式(Ray Runner)进行分布式执行,整个过程不需要修改业务逻辑代码。
核心结论
Daft的一个关键工程优势,是同一份DataFrame业务逻辑可以先在单机模式(Native Runner)下开发和校验,再切到分布式模式(Ray Runner)放大到分布式集群执行。真正需要切换的,通常只有运行时配置,而不是DataFrame主体逻辑。
推荐的开发流程分为两步:
先在本机用小样本跑通表达式、列类型、过滤条件和聚合逻辑。
再把同一份代码接到Ray集群,让同样的业务流水线在更多CPU或GPU节点上运行。
Runner的选择应该在进程启动时完成。工程上不要假设一个已经初始化过Runner的Python进程还能在Native和Ray之间来回切换。
准备工作
建议使用Daft和Ray都支持的Python版本,例如Python3.10或3.11。下面的命令统一写成Python3.11,如果您的环境里Python已经指向兼容版本,可以自行替换。
最小依赖如下:
python3.11 -m pip install "daft[ray]"如果您已经有远端Ray集群,只需要拿到Ray Client地址并设置环境变量:
export DAFT_RAY_ADDRESS="ray://<head-host>:10001"如果您只是想先在当前机器验证分布式模式(Ray Runner),也可以不设置DAFT_RAY_ADDRESS,让Daft直接连接或启动本机Ray运行环境。
一份业务代码,两种运行方式
下面这段代码模拟了一个很常见的开发场景:从订单流水中筛出已支付订单,计算每个城市、每个渠道的成交额和订单数。它是完整可执行的代码,不依赖对象存储、数据库或外网下载,适合先在单机上验证,再切到分布式环境。
import os
import daft
from daft import col
def configure_runner():
runner = os.getenv("DAFT_RUNNER", "native").lower()
ray_address = os.getenv("DAFT_RAY_ADDRESS")
if runner == "ray":
daft.set_runner_ray(address=ray_address or None)
else:
daft.set_runner_native()
print(f"runner={runner}, ray_address={ray_address or 'local'}")
def build_source_df():
return daft.from_pydict(
{
"order_id": [1001, 1002, 1003, 1004, 1005, 1006],
"city": [
"Hangzhou",
"Hangzhou",
"Beijing",
"Shanghai",
"Beijing",
"Shanghai",
],
"channel": ["app", "web", "app", "app", "web", "web"],
"status": ["paid", "paid", "paid", "cancelled", "paid", "paid"],
"quantity": [2, 1, 4, 3, 2, 5],
"unit_price": [128.0, 256.0, 64.0, 88.0, 320.0, 40.0],
}
)
def build_pipeline(df):
return (
df.with_column("gmv", col("quantity") * col("unit_price"))
.where(col("status") == "paid")
.groupby("city", "channel")
.agg(
col("gmv").sum().alias("paid_gmv"),
col("order_id").count().alias("paid_orders"),
)
.sort(["paid_gmv", "city"], desc=[True, False])
)
def main():
configure_runner()
source_df = build_source_df()
result_df = build_pipeline(source_df)
result_df.show()
if __name__ == "__main__":
main()这段代码里每个部分的实际用途如下:
configure_runner():在进程启动时决定本次执行使用Native和Ray。这一步是双模运行的入口,也是单机和分布式共用同一份业务代码的前提。build_source_df():构造一个可重复、可预测的小样本输入。开发阶段先把输入固定下来,最容易定位表达式和Schema问题。build_pipeline(df):承载真正的业务逻辑。这里的DataFrame变换链本身与Runner解耦。main():把运行模式选择、输入构造和业务流水线串起来,并通过show()触发真正执行。
单机模式如何运行
开发阶段最稳妥的做法,是先在本机跑单机模式(Native Runner)。这样启动成本最低,定位问题也最快。
DAFT_RUNNER=native python3.11 your_script.py在单机上运行可以验证三件事:
输入样本是否符合您的预期。
列表达式和过滤条件是否正确。
聚合结果和排序结果是否与业务预期一致。
如果这里跑不通,不要急着切分布式。因为这类问题通常与集群规模无关,而是代码本身的逻辑问题。
分布式模式如何运行
当单机逻辑确认没有问题后,就可以切到分布式模式(Ray Runner)。只要Ray集群地址可达,DataFrame主体逻辑本身不需要改写。
DAFT_RUNNER=ray \
DAFT_RAY_ADDRESS="ray://<head-host>:10001" \
python3.11 your_script.py切到分布式模式主要验证两件事:
您的Daft业务逻辑在接入Ray后还能保持相同结果。
您的运行环境已经具备连到远端Ray集群并提交执行任务的条件。
在真实业务里,build_source_df()往往会替换成读取共享存储上的Parquet、Lance、Iceberg或其他数据源;但build_pipeline(df)这种业务变换链应该尽量保持不变。
使用建议
开发阶段默认先用单机模式(Native Runner)。先把输入样本、输出Schema、表达式逻辑和UDF行为固定下来,能显著降低排障成本。
只有当您确实需要更多CPU、GPU,或者需要把任务发到远端统一资源池时,再切分布式模式(Ray Runner)。
双模运行的关键不是“同一进程里切换Runner”,而是“同一份业务流水线在不同进程、不同运行环境里复用”。
如果您的数据源和结果输出位于共享存储中,先确认所有Ray工作节点都能访问同一份路径,否则单机能跑通,分布式不一定能真正落地。
Ray Client、Python minor版本和Daft版本最好保持一致,否则问题往往出在运行时兼容性,而不是DataFrame逻辑本身。