提交作业到Ray应用

更新时间:
复制 MD 格式

当您需要在分布式环境中运行数据处理或机器学习任务时,Ray应用提供了一种高效的方式来提交和管理您的Python作业。本文档将指导您如何通过多种方式(包括JupyterLab、Python SDK、CLI等)将一个Ray作业提交到您的Ray应用并执行,同时介绍如何监控作业状态和查看结果,帮助您简化开发与部署流程。

方式对比与选择

Ray应用提供多种作业提交方式,请根据您的使用场景和偏好选择最合适的一种。

方式

适用场景

优点

缺点/注意事项

JupyterLab Terminal(推荐)

在控制台环境中快速提交和测试脚本。

无需本地配置,环境预装,开箱即用。

不适合自动化、大规模提交。

Jupyter Notebook(推荐)

交互式数据探索和算法调试。

交互性强,可逐行执行,方便调试。

资源占用与会话绑定,不适合长时间运行的生产任务。

Ray Python SDK(推荐)

本地开发、与现有Python应用集成、CI/CD自动化。

灵活,功能强大,易于集成。

需要在本地安装ray并进行网络和认证配置。

Ray CLI (本地/ECS)

习惯使用原生Ray CLI的开发者,进行脚本化提交。

贴近原生Ray体验。

配置与Python SDK类似,但灵活性稍差。

REST API(不推荐)

特定的自动化场景,如非Python环境调用。

语言无关,依赖最少。

功能受限,无法自动上传代码包,调用繁琐。

准备信息

在提交作业前,您需要从Ray应用中获取应用的连接地址配置信息,并设置您开发环境的私网/公网IP地址到应用白名单中。

使用JupyterLab Terminal提交作业

此方式直接在Ray应用提供的预置环境中使用ray job submit命令,无需在本地安装任何依赖,是快速测试和运行脚本的首选。

说明

使用JupyterLab Terminal提交作业时,还需将Ray应用所属集群的VPCIPv4网段添加至应用白名单中。

  1. 登录JupyterLab:

    1. 在浏览器中打开已获取的Jupyter公网地址。

    2. 在密码框(Password or token)中输入secret.jupyterlab.password对应的密钥,进入JupyterLab界面。

  2. 准备Python脚本:

    1. JupyterLab的文件浏览器中,创建一个工作目录(例如src)并双击进入工作目录,创建一个 Python脚本文件(例如script.py)。在 JupyterLab 左侧文件浏览器顶部工具栏中,单击 + 按钮右侧的上传按钮,将本地项目文件(如 src 目录及 requirements.txt)上传至工作目录。在 JupyterLab 的 Launcher 页面中,单击 Other 区域的 Python File 选项,创建一个新的 Python 文件。

    2. 示例src/script.py内容:

      import ray
      import time
      @ray.remote
      def retrieve_task(item, db):
          time.sleep(item / 10.)
          return item, db[item]
      if __name__ == "__main__":
          database = [
              "Learning", "Ray", "Flexible", "Distributed", "Python", "for",
              "Machine", "Learning"
          ]
          ray.init()
          db_object_ref = ray.put(database)
          retrieve_refs = [
              retrieve_task.remote(item, db_object_ref) for item in [0, 2, 4, 6]
          ]
          result = [print(data) for data in ray.get(retrieve_refs)]
  3. 提交作业并查看结果:

    1. JupyterLab中,通过File > New Launcher打开一个新的Terminal。

    2. Terminal中执行以下命令,提交作业并查看其日志。

      # 1.返回上层目录(根目录),为演示提交作业中--working-dir参数。
      cd ..
      # 2. 设置环境变量(请替换为您的真实信息)
      # 作业提交匿名密钥:secret.jwt.anonKey
      ANON_KEY="<YOUR_ANON_KEY>"
      # 3. 提交作业
      # 提交任务,并等待执行结果
      ray job submit --headers "{\"Authorization\": \"Bearer $ANON_KEY\"}" --working-dir ./src -- python script.py
      # 提交任务,不等待执行结果
      ray job submit --headers "{\"Authorization\": \"Bearer $ANON_KEY\"}" --no-wait --working-dir ./src -- python script.py

      示例结果如下:

      requests.exceptions.HTTPError: 401 Client Error: Unauthorized for url: http://xxx/api/version
      (base) jovyan@pa-xxx:~/work$ ANON_KEY="exxx"
      Yxxx
      (base) jovyan@pa-2xxx:~/work$ ray job submit --headers "{\"Authorization\": \"Bearer $ANON_KEY\"}" --no-wait --working-dir ./src -- python script.py
      Job submission server address: http://1xxx:8265
      2025-12-18 03:47:06,460 INFO dashboard_sdk.py:338 -- Uploading package gcs://_ray_pkg_xxx.zip.
      2025-12-18 03:47:06,461 INFO packaging.py:576 -- Creating a file package for local module './src'.
      -------------------------------------------------------
      Job 'raysubmit_Jxxx' submitted successfully
      -------------------------------------------------------
      Next steps
        Query the logs of the job:
          ray job logs raysubmit_Jxxx
        Query the status of the job:
          ray job status raysubmit_xxx
        Request the job to be stopped:
          ray job stop raysubmit_Jxxx
      (base) jovyan@pa-xxx:~/work$

