使用Ray集群

更新时间:
复制为 MD 格式

Ray集群是EMR Serverless Spark工作空间提供的分布式计算框架,支持Python原生分布式计算、机器学习模型训练与推理等场景。本文介绍如何创建、启动Ray集群,以及如何提交Ray作业。

前提条件

当前Ray集群为白名单功能,使用前需提交工单,提供工作空间ID和地域(Region)信息,申请开通Ray集群使用权限。

创建Ray集群

  1. 登录EMR Serverless Spark控制台,进入目标工作空间。

  2. 在左侧导航栏,单击集群管理,然后单击RAY集群页签。

  3. 单击创建Ray集群,在创建面板中配置以下参数,然后单击创建

    • 集群名称:输入集群名称。

    • 引擎版本:选择引擎版本。当前提供默认版本err-1.0.1 (Ray 2.47.1, Python 3.12)。

    • 节点组:单击+添加Worker节点组,配置以下信息。

      • 节点组名称:输入节点组名称,不同节点组的名称不能重复。

      • 资源队列:选择资源队列。

      • 资源规格:选择节点规格。

      • 节点数量:设置Worker节点数量。

      • (可选)弹性伸缩:配置弹性伸缩策略,按需自动扩缩Worker节点数量。

      • (可选)Worker节点自动终止:开启后,闲置的弹性创建的Worker节点将被自动销毁。

    • (可选)网络连接:如需从Ray集群访问同VPC内的服务或公网,选择已创建的网络连接。

    • (可选)纳管文件:如需在Ray集群中挂载目录,在左侧导航栏单击文件管理 > 纳管文件中添加,支持挂载OSSNAS。详情请参见OSS挂载配置(纳管文件)

    • (可选)集群高级配置:以JSON格式配置高级参数,详情请参见集群高级配置参数说明

说明

当前控制台暂不支持配置Worker节点自动伸缩,如需配置,可通过API方式修改,详情请参见CreateRayCluster - 创建Ray集群

OSS挂载配置(纳管文件)

通过文件管理 > 纳管文件添加挂载文件时,支持配置以下挂载参数。

  • authType:挂载OSS时的鉴权方式。取值如下:

    • provider(默认):使用工作空间角色进行鉴权。

    • AK:使用AccessKey进行鉴权,需同时配置accessKeyIdaccessKeySecret

  • accessKeyId:当authTypeAK时,填写AccessKey ID。

  • accessKeySecret:当authTypeAK时,填写AccessKey Secret。

  • 其他参数:挂载模式配置(字符串格式),fs.jindo.args会被作为挂载启动参数,除此以外的参数会被写入到jindo配置文件中。支持以下挂载模式:

    • 快速读写(默认):读写速度快,但并发读写时可能存在数据不一致问题,适合挂载训练数据集和模型文件,不适合作为工作目录。

      {
        "fs.oss.download.thread.concurrency": "cpu核数2倍",
        "fs.oss.upload.thread.concurrency": "cpu核数2倍",
        "fs.jindo.args": "-oattr_timeout=3 -oentry_timeout=0 -onegative_timeout=0 -oauto_cache -ono_symlink"
      }
    • 增量读写:增量写入时能保证数据一致性,覆盖已有数据时可能存在一致性问题,读取速度略慢,适合保存模型训练过程中的权重文件。

      {
        "fs.oss.upload.thread.concurrency": "cpu核数2倍",
        "fs.jindo.args": "-oattr_timeout=3 -oentry_timeout=0 -onegative_timeout=0 -oauto_cache -ono_symlink"
      }
    • 读写一致:并发读写时能保持数据一致性,读取速度较慢,适合对数据一致性要求高的场景,如保存代码项目。

      {
        "fs.jindo.args": "-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0 -oauto_cache -ono_symlink"
      }
    • 只读:仅允许读取,不允许写入,适合挂载公共数据集。

      {
        "fs.oss.download.thread.concurrency": "cpu核数2倍",
        "fs.jindo.args": "-oro -oattr_timeout=7200 -oentry_timeout=7200 -onegative_timeout=7200 -okernel_cache -ono_symlink"
      }

纳管文件高级配置示例

  • 挂载时对读写一致要求高,使用工作空间角色鉴权:

    {
      "fs.jindo.args": "-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0 -oauto_cache -ono_symlink"
    }
  • 挂载时对读写一致要求高,且使用自定义AccessKey鉴权:

    {
      "authType": "AK",
      "accessKeyId": "LTxxxxxxxxx",
      "accessKeySecret": "xxxxxxxxxxxxxx",
      "fs.jindo.args": "-oattr_timeout=0 -oentry_timeout=0 -onegative_timeout=0 -oauto_cache -ono_symlink"
    }

集群高级配置参数说明

