Interactive Ray development with DMS Notebook


This topic describes how to use DMS Notebook to connect to a Ray cluster hosted by AnalyticDB for MySQL and perform interactive development using three programming paradigms: zero-code-intrusive parallelization, asynchronous task orchestration, and stateful service management.

Overview

As data volumes grow and machine learning applications become more widespread, data teams face the core challenge of efficiently training models, running distributed inference, and performing feature engineering on massive datasets. Traditional solutions often require users to move data between databases and compute clusters. This process not only introduces significant data transfer latency and storage costs but also requires teams to manage both database operations and distributed computing frameworks, making implementation more challenging.

To address this, AnalyticDB for MySQL integrates the Ray distributed computing framework with its existing Online Analytical Processing (OLAP) capabilities by introducing a built-in Ray resource group. You no longer need to set up and maintain a separate Ray cluster. Creating a Ray resource group in the AnalyticDB for MySQL console provides a distributed computing environment that is natively connected to your database storage and compute resources. This approach brings compute to the data, eliminating the bottleneck of cross-system data movement.

This topic provides a complete guide on how to connect to a remote Ray cluster hosted by AnalyticDB for MySQL from DMS Notebook. It demonstrates how to use Ray with three typical programming paradigms:

  • Zero-code-intrusive parallelization with ray.util.multiprocessing.

  • Asynchronous task orchestration with Remote Functions.

  • Stateful service management with Actors.

Whether you want to seamlessly migrate existing Python scripts to a distributed environment or build production-scale machine learning pipelines, this guide provides clear, reproducible instructions.

Key advantages

Interactive Ray programming in a notebook offers the following advantages:

  • The notebook acts as a "remote control": your Python script runs in the notebook and sends commands to the remote cluster over the network.

  • Ideal for interactive debugging, exploratory analysis, and small-scale task distribution.

  • Tasks submitted from your script run directly within the cluster, enabling efficient, low-latency communication between distributed workers.

Prerequisites

  • You have created a Ray service in your AnalyticDB for MySQL cluster.

  • You have created a notebook file in DMS Notebook. For instructions, see Steps 2 to 5 in Stream data by using PySpark.

    When you create a session, you must select a runtime image. The image determines the Python version of the notebook. Ensure that the Python version of the selected image matches that of the Ray cluster, which is 3.11.13.

  • You have added the CIDR block of the VPC where the notebook resides to the IP whitelist of your AnalyticDB for MySQL cluster.
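If a connection attempt later times out, one thing worth checking is whether the notebook's IP address actually falls inside the CIDR block that was added to the whitelist. The following is a minimal sketch using Python's standard ipaddress module. The helper name ip_in_cidr and both addresses are hypothetical placeholders, not values from your environment.

```python
import ipaddress

def ip_in_cidr(ip: str, cidr: str) -> bool:
    """Return True if the IP address belongs to the given CIDR block."""
    # strict=False accepts blocks written with host bits set, e.g. 192.168.1.5/16
    return ipaddress.ip_address(ip) in ipaddress.ip_network(cidr, strict=False)

# Hypothetical values: replace with your notebook IP and the whitelisted VPC CIDR block.
print(ip_in_cidr("192.168.12.34", "192.168.0.0/16"))
print(ip_in_cidr("10.0.5.7", "192.168.0.0/16"))
```

The first call covers an address inside the block and the second an address outside it.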

Procedure

Step 1: Install dependencies

Important

Remote development with Ray requires that the Python and Ray versions on the client (the notebook environment) and the server (the Ray cluster) match. The Ray cluster uses Python 3.11.13 and Ray 2.49.2.

  1. Check the Python version of your notebook environment.

    !python --version
  2. Uninstall any existing versions of Ray.

    !pip uninstall -y ray
  3. Install the required libraries.

    !pip install "ray[client]==2.49.2" xgboost_ray pandas scikit-learn ipywidgets tqdm
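After installation, a quick local check can confirm that the notebook environment matches the versions stated above before you connect. The sketch below is one possible way to do this with the standard library only. The helper names find_mismatches and installed_ray_version are hypothetical, and comparing the full Python version string (including the patch level) is an assumption based on the versions listed in this topic.

```python
import sys
from importlib import metadata

# Versions of the Ray cluster as stated in this topic.
EXPECTED_PYTHON = "3.11.13"
EXPECTED_RAY = "2.49.2"

def find_mismatches(local_python, local_ray,
                    expected_python=EXPECTED_PYTHON, expected_ray=EXPECTED_RAY):
    """Return a list of human-readable mismatch messages (empty if all match)."""
    problems = []
    if local_python != expected_python:
        problems.append(f"Python {local_python} does not match {expected_python}")
    if local_ray is None:
        problems.append("Ray is not installed")
    elif local_ray != expected_ray:
        problems.append(f"Ray {local_ray} does not match {expected_ray}")
    return problems

def installed_ray_version():
    """Return the installed Ray version, or None if the package is missing."""
    try:
        return metadata.version("ray")
    except metadata.PackageNotFoundError:
        return None

local_python = ".".join(str(part) for part in sys.version_info[:3])
problems = find_mismatches(local_python, installed_ray_version())
for problem in problems:
    print("Mismatch:", problem)
if not problems:
    print("Local Python and Ray versions match the cluster.")
```

This check only inspects the client side. The server-side comparison in Step 3 is still needed once the connection is established.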

Step 2: Get Ray connection information

  1. On the details page of the target AnalyticDB for MySQL cluster, in the navigation pane on the left, click Cluster Management > Resource Management.

  2. Click the Resource Groups tab. In the Actions column for the target resource group, click the icon and select Details.

  3. Get the Ray Cluster Endpoint. The format is http://[RAY_INTERNAL_HOST]:8265.
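The endpoint shown in the console uses HTTP on port 8265, while Step 3 connects through the ray:// protocol on port 10001. The conversion keeps the host and swaps the scheme and port. A minimal sketch of that conversion follows. The helper name to_ray_client_address and the sample host are hypothetical.

```python
from urllib.parse import urlsplit

def to_ray_client_address(endpoint: str, client_port: int = 10001) -> str:
    """Convert an http://host:8265 endpoint into a ray://host:10001 address."""
    host = urlsplit(endpoint).hostname
    if not host:
        raise ValueError(f"Cannot extract a host from endpoint: {endpoint!r}")
    return f"ray://{host}:{client_port}"

# Hypothetical endpoint: replace with the Ray Cluster Endpoint from the console.
print(to_ray_client_address("http://ray-host.example.internal:8265"))
```

The returned string can be used as the value of remote_url in Step 3.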

Step 3: Connect and check versions

  1. Initialize the Ray connection.

    Take the Ray Cluster Endpoint that you obtained in Step 2 and change its protocol from http:// to ray:// and its port from 8265 to 10001. The result has the format ray://[RAY_INTERNAL_HOST]:10001. Use this address as the value of remote_url in the following code sample.

    import ray
    
    # Packages that Ray installs in the cluster workers' environment for this session
    runtime_env = {"pip": ["xgboost_ray", "pandas", "scikit-learn"]}
    
    remote_url = "ray://<ray_host>:10001"
    ray.init(address=remote_url, runtime_env=runtime_env)
    
    print(f"Successfully connected to the cluster! Ray cluster node count: {len(ray.nodes())}")
  2. Check version consistency between the client and server.

    import ray
    import sys
    
    def print_ray_versions():
        local_version = ray.__version__
        local_python = sys.version.split()[0]
    
        # Runs on a cluster worker and reports the versions found there
        @ray.remote
        def get_server_info():
            import ray
            import sys
            return {
                "version": ray.__version__,
                "python": sys.version.split()[0]
            }
    
        try:
            server_info = ray.get(get_server_info.remote())
            
            # Column width 15 fits the longest label, 'Remote(Server)'
            print(f"{'Component':<15} | {'Ray Version':<15} | {'Python Version'}")
            print("-" * 50)
            print(f"{'Local(Client)':<15} | {local_version:<15} | {local_python}")
            print(f"{'Remote(Server)':<15} | {server_info['version']:<15} | {server_info['python']}")
            
            # Check both the Ray version and the Python version
            if local_version != server_info['version'] or local_python != server_info['python']:
                print("\n Warning: Version mismatch! This may cause serialization errors or AttributeError.")
            else:
                print("\n Versions are fully matched.")
                
        except Exception as e:
            print(f" Failed to connect to cluster or execute task: {e}")
    
    # Run the version check
    print_ray_versions()

Step 4: Interactive development

| Development method | Paradigm | Use cases |
| --- | --- | --- |
| Zero-code-intrusive parallelization (ray.util.multiprocessing) | Zero-code-intrusive, synchronous, and procedural. | Migrating existing for loops or map tasks to the cluster with minimal code changes. |
| Asynchronous task orchestration (Remote Functions) | Explicit, asynchronous, non-blocking primitive. | For fine-grained control over task dependencies and execution flow. |
| Stateful service management (Actors) | Object-oriented paradigm that maintains state across method calls. | For managing mutable state across multiple operations. |

Method 1: Zero-code-intrusive parallelization

from ray.util.multiprocessing import Pool

def square(x):
    return x * x

# The usage is identical to Python's native multiprocessing. The @ray.remote decorator is not needed.
with Pool() as pool:
    # This line automatically distributes the execution to the remote cluster.
    results = pool.map(square, range(10))
    print(f"Pool results: {results}")

Method 2: Asynchronous task orchestration

@ray.remote
def add(x, y):
    return x + y

# Asynchronous calls: each returns an ObjectRef immediately, similar to a Promise or Future.
ref1 = add.remote(1, 2)
ref2 = add.remote(3, 4)

# Passing ObjectRefs as arguments chains the tasks; Ray resolves them to values before add runs.
# ray.get blocks only here, when the final result is needed.
final_result = ray.get(add.remote(ref1, ref2))
print(f"Task result: {final_result}")

Method 3: Stateful service management

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0
    def increment(self):
        self.value += 1
        return self.value

# Create a persistent "counter" object on the cluster.
counter_actor = Counter.remote()

# Call the same Actor instance multiple times. The state is preserved.
results = [counter_actor.increment.remote() for _ in range(3)]
print(f"Actor counter results: {ray.get(results)}") # Output: [1, 2, 3]