Jupyter Notebook中交互式执行作业

对于需要逐行执行代码、进行探索性分析和调试的场景,Jupyter Notebook是理想的选择。

  1. 登录并新建Notebook:

    1. 在浏览器中打开已获取的Jupyter公网地址。

    2. 在密码框(Password or token)中输入secret.jupyterlab.password对应的密钥,进入JupyterLab界面。

    3. 通过File > New Launcher打开一个新建Notebook、ConsolePython File。Launcher 页面提供三种 Python 开发入口:Notebook 区块的 Python 3 (ipykernel) 用于创建交互式笔记本,Console 区块的 Python 3 (ipykernel) 用于打开交互式控制台,Other 区块的 Python File 用于创建 Python 脚本文件。

  2. 运行代码:以Notebook为例,在代码单元格(Cell)中输入并执行以下代码。

    import ray
    import time
    @ray.remote
    def retrieve_task(item, db):
        time.sleep(item / 10.)
        return item, db[item]
    if __name__ == "__main__":
        database = [
            "Learning", "Ray", "Flexible", "Distributed", "Python", "for",
            "Machine", "Learning"
        ]
        ray.init()
        db_object_ref = ray.put(database)
        retrieve_refs = [
            retrieve_task.remote(item, db_object_ref) for item in [0, 2, 4, 6]
        ]
        result = [print(data) for data in ray.get(retrieve_refs)]

    运行结果如下:

    (0, 'Learning')
    (2, 'Flexible')
    (4, 'Python')
    (6, 'Machine')

使用Ray Python SDK提交作业

当您需要在本地开发环境、现有Python应用或CI/CD流水线中以编程方式提交作业时,推荐使用Ray Python SDK。

  1. 准备环境

    说明

    请先在您的本地环境中安装Python 3环境。

    安装Ray SDK:

    pip install ray    
  2. 准备代码文件
    您需要在您的工作目录下,新建两个Ray应用脚本。一个用于业务处理,一个用于提交Job。工作目录结构示例如下:

    - working-dir/
     - script.py # 您的Ray业务逻辑脚本
     - ray_job_submit.py # 用于提交作业的脚本

    script.py

    import ray
    import time
    @ray.remote
    def retrieve_task(item, db):
        time.sleep(item / 10.)
        return item, db[item]
    if __name__ == "__main__":
        database = [
            "Learning", "Ray", "Flexible", "Distributed", "Python", "for",
            "Machine", "Learning"
        ]
        ray.init()
        db_object_ref = ray.put(database)
        retrieve_refs = [
            retrieve_task.remote(item, db_object_ref) for item in [0, 2, 4, 6]
        ]
        result = [print(data) for data in ray.get(retrieve_refs)]
                

    ray_job_submit.py

    import os
    import json
    import base64
    import hmac
    import hashlib
    import requests
    from datetime import timezone, timedelta, datetime
    from requests.auth import HTTPBasicAuth
    from ray.job_submission import JobSubmissionClient
    # 您的 Ray Dashboard 地址
    address = os.getenv("RAY_DASHBOARD_ADDRESS")
    # 您可以直接使用 `secret.jwt.anonKey` 进行认证
    token = os.getenv("RAY_ANON_TOKEN")
    # 您也可以使用 `secret.jwt.secret` 生成 JWT
    secret = os.getenv("RAY_JWT_SECRET")
    header = {"alg": "HS256", "typ": "JWT"}
    now = datetime.now(tz=timezone.utc)
    payload = {
      "sub": "anon",
      "iat": int(now.timestamp()),
      "exp": int((now + timedelta(hours=12)).timestamp()),
      "iss": "ray-dashboard",
      "aud": "ray-dashboard-client"
    }
    header_encoded = base64.urlsafe_b64encode(
      json.dumps(header,
                 separators=(',', ':')).encode()).decode('utf-8').rstrip('=')
    payload_encoded = base64.urlsafe_b64encode(
      json.dumps(payload,
                 separators=(',', ':')).encode()).decode('utf-8').rstrip('=')
    message = f"{header_encoded}.{payload_encoded}"
    signature = hmac.new(secret.encode(), message.encode(),
                         hashlib.sha256).digest()
    signature_encoded = base64.urlsafe_b64encode(signature).decode('utf-8').rstrip(
      '=')
    token = f"{message}.{signature_encoded}"
    # 您也可以通过POST /api/auth/token 获取 JWT 进行认证,您需要提供用户名和密码
    username = os.getenv('RAY_USERNAME')
    password = os.getenv('RAY_PASSWORD')
    response = requests.post(f'{address}/api/auth/token',
                             auth=HTTPBasicAuth(username, password),
                             timeout=5)
    if response.status_code == 200:
      token_data = response.json()
      token = token_data['access_token']
    else:
      print(f"Failed to get JWT: {response.status_code}")
    client = JobSubmissionClient(address,
                                 headers={"Authorization": f"Bearer {token}"})
    job_id = client.submit_job(
      # Entrypoint shell command to execute
      entrypoint="python script.py",
      # Path to the local directory that contains the script.py file
      runtime_env={"working_dir": "./"})
    print(job_id)
  3. 配置并运行

    1. Ray应用配置信息页面查看以下信息,并将其设置为环境变量或在submit_job.py脚本中进行替换

      变量

      对应配置项

      示例值

      RAY_DASHBOARD_ADDRESS

      Ray应用连接地址,根据您的实际环境选择公网或私网。

      http://123.xxx.xxx.xxx:8265

      RAY_ANON_TOKEN

      secret.jwt.anonKey

      eyJhbGciOi...

      RAY_JWT_SECRET

      secret.jwt.secret

      tNhVxysSRD...

      RAY_USERNAME

      secret.dashboard.username

      admin

      RAY_PASSWORD

      secret.dashboard.password

      UTLof$rMVM...

    2. 在您的工作目录下,执行提交脚本:

      python ray_job_submit.py
    3. 执行成功后,您可以登录Ray Dashboard,在Jobs页面查看已提交作业的运行状态。任务记录显示 Entrypointpython script.pyStatusSUCCEEDEDStatus messageJob finished successfully,表明任务已成功完成。

