Interactive Ray development with DMS Notebook
This topic describes how to use DMS Notebook to connect to a Ray cluster hosted by AnalyticDB for MySQL and perform interactive development using three programming paradigms: zero-code-intrusive parallelization, asynchronous task orchestration, and stateful service management.
Overview
As data volumes grow and machine learning applications become more widespread, data teams face the core challenge of efficiently training models, running distributed inference, and performing feature engineering on massive datasets. Traditional solutions often require users to move data between databases and compute clusters. This process not only introduces significant data transfer latency and storage costs but also requires teams to manage both database operations and distributed computing frameworks, making implementation more challenging.
To address this, AnalyticDB for MySQL integrates the Ray distributed computing framework with its existing Online Analytical Processing (OLAP) capabilities by introducing a built-in Ray resource group. You no longer need to set up and maintain a separate Ray cluster. Creating a Ray resource group in the AnalyticDB for MySQL console provides a distributed computing environment that is natively connected to your database storage and compute resources. This approach brings compute to the data, eliminating the bottleneck of cross-system data movement.
This topic provides a complete guide on how to connect to a remote Ray cluster hosted by AnalyticDB for MySQL from DMS Notebook. It demonstrates how to use Ray with three typical programming paradigms:
Zero-code-intrusive parallelization with ray.util.multiprocessing.
Asynchronous task orchestration with Remote Functions.
Stateful service management with Actors.
Whether you want to seamlessly migrate existing Python scripts to a distributed environment or build production-scale machine learning pipelines, this guide provides clear, reproducible instructions.
Key advantages
Interactive Ray programming in a notebook offers the following advantages:
Acts as a "remote control": Your Python script runs in the notebook and sends commands to the remote cluster over the network.
Ideal for interactive debugging, exploratory analysis, and small-scale task distribution.
Tasks submitted from your script run directly within the cluster, enabling efficient, low-latency communication between distributed workers.
Prerequisites
You have created a Ray service in your AnalyticDB for MySQL cluster.
You have created a notebook file in DMS Notebook. For instructions, see Steps 2 to 5 in Stream data by using PySpark.
When you create a session, you must select a runtime image. The image determines the Python version of the notebook. Ensure that the Python version of the selected image matches that of the Ray cluster, which is 3.11.13.
You have added the CIDR block of the VPC where the notebook resides to the IP whitelist of your AnalyticDB for MySQL cluster.
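If you are unsure whether the notebook's address is covered by the whitelist entry you added, a quick check with the standard library can help. The following is a minimal sketch; the IP address and CIDR blocks are hypothetical placeholders, not values taken from your cluster.

```python
import ipaddress

def is_covered(ip, whitelist_cidrs):
    """Return True if ip falls inside any CIDR block in whitelist_cidrs."""
    address = ipaddress.ip_address(ip)
    return any(address in ipaddress.ip_network(cidr) for cidr in whitelist_cidrs)

# Hypothetical values for illustration only. Replace them with the
# notebook's address and the entries in your cluster's IP whitelist.
notebook_ip = "192.168.12.34"
whitelist = ["192.168.0.0/16", "10.0.0.0/8"]

print(is_covered(notebook_ip, whitelist))
```

This only confirms that the address range is covered on paper. It does not test network reachability.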
Procedure
Step 1: Install dependencies
Remote development with Ray requires that the Python and Ray versions on the client (the notebook environment) and the server (the Ray cluster) match. The Ray cluster uses Python 3.11.13 and Ray 2.49.2.
Check the Python version of your notebook environment.
!python --version
Uninstall any existing versions of Ray.
!pip uninstall -y ray
Install the required libraries.
!pip install "ray[client]==2.49.2" xgboost_ray pandas scikit-learn ipywidgets tqdm
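After installation, you may want to confirm that the notebook environment matches the versions stated above before you connect. The following sketch uses only the standard library and does not import Ray. The helper names installed_version and find_mismatches are illustrative, not part of any Ray or DMS API.

```python
import sys
from importlib import metadata

# Versions this topic states for the Ray cluster.
EXPECTED_PYTHON = "3.11.13"
EXPECTED_RAY = "2.49.2"

def installed_version(package):
    """Return the installed version of a package, or None if it is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None

def find_mismatches(python_version, ray_version):
    """List human-readable differences from the expected versions."""
    problems = []
    if python_version != EXPECTED_PYTHON:
        problems.append(f"Python is {python_version}, expected {EXPECTED_PYTHON}")
    if ray_version != EXPECTED_RAY:
        problems.append(f"Ray is {ray_version or 'not installed'}, expected {EXPECTED_RAY}")
    return problems

# Compare the local interpreter and the installed Ray package.
local_python = ".".join(str(part) for part in sys.version_info[:3])
for problem in find_mismatches(local_python, installed_version("ray")):
    print(problem)
```

If nothing is printed, both versions match. Otherwise, switch the runtime image or reinstall Ray before you continue.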
Step 2: Get Ray connection information
On the details page of the target AnalyticDB for MySQL cluster, in the navigation pane on the left, click .
Click the Resource Groups tab. In the Actions column for the target resource group, click the icon and select Details.
Get the Ray Cluster Endpoint. The format is http://[RAY_INTERNAL_HOST]:8265.
Step 3: Connect and check versions
Initialize the Ray connection.
Modify the protocol and port of the Ray cluster connection address you obtained in Step 2 to ray://[RAY_INTERNAL_HOST]:10001. Replace the remote_url in the following code sample with this address.
import ray
import os

# Define runtime environment dependencies
runtime_env = {
    "pip": ["xgboost_ray", "pandas", "scikit-learn"]
}

remote_url = "ray://<ray_host>:10001"
ray.init(address=remote_url, runtime_env=runtime_env)
print(f"Successfully connected to the cluster! Ray cluster node count: {len(ray.nodes())}")
Check version consistency between the client and server.
import ray
import sys

def print_ray_versions():
    # Versions of the client (the notebook environment)
    local_version = ray.__version__
    local_python = sys.version.split()[0]

    @ray.remote
    def get_server_info():
        # Runs on the cluster and reports the server-side versions
        import ray
        import sys
        return {
            "version": ray.__version__,
            "python": sys.version.split()[0]
        }

    try:
        server_info = ray.get(get_server_info.remote())
        print(f"{'Component':<15} | {'Ray Version':<15} | {'Python Version'}")
        print("-" * 50)
        print(f"{'Local(Client)':<15} | {local_version:<15} | {local_python}")
        print(f"{'Remote(Server)':<15} | {server_info['version']:<15} | {server_info['python']}")

        # Check for version compatibility (both Ray and Python must match)
        if local_version != server_info['version'] or local_python != server_info['python']:
            print("\n Warning: Version mismatch! This may cause serialization errors or AttributeError.")
        else:
            print("\n Versions are fully matched.")
    except Exception as e:
        print(f" Failed to connect to cluster or execute task: {e}")

# Run the version check
print_ray_versions()
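The address change described at the start of this step (protocol http to ray, port 8265 to 10001) can also be done programmatically instead of by hand. The following is a minimal sketch using the standard library. The function name to_ray_client_address and the host ray-head.example.internal are placeholders introduced for illustration.

```python
from urllib.parse import urlsplit

RAY_CLIENT_PORT = 10001  # Port this step gives for the ray:// address.

def to_ray_client_address(cluster_endpoint):
    """Convert http://<host>:8265 into ray://<host>:10001."""
    host = urlsplit(cluster_endpoint).hostname
    if host is None:
        raise ValueError(f"No host found in endpoint: {cluster_endpoint!r}")
    return f"ray://{host}:{RAY_CLIENT_PORT}"

# "ray-head.example.internal" is a placeholder, not a real endpoint.
print(to_ray_client_address("http://ray-head.example.internal:8265"))
```

The returned string can be assigned to remote_url and passed to ray.init(address=...).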
Step 4: Interactive development
Development method | Paradigm | Use cases |
Zero-code-intrusive parallelization (ray.util.multiprocessing) | Zero-code-intrusive, synchronous, and procedural. | Migrating existing for loops or map tasks to the cluster with minimal code changes. |
Asynchronous task orchestration (Remote Functions) | Explicit, asynchronous, non-blocking primitive. | For fine-grained control over task dependencies and execution flow. |
Stateful service management (Actors) | Object-oriented paradigm that maintains state across method calls. | For managing mutable state across multiple operations. |
Method 1: Zero-code-intrusive parallelization
from ray.util.multiprocessing import Pool

def square(x):
    return x * x

# The usage is identical to Python's native multiprocessing. The @ray.remote decorator is not needed.
with Pool() as pool:
    # This line automatically distributes the execution to the remote cluster.
    results = pool.map(square, range(10))

print(f"Pool results: {results}")
Method 2: Asynchronous task orchestration
@ray.remote
def add(x, y):
    return x + y

# Asynchronous call: immediately returns an ObjectRef, which is similar to a Promise or Future.
ref1 = add.remote(1, 2)
ref2 = add.remote(3, 4)

# Block only when the actual result is needed.
final_result = ray.get(add.remote(ref1, ref2))
print(f"Task result: {final_result}")
Method 3: Stateful service management
@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

# Create a persistent "counter" object on the cluster.
counter_actor = Counter.remote()

# Call the same Actor instance multiple times. The state is preserved.
results = [counter_actor.increment.remote() for _ in range(3)]
print(f"Actor counter results: {ray.get(results)}")  # Output: [1, 2, 3]
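To know what the three methods should return before you submit anything to the cluster, you can run the same function bodies locally. The following sketch does not use Ray at all: it substitutes the thread-backed Pool from multiprocessing.dummy for ray.util.multiprocessing.Pool, calls add directly instead of through .remote(), and uses Counter as a plain class. It is a local stand-in for checking expected values, not a description of how Ray executes the tasks.

```python
from multiprocessing.dummy import Pool  # Thread-backed Pool with the same map() interface.

def square(x):
    return x * x

def add(x, y):
    return x + y

class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

# Method 1 logic: map square over 0..9.
with Pool(4) as pool:
    pool_results = pool.map(square, range(10))

# Method 2 logic: add(add(1, 2), add(3, 4)).
task_result = add(add(1, 2), add(3, 4))

# Method 3 logic: three increments on a single instance.
counter = Counter()
actor_results = [counter.increment() for _ in range(3)]

print(f"Pool results: {pool_results}")
print(f"Task result: {task_result}")
print(f"Actor counter results: {actor_results}")
```

If the cluster runs return different values from this local run, the likely cause is the environment (for example, a version mismatch) rather than the function logic.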