集群高级配置以JSON格式填写,当前支持以下参数。

  • userDefinedFiles:指定在集群启动时需要下载到Head节点和Worker节点的OSS文件。支持OSSOSS-HDFS路径,多个路径之间用英文逗号(,)分隔。文件下载至各节点的/home/ray/work-dir目录下。示例:oss://mybucket/hello.py,oss://mybucket2/test/test.jar

  • userRequirementsFile:指定用于初始化Head节点和Worker节点Python基础环境的requirements.txt文件。支持OSSOSS-HDFS路径,路径格式必须为oss://<bucket>/<path>/requirements.txt。Ray集群启动后,系统自动执行pip install -r requirements.txt命令(异步执行,不影响集群启动)。

  • gpuDriverVersion(仅GPU机型支持):指定GPU驱动版本。取值如下:

    • tesla=470:Tesla 470,CUDA 11.4。

    • tesla=535:Tesla 535,CUDA 12.2。

    • tesla=550(默认):Tesla 550,CUDA 12.4。

高级配置示例

以下示例配置了启动时下载OSS文件和初始化Python环境:

{
  "userDefinedFiles": "oss://mybucket/hello.py,oss://mybucket2/test/test.jar",
  "userRequirementsFile": "oss://mybucket/env/requirements.txt"
}

启动Ray集群

  1. Ray集群列表中,找到目标集群,单击启动

  2. 等待集群运行状态变为运行中

    说明

    工作空间内首次启动Ray集群时,系统需要创建额外组件,启动时间约需2~3分钟;后续启动速度会明显加快。

  3. 集群启动后,单击调用信息,记录提交Ray作业所需的地址和Token信息。

    警告

    当前版本中,Token对同一Ray集群永久有效,请妥善保管,避免泄露。

    单击Dashboard,可进入Ray集群的监控界面,查看集群资源使用情况。

    单击指标大盘,可查看Ray集群的Metrics监控信息。

提交Ray作业

方式一:代码提交(批任务)

本地环境要求:Python 3.12。

  1. 安装Ray Job Submission客户端库:

    pip install "ray[default]==2.47.1"
  2. 在集群的调用信息页面,获取公网调用地址Token

  3. 使用以下示例代码连接Ray集群并提交作业:

    import time
    from ray.job_submission import JobSubmissionClient
    
    custom_headers = {
        "ray-token": "<yourToken>"
    }
    
    client = JobSubmissionClient("<公网调用地址>", headers=custom_headers)
    
    # 或者用内网,需要保证提交端在同一region vpc下,建议内网提交,更加稳定
    # client = JobSubmissionClient("http://emr-spark-ray-gateway-cn-beijing-internal.spark.emr.aliyuncs.com", headers=custom_headers)
    
    job_id = client.submit_job(
        entrypoint="python -c 'print(\"Hello from Ray Client!\")'"
    )
    
    print(f"Submitted job with ID: {job_id}")
    
    while True:
        status = client.get_job_status(job_id)
        print(f"Job status: {status}")
        if status.is_terminal():
            break
        time.sleep(1)
    
    logs = client.get_job_logs(job_id)
    print("Job logs:")
    print(logs) 

    image

方式二:命令行提交(批任务)

本地环境要求:Python 3.12。

  1. 安装Ray Job Submission客户端库:

    pip install "ray[default]==2.47.1"
  2. 在集群的调用信息页面,获取公网调用地址Token

  3. 执行以下命令提交Ray作业:

    # ray job submit前必须设置ray addressheaders
    export RAY_ADDRESS='<公网调用地址>'               
    export RAY_JOB_HEADERS='{"ray-token": "<yourToken>"}'
    
    ## 提交任务
    ### working dir 支持本地 & Ray core distrubuted sort
    ray job submit --working-dir "." -- python  test-ray-core-sort.py
    
    ### 支持OSS读写
    ray job submit --working-dir "." -- python test-saving-data-test.py
    
    ### 支持OSS-HDFS读写
    ray job submit --working-dir "." -- python test-saving-data-oss-hdfs-test.py
    
    ### 代码支持Mount读写
    ray job submit --working-dir "." -- python  test-saving-data-test-mount.py

更多命令行参数,请参见Ray Job Submission CLI官方文档

方式三:交互式开发

Ray集群相同地域的VPC内访问,且环境为Python 3.12。

  1. 安装Ray客户端:

    pip install ray[client]==2.47.1
  2. 在集群的调用信息页面,获取gRPC 地址Token

  3. 使用以下示例代码连接Ray集群,进行交互式开发:

    import ray
    import os
    
    def get_metadata():
        headers = {"ray-token": "<yourToken>"}
        return [(key.lower(), value) for key, value in headers.items()]
    
    ray.init(address="<your_gRPC_address>", _metadata=get_metadata())
    
    import time
    @ray.remote
    def square(x):
        time.sleep(0.1)
        return x * x
    
    futures = [square.remote(i) for i in range(10)]
    
    results = ray.get(futures)
    print("平方结果:", results)

    image

高级用法

Kerberos身份认证

EMR Serverless Ray支持以Kerberos身份访问OSSOSS-HDFS。

前提条件

选择err-1.1.0及以上引擎版本。

操作步骤

  1. EMR Serverless Spark工作空间的安全中心开启Kerberos身份认证,绑定krb5.conf文件。krb5.conf文件可从EMR集群的/etc目录中获取。配置示例如下:

    [logging]
      default = FILE:/mnt/disk1/log/kerberos/krb5libs.log
      kdc = FILE:/mnt/disk1/log/kerberos/krb5kdc.log
      admin_server = FILE:/mnt/disk1/log/kerberos/kadmind.log
    
    [libdefaults]
      default_realm = EMR.C-XXX.COM
      dns_lookup_realm = false
      dns_lookup_kdc = false
      ticket_lifetime = 24h
      renew_lifetime = 7d
      forwardable = true
      rdns = false
      dns_canonicalize_hostname = true
      pkinit_anchors = FILE:/etc/pki/tls/certs/ca-bundle.crt
      kdc_timeout = 30s
      max_retries = 3
    
    [realms]
      EMR.XXX.COM = {
        kdc = master-1-1.c-xxx.cn-hangzhou.emr.aliyuncs.com:88
        admin_server = master-1-1.c-xxx.cn-hangzhou.emr.aliyuncs.com:749
      }
  2. 绑定core-site.xml文件。在EMR半托管集群开启Kerberos + Ranger + OSS/DLS权限后,拷贝Hadoop core-site.xml文件,绑定到配置管理 > 自定义配置文件 > Ray中。配置路径为/etc/ray/conf,文件名称为core-site.xml。文件内容需做以下修改:

    <property>
        <name>fs.oss.jindoauth.rpc.address</name>
        <value>[master真实域名或IP]:8201</value>
    </property>
    
    <property>
        <name>fs.dls.jindoauth.rpc.address</name>
        <value>[master真实域名或IP]:8201</value>
    </property>
    
    <property>
        <name>fs.jdo.plugin.dir</name>
        <value>/opt/apps/JINDOSDK/jindosdk-current/plugins</value>
    </property>
  3. 启动新的Ray集群。

  4. 使用合法的PrincipalKeytab文件,放在代码目录,并通过Ray CLI提交作业:

    ray job submit --runtime-env-json='{"env_vars": {"KERBEROS_PRINCIPAL": "test@EMR.XXX.COM", "KERBEROS_KEYTAB": "test.keytab"}}' --working-dir "." -- python test.py

读写DLF(Paimon表)

EMR Serverless Ray支持通过PyPaimon读写DLF中的Paimon表。

说明
  • Ray集群中需要已安装pypaimon(pypaimon依赖pyarrow)。EMR提供的镜像Python环境已预安装pypaimon。

  • 仅支持VPC网络访问,不支持公网。

  • DLF服务接入点信息,请参见DLF服务接入点

以下示例代码演示如何通过Ray读写DLF中的Paimon表:

import ray
import pandas as pd

data = [
    (10, 'pic10.jpg'),
    (11, 'pic11.png'),
    (12, 'pic12.gif')
]

# 转换为DataFrame
df = pd.DataFrame(data, columns=['id', 'pic'])
ds = ray.data.from_pandas(df)

# 写入Paimon表(仅支持VPC网络)
ds.write_paimon(
    table_identifier="default.my_table",
    catalog_kwargs={
        'metastore': 'rest',
        'warehouse': '<DLF Catalogs名称>',
        'uri': 'http://<region>-vpc.dlf.aliyuncs.com',
        'dlf.region': '<region>'
    }
)

# 读取Paimon表
ds1 = ray.data.read_paimon(
    table_identifier="default.my_table",
    catalog_kwargs={
        'metastore': 'rest',
        'warehouse': '<DLF Catalogs名称>',
        'uri': 'http://<region>-vpc.dlf.aliyuncs.com',
        'dlf.region': '<region>'
    }
)

records = ds1.take_all()
for i, record in enumerate(records, 1):
    print(f"记录 {i}: {record}")

如果遇到权限问题,请参见DLF无权限问题排查

读写Paimon File System

以下示例代码演示如何通过Ray读取OSS上的Paimon表:

import ray

# 读取Paimon表
ds = ray.data.read_paimon(
    table_identifier="default.calculate_tbl",
    catalog_kwargs={
        "warehouse": "oss://<bucket>/paimon_warehouse",
        "fs.oss.endpoint": "oss-<region>.aliyuncs.com",
        "fs.oss.region": "<region>"
    }
)

records = ds.take_all()
for i, record in enumerate(records, 1):
    print(f"记录 {i}: {record}")