当您需要在分布式环境中运行数据处理或机器学习任务时,Ray应用提供了一种高效的方式来提交和管理您的Python作业。本文档将指导您如何通过多种方式(包括JupyterLab、Python SDK、CLI等)将一个Ray作业提交到您的Ray应用并执行,同时介绍如何监控作业状态和查看结果,帮助您简化开发与部署流程。
方式对比与选择
Ray应用提供多种作业提交方式,请根据您的使用场景和偏好选择最合适的一种。
|
方式 |
适用场景 |
优点 |
缺点/注意事项 |
|
JupyterLab Terminal(推荐) |
在控制台环境中快速提交和测试脚本。 |
无需本地配置,环境预装,开箱即用。 |
不适合自动化、大规模提交。 |
|
Jupyter Notebook(推荐) |
交互式数据探索和算法调试。 |
交互性强,可逐行执行,方便调试。 |
资源占用与会话绑定,不适合长时间运行的生产任务。 |
|
Ray Python SDK(推荐) |
本地开发、与现有Python应用集成、CI/CD自动化。 |
灵活,功能强大,易于集成。 |
需要在本地安装 |
|
Ray CLI (本地/ECS) |
习惯使用原生Ray CLI的开发者,进行脚本化提交。 |
贴近原生Ray体验。 |
配置与Python SDK类似,但灵活性稍差。 |
|
REST API(不推荐) |
特定的自动化场景,如非Python环境调用。 |
语言无关,依赖最少。 |
功能受限,无法自动上传代码包,调用繁琐。 |
准备信息
使用JupyterLab Terminal提交作业
此方式直接在Ray应用提供的预置环境中使用ray job submit命令,无需在本地安装任何依赖,是快速测试和运行脚本的首选。
使用JupyterLab Terminal提交作业时,还需将Ray应用所属集群的VPC主IPv4网段添加至应用白名单中。
-
登录JupyterLab:
-
在浏览器中打开已获取的Jupyter公网地址。
-
在密码框(Password or token)中输入
secret.jupyterlab.password对应的密钥,进入JupyterLab界面。
-
-
准备Python脚本:
-
在JupyterLab的文件浏览器中,创建一个工作目录(例如
src)并双击进入工作目录,创建一个 Python脚本文件(例如script.py)。在 JupyterLab 左侧文件浏览器顶部工具栏中,单击 + 按钮右侧的上传按钮,将本地项目文件(如src目录及requirements.txt)上传至工作目录。在 JupyterLab 的 Launcher 页面中,单击 Other 区域的 Python File 选项,创建一个新的 Python 文件。 -
示例
src/script.py内容:import ray import time @ray.remote def retrieve_task(item, db): time.sleep(item / 10.) return item, db[item] if __name__ == "__main__": database = [ "Learning", "Ray", "Flexible", "Distributed", "Python", "for", "Machine", "Learning" ] ray.init() db_object_ref = ray.put(database) retrieve_refs = [ retrieve_task.remote(item, db_object_ref) for item in [0, 2, 4, 6] ] result = [print(data) for data in ray.get(retrieve_refs)]
-
-
提交作业并查看结果:
-
在JupyterLab中,通过打开一个新的Terminal。
-
在Terminal中执行以下命令,提交作业并查看其日志。
# 1.返回上层目录(根目录),为演示提交作业中--working-dir参数。 cd .. # 2. 设置环境变量(请替换为您的真实信息) # 作业提交匿名密钥:secret.jwt.anonKey ANON_KEY="<YOUR_ANON_KEY>" # 3. 提交作业 # 提交任务,并等待执行结果 ray job submit --headers "{\"Authorization\": \"Bearer $ANON_KEY\"}" --working-dir ./src -- python script.py # 提交任务,不等待执行结果 ray job submit --headers "{\"Authorization\": \"Bearer $ANON_KEY\"}" --no-wait --working-dir ./src -- python script.py示例结果如下:
requests.exceptions.HTTPError: 401 Client Error: Unauthorized for url: http://xxx/api/version (base) jovyan@pa-xxx:~/work$ ANON_KEY="exxx" Yxxx (base) jovyan@pa-2xxx:~/work$ ray job submit --headers "{\"Authorization\": \"Bearer $ANON_KEY\"}" --no-wait --working-dir ./src -- python script.py Job submission server address: http://1xxx:8265 2025-12-18 03:47:06,460 INFO dashboard_sdk.py:338 -- Uploading package gcs://_ray_pkg_xxx.zip. 2025-12-18 03:47:06,461 INFO packaging.py:576 -- Creating a file package for local module './src'. ------------------------------------------------------- Job 'raysubmit_Jxxx' submitted successfully ------------------------------------------------------- Next steps Query the logs of the job: ray job logs raysubmit_Jxxx Query the status of the job: ray job status raysubmit_xxx Request the job to be stopped: ray job stop raysubmit_Jxxx (base) jovyan@pa-xxx:~/work$
-
在Jupyter Notebook中交互式执行作业
对于需要逐行执行代码、进行探索性分析和调试的场景,Jupyter Notebook是理想的选择。
-
登录并新建Notebook:
-
在浏览器中打开已获取的Jupyter公网地址。
-
在密码框(Password or token)中输入
secret.jupyterlab.password对应的密钥,进入JupyterLab界面。 -
通过打开一个新建Notebook、Console或Python File。Launcher 页面提供三种 Python 开发入口:Notebook 区块的 Python 3 (ipykernel) 用于创建交互式笔记本,Console 区块的 Python 3 (ipykernel) 用于打开交互式控制台,Other 区块的 Python File 用于创建 Python 脚本文件。
-
-
运行代码:以Notebook为例,在代码单元格(Cell)中输入并执行以下代码。
import ray import time @ray.remote def retrieve_task(item, db): time.sleep(item / 10.) return item, db[item] if __name__ == "__main__": database = [ "Learning", "Ray", "Flexible", "Distributed", "Python", "for", "Machine", "Learning" ] ray.init() db_object_ref = ray.put(database) retrieve_refs = [ retrieve_task.remote(item, db_object_ref) for item in [0, 2, 4, 6] ] result = [print(data) for data in ray.get(retrieve_refs)]运行结果如下:
(0, 'Learning') (2, 'Flexible') (4, 'Python') (6, 'Machine')
使用Ray Python SDK提交作业
当您需要在本地开发环境、现有Python应用或CI/CD流水线中以编程方式提交作业时,推荐使用Ray Python SDK。
-
准备环境
说明请先在您的本地环境中安装Python 3环境。
安装Ray SDK:
pip install ray -
准备代码文件
您需要在您的工作目录下,新建两个Ray应用脚本。一个用于业务处理,一个用于提交Job。工作目录结构示例如下:- working-dir/ - script.py # 您的Ray业务逻辑脚本 - ray_job_submit.py # 用于提交作业的脚本script.pyimport ray import time @ray.remote def retrieve_task(item, db): time.sleep(item / 10.) return item, db[item] if __name__ == "__main__": database = [ "Learning", "Ray", "Flexible", "Distributed", "Python", "for", "Machine", "Learning" ] ray.init() db_object_ref = ray.put(database) retrieve_refs = [ retrieve_task.remote(item, db_object_ref) for item in [0, 2, 4, 6] ] result = [print(data) for data in ray.get(retrieve_refs)]ray_job_submit.pyimport os import json import base64 import hmac import hashlib import requests from datetime import timezone, timedelta, datetime from requests.auth import HTTPBasicAuth from ray.job_submission import JobSubmissionClient # 您的 Ray Dashboard 地址 address = os.getenv("RAY_DASHBOARD_ADDRESS") # 您可以直接使用 `secret.jwt.anonKey` 进行认证 token = os.getenv("RAY_ANON_TOKEN") # 您也可以使用 `secret.jwt.secret` 生成 JWT secret = os.getenv("RAY_JWT_SECRET") header = {"alg": "HS256", "typ": "JWT"} now = datetime.now(tz=timezone.utc) payload = { "sub": "anon", "iat": int(now.timestamp()), "exp": int((now + timedelta(hours=12)).timestamp()), "iss": "ray-dashboard", "aud": "ray-dashboard-client" } header_encoded = base64.urlsafe_b64encode( json.dumps(header, separators=(',', ':')).encode()).decode('utf-8').rstrip('=') payload_encoded = base64.urlsafe_b64encode( json.dumps(payload, separators=(',', ':')).encode()).decode('utf-8').rstrip('=') message = f"{header_encoded}.{payload_encoded}" signature = hmac.new(secret.encode(), message.encode(), hashlib.sha256).digest() signature_encoded = base64.urlsafe_b64encode(signature).decode('utf-8').rstrip( '=') token = f"{message}.{signature_encoded}" # 您也可以通过POST /api/auth/token 获取 JWT 进行认证,您需要提供用户名和密码 username = os.getenv('RAY_USERNAME') password = os.getenv('RAY_PASSWORD') response = requests.post(f'{address}/api/auth/token', auth=HTTPBasicAuth(username, password), timeout=5) if response.status_code == 200: token_data = response.json() token = token_data['access_token'] else: print(f"Failed to get JWT: {response.status_code}") client = JobSubmissionClient(address, headers={"Authorization": f"Bearer {token}"}) job_id = client.submit_job( # Entrypoint shell command to execute entrypoint="python script.py", # Path to the local directory that contains the script.py file runtime_env={"working_dir": "./"}) print(job_id) -
配置并运行
-
在Ray应用配置信息页面查看以下信息,并将其设置为环境变量或在
submit_job.py脚本中进行替换变量
对应配置项
示例值
RAY_DASHBOARD_ADDRESSRay应用的连接地址,根据您的实际环境选择公网或私网。
http://123.xxx.xxx.xxx:8265RAY_ANON_TOKENsecret.jwt.anonKeyeyJhbGciOi...RAY_JWT_SECRETsecret.jwt.secrettNhVxysSRD...RAY_USERNAMEsecret.dashboard.usernameadminRAY_PASSWORDsecret.dashboard.passwordUTLof$rMVM... -
在您的工作目录下,执行提交脚本:
python ray_job_submit.py -
执行成功后,您可以登录Ray Dashboard,在Jobs页面查看已提交作业的运行状态。任务记录显示 Entrypoint 为
python script.py,Status 为 SUCCEEDED,Status message 为Job finished successfully,表明任务已成功完成。
-
使用Ray CLI从本地或ECS实例提交作业
此方法与在JupyterLab Terminal中操作类似,但需要在您自己的环境中执行,适用于习惯使用命令行的开发者。
-
确保满足前提条件
-
网络:确认您的开发环境IP地址已在应用的白名单或安全组中,且能访问Ray Dashboard地址。
-
安装Ray:执行
pip install ray。
-
-
提交作业:在您的本地终端或ECS Terminal中,使用
ray job submit命令提交作业。所有命令都需通过--address参数明确指定Ray Dashboard地址。# 设置环境变量(请替换为您的真实信息) # Ray应用的Dashboard地址,需添加http://前缀 RAY_ADDRESS="<YOUR_DASHBOARD_URL>" # 作业提交匿名密钥:secret.jwt.anonKey ANON_KEY="<YOUR_ANON_KEY>" # 提交作业 ray job submit --address "$RAY_ADDRESS" --headers "{\"Authorization\": \"Bearer $ANON_KEY\"}" --working-dir <工作目录> -- python script.py
使用REST API提交作业
此方法仅适用于执行单行简单命令,无法自动上传本地Python脚本或代码目录。对于绝大多数场景,推荐使用功能更强大、更便捷的Ray Python SDK或Ray CLI。
-
准备请求信息:获取Ray Dashboard和匿名密钥
secret.jwt.anonKey。 -
发送POST请求:使用
curl或任何编程语言的HTTP客户端发送请求。此处以Python为例:import requests import json # 您的 Ray Dashboard 地址,需添加http://前缀 RAY_ADDRESS = "<YOUR_DASHBOARD_URL>" # 您的匿名密钥 RAY_ANON_KEY = "<YOUR_ANON_KEY>" headers = { 'Content-Type': 'application/json', "Authorization": f"Bearer {RAY_ANON_KEY}" } # entrypoint 只能是简单的单行命令 payload = { "entrypoint": "python -c \"import ray; ray.init(); print(ray.nodes())\"", } # API 端点末尾必须有斜杠 "/" response = requests.post( f"{RAY_ADDRESS}/api/jobs/", json=payload, headers=headers ) response.raise_for_status() job_info = response.json() print(f"作业提交成功: {job_info}")运行结果如下:
作业提交成功: {'job_id': 'raysubmit_xxx', 'submission_id': 'raysubmit_xxx'}