使用Ray CLI从本地或ECS实例提交作业

此方法与在JupyterLab Terminal中操作类似,但需要在您自己的环境中执行,适用于习惯使用命令行的开发者。

  1. 确保满足前提条件

    • 网络:确认您的开发环境IP地址已在应用的白名单或安全组中,且能访问Ray Dashboard地址。

    • 安装Ray:执行pip install ray

  2. 提交作业:在您的本地终端或ECS Terminal中,使用ray job submit命令提交作业。所有命令都需通过--address参数明确指定Ray Dashboard地址。

    # 设置环境变量(请替换为您的真实信息)
    # Ray应用的Dashboard地址,需添加http://前缀
    RAY_ADDRESS="<YOUR_DASHBOARD_URL>"
    # 作业提交匿名密钥:secret.jwt.anonKey
    ANON_KEY="<YOUR_ANON_KEY>"
    # 提交作业
    ray job submit --address "$RAY_ADDRESS" --headers "{\"Authorization\": \"Bearer $ANON_KEY\"}"  --working-dir <工作目录> -- python script.py

使用REST API提交作业

此方法仅适用于执行单行简单命令,无法自动上传本地Python脚本或代码目录。对于绝大多数场景,推荐使用功能更强大、更便捷的Ray Python SDKRay CLI。

  1. 准备请求信息:获取Ray Dashboard和匿名密钥secret.jwt.anonKey

  2. 发送POST请求:使用curl或任何编程语言的HTTP客户端发送请求。此处以Python为例:

    import requests
    import json
    # 您的 Ray Dashboard 地址,需添加http://前缀
    RAY_ADDRESS = "<YOUR_DASHBOARD_URL>"
    # 您的匿名密钥
    RAY_ANON_KEY = "<YOUR_ANON_KEY>"
    headers = {
        'Content-Type': 'application/json',
        "Authorization": f"Bearer {RAY_ANON_KEY}"
    }
    # entrypoint 只能是简单的单行命令
    payload = {
        "entrypoint": "python -c \"import ray; ray.init(); print(ray.nodes())\"",
    }
    # API 端点末尾必须有斜杠 "/"
    response = requests.post(
        f"{RAY_ADDRESS}/api/jobs/",
        json=payload,
        headers=headers
    )
    response.raise_for_status()
    job_info = response.json()
    print(f"作业提交成功: {job_info}")

    运行结果如下:

    作业提交成功: {'job_id': 'raysubmit_xxx', 'submission_id': 'raysubmit_xxx'}