基于DMS Notebook进行Ray交互式开发
本文介绍如何通过 DMS Notebook 连接AnalyticDB for MySQL托管的 Ray 集群,使用零代码侵入并行化、异步任务编排和有状态服务管理三种范式进行交互式开发。
功能概述
随着企业数据规模的持续膨胀与机器学习应用的加速普及,如何在海量数据上高效完成模型训练、分布式推理与特征工程,成为数据团队面临的核心挑战。传统方案往往需要用户在数据库与计算集群之间反复搬运数据,不仅带来显著的数据传输延迟与存储成本,还要求团队同时具备数据库运维与分布式计算框架管理的双重能力,极大提高了工程落地门槛。
为此,AnalyticDB for MySQL在原有 OLAP 分析能力的基础上,深度集成了 Ray 分布式计算框架,推出了内置 Ray 资源组的计算能力。用户无需单独搭建和维护独立的 Ray 集群,只需在AnalyticDB for MySQL控制台创建 Ray 资源组,即可获得一个与数据库存算资源天然打通的分布式计算环境——数据就在哪里,计算就在哪里,彻底消除跨系统数据搬运的瓶颈。
本文将通过一个完整的端到端操作流程,演示如何从 DMS Notebook 环境连接至AnalyticDB for MySQL托管的远端 Ray 集群,并依次展示 Ray 在三种典型编程范式下的使用方式:
基于
ray.util.multiprocessing的零改造并行化。基于 Remote Function 的异步任务编排。
基于 Actor 的有状态服务管理。
无论您是希望将现有 Python 脚本无缝迁移至分布式环境,还是构建面向生产级别的大规模 ML 训练流水线,本文都将为您提供清晰可复现的实践参考。
核心优势
基于 Notebook 进行交互式风格的 Ray 编程具有以下优势:
充当“遥控器”的角色:您的 Python 脚本在Notebook环境中运行,并通过网络向远程集群发送指令。
最适合用于交互式调试、探索性分析以及小规模任务分发。
您的脚本直接在集群的头节点(Head node)上运行,从而消除了网络延迟,是进行大规模数据处理(Ray Data)及运行生产级工作负载的理想选择。
前提条件
AnalyticDB for MySQL集群已创建Ray服务。
进入DMS Notebook并创建Notebook文件。创建操作,请参见通过PySpark流式导入数据步骤2~5。
创建会话时需要选择运行镜像,镜像决定了 Notebook 的 Python 版本,请确保所选镜像的 Python 版本与 Ray集群一致(当前为 3.11.13)。
将 Notebook 所在VPC的CIDR网段添加到AnalyticDB for MySQL的 IP 白名单中。
操作步骤
步骤一:在Notebook中安装依赖库
Ray 的远程开发要求客户端(Notebook环境)与服务端(Ray集群)的Python版本和Ray版本严格一致。当前Ray集群的默认Python版本为3.11.13, Ray内核版本是2.49.2。
检查 Notebook 环境的Python 版本。
!python --version卸载已安装的 Ray 版本。
!pip uninstall -y ray安装 Ray 及相关依赖库。
!pip install "ray[client]==2.49.2" xgboost_ray pandas scikit-learn ipywidgets tqdm
步骤二:获取 Ray 连接信息
在AnalyticDB for MySQL目标集群详情页,左侧导航栏,单击。
单击资源组管理页签,在目标资源组所在行操作列,单击
按钮下的详情。获取Ray集群连接地址,格式为
http://[RAY_INTERNAL_HOST]:8265。
步骤三:建立连接并检查版本
初始化 Ray 连接。
将步骤二获取到的Ray集群地址的协议和端口修改为
ray://[RAY_INTERNAL_HOST]:10001,替换以下示例代码中的remote_url。import ray import os # 定义运行时环境依赖 runtime_env = { "pip": ["xgboost_ray", "pandas", "scikit-learn"] } remote_url = "ray://<ray_host>:10001" ray.init(address=remote_url, runtime_env=runtime_env) print(f"Successfully connected to the cluster! Ray cluster node count: {len(ray.nodes())}")检查客户端与服务器版本一致性。
import ray import sys def print_ray_versions(): local_version = ray.__version__ local_python = sys.version.split()[0] @ray.remote def get_server_info(): import ray import sys return { "version": ray.__version__, "python": sys.version.split()[0] } try: server_info = ray.get(get_server_info.remote()) print(f"{'Component':<10} | {'Ray Version':<15} | {'Python Version'}") print("-" * 45) print(f"{'Local(Client)':<10} | {local_version:<15} | {local_python}") print(f"{'Remote(Server)':<10} | {server_info['version']:<15} | {server_info['python']}") # 检查版本兼容性 if local_version != server_info['version']: print(f"\n Warning: Version mismatch! This may cause serialization errors or AttributeError.") else: print(f"\n Versions are fully matched.") except Exception as e: print(f" Failed to connect to cluster or execute task: {e}") # 执行版本检查 print_ray_versions()
步骤四:使用 Ray 进行交互式开发
开发方式 | 范式 | 适用场景 |
零代码侵入并行化(ray.util.multiprocessing) | 零代码侵入、同步阻塞、过程式风格。 | 您已有现成的 for 循环或 Map 任务,并希望在不修改代码结构的前提下,将其运行在集群上。 |
异步任务编排(Remote Functions) | 显式定义、异步非阻塞、底层原语。 | 需要对任务依赖关系和执行流程进行细粒度控制的场景。 |
有状态服务管理(Actors) | 面向对象、跨方法调用保持持久状态。 | 需要管理跨多次操作的可变状态。 |
方式一:零代码侵入并行化(ray.util.multiprocessing)
from ray.util.multiprocessing import Pool
def square(x):
return x * x
# 使用方式与原生 Python multiprocessing 完全一致,无需 @ray.remote 装饰器
with Pool() as pool:
# 此行自动将执行分发到远程集群
results = pool.map(square, range(10))
print(f"Pool results: {results}")方式二:异步任务编排(Remote Functions)
@ray.remote
def add(x, y):
return x + y
# 异步调用 - 立即返回 ObjectRef(类似于 Promise/Future)
ref1 = add.remote(1, 2)
ref2 = add.remote(3, 4)
# 仅在需要实际结果时阻塞
final_result = ray.get(add.remote(ref1, ref2))
print(f"Task result: {final_result}")
tmp = local_py_function(final_result)方式三:有状态服务管理(Actors)
@ray.remote
class Counter:
def __init__(self):
self.value = 0
def increment(self):
self.value += 1
return self.value
# 在集群中创建一个持久的“计数器”对象
counter_actor = Counter.remote()
# 对同一个 Actor 实例进行多次调用 - 状态将被保留
results = [counter_actor.increment.remote() for _ in range(3)]
print(f"Actor counter results: {ray.get(results)}") # 输出: [1, 2, 3]