使用Ray对Lance数据进行维护

更新时间:
复制为 MD 格式

当您需要对保存在OSS上的Lance格式数据执行compact(合并小文件)、创建标量索引、创建向量索引等维护操作时,可以通过Ray应用提交Python作业,利用Ray的分布式调度能力对多个Lance dataset并行执行维护任务。本文介绍如何在Ray应用中提交Lance数据维护作业。

适用范围

  • 已根据创建Ray应用Ray应用配置创建了一个可供公网访问的Ray应用

  • Lance数据文件已保存在OSS上,且Ray应用OSS Bucket位于同一地域。

  • 已获取访问OSS所需的AccessKey IDAccessKey Secret。

说明
  • Ray Head组件和Worker组件的内存规格请至少保持16 GB,建议购买216 GB规格来进行试用,防止大任务出现OOM。

  • Ray应用需要与OSS Bucket位于同一地域,以确保可通过内网地址进行访问。

  • 若通过公网访问地址访问不通Ray应用时,请检查是否已正确配置IP白名单。

背景信息

Ray、LanceOSS的关系

假设您的Lance文件保存在OSS上,那么Ray、LanceOSS三者的关系如下:

组件

作用

Ray

负责分布式调度。Driver提交多个远程任务,Ray应用把任务分配到不同Worker上运行。

Lance

负责数据格式和维护API。脚本通过Lance SDK读写dataset、执行compact、创建索引和重建索引。

OSS

负责对象存储。Lance dataset的数据文件、版本元数据和索引文件都保存在OSS路径下。

重要

Ray的并行适合用于多个独立datasetshard并行维护。同一个Lance dataset的版本提交建议由一个任务负责,不建议多个Ray Task同时对同一个datasetcompact或建索引提交,避免产生版本冲突。

支持的维护操作

本文示例脚本包含以下Lance数据维护操作:

  • compact:合并小fragment,降低后续扫描和读取放大。

  • create scalar index:为结构化列创建标量索引,例如为id列创建BTREE索引。

  • append:向已有Lance dataset追加新数据。

  • reindex:append之后重建同名索引,让索引覆盖新增数据。

  • create vector index:为向量列创建向量索引,例如IVF_PQ

示例脚本会把多个shard拆成多个独立Lance dataset,并通过Ray多个Worker并行执行维护任务。您可以通过输出中的task_hostnames判断任务是否被调度到了多个节点。

操作步骤

以下步骤介绍如何通过JupyterLab Terminal提交Lance数据维护作业。JupyterLab TerminalRay应用提供的预置环境,已内ray job submit命令,无需在本地安装任何依赖。

说明

使用JupyterLab Terminal提交作业时,还需将Ray应用所属集群的VPCIPv4网段添加至应用白名单中。

  1. 登录JupyterLab

    1. 在浏览器中打开已获取的Jupyter公网地址。

    2. 在密码框(Password or token)中输入secret.jupyterlab.password对应的密钥,进入JupyterLab界面。

    image

  2. 准备Python脚本

    JupyterLab的文件浏览器中,创建一个工作目录(例如src)并双击进入工作目录,创建一个Python脚本文件(例如oss_ray_run.py)。imageimage

    将以下完整代码复制到脚本文件中。该脚本会将一个根目录拆成多个shard,每个shard是一个独立的Lance dataset,Ray为每个shard提交一个远程Task,从而实现多Worker并行维护。

    单击展开查看示例Python脚本

    import os
    import socket
    from typing import Any
    
    import lance
    import numpy as np
    import pyarrow as pa
    import ray
    
    
    # 本示例把一个根目录拆成多个 shard,每个 shard 是一个独立的 Lance dataset。
    # Ray 会为每个 shard 提交一个远程 task,从而让多个 worker 并行维护多个 dataset。
    SHARDS = int(os.environ.get("LANCE_DEMO_SHARDS", "4"))
    
    # 每个 shard 初始写入的行数。
    ROWS = 2048
    
    # append 阶段追加的行数,用于演示已有索引在新增数据后需要 reindex。
    APPEND_ROWS = 256
    
    # 向量列维度。后面会在 vector 列上创建 IVF_PQ 向量索引。
    VECTOR_DIM = 8
    
    # 写入时每个文件最多写 128 行。这个值故意设置得较小,用于制造多个小 fragment,
    # 这样后面的 compact 效果更容易观察。
    ROWS_PER_FILE = 128
    
    
    def storage_options() -> dict[str, str]:
        """把 runtime_env.yaml 中的 OSS 环境变量转换为 Lance 能识别的参数。"""
        options = {
            # OSS endpoint。生产环境建议使用和 Ray 集群同 Region 的内网 endpoint。
            "oss_endpoint": os.environ["OSS_ENDPOINT"],
            # OSS 所在 Region,例如 cn-beijing。
            "oss_region": os.environ.get("OSS_REGION", "cn-hangzhou"),
            # OSS AccessKey ID。
            "oss_access_key_id": os.environ["OSS_ACCESS_KEY_ID"],
            # OSS AccessKey Secret。
            "oss_secret_access_key": os.environ["OSS_ACCESS_KEY_SECRET"],
        }
    
        # 如果使用 STS 临时凭证,还需要传入安全令牌。
        if os.environ.get("OSS_SECURITY_TOKEN"):
            options["oss_security_token"] = os.environ["OSS_SECURITY_TOKEN"]
        return options
    
    
    def make_table(shard: int, start: int, rows: int, dim: int = VECTOR_DIM) -> pa.Table:
        """构造一批演示数据,并返回 Lance 可以写入的 Arrow Table。"""
        rng = np.random.default_rng(20260506 + shard * 100000 + start)
    
        ids = np.arange(start, start + rows, dtype=np.int64)
        categories = np.array([f"shard-{shard:02d}" for _ in range(rows)])
        payload = np.array([f"doc-{shard:02d}-{i:06d}" for i in range(start, start + rows)])
    
        vectors = rng.random((rows, dim), dtype=np.float32)
        vector_values = pa.array(vectors.reshape(-1), type=pa.float32())
        vector_array = pa.FixedSizeListArray.from_arrays(vector_values, dim)
    
        return pa.table(
            {
                "id": pa.array(ids),
                "category": pa.array(categories),
                "payload": pa.array(payload),
                "vector": vector_array,
            }
        )
    
    
    def index_name(index: Any) -> str:
        """兼容不同 Lance 版本里 list_indices() 可能返回 dict 或对象的情况。"""
        if isinstance(index, dict):
            return str(index.get("name"))
        return str(getattr(index, "name", index))
    
    
    def dataset_stats(uri: str) -> dict[str, Any]:
        """读取一个 Lance dataset 的核心状态,用于执行前后对比。"""
        ds = lance.dataset(uri, storage_options=storage_options())
        return {
            "version": ds.version,
            "rows": ds.count_rows(),
            "fragments": len(ds.get_fragments()),
            "indices": [index_name(index) for index in ds.list_indices()],
        }
    
    
    @ray.remote(num_cpus=1)
    def maintain_one_shard(root_uri: str, shard: int) -> dict[str, Any]:
        """维护一个独立 shard:写入、compact、建索引、append、reindex、建向量索引。"""
        uri = f"{root_uri.rstrip('/')}/shard={shard:02d}.lance"
    
        node = {
            "node_id": ray.get_runtime_context().get_node_id(),
            "hostname": socket.gethostname(),
        }
    
        # 初始写入 dataset
        lance.write_dataset(
            make_table(shard, 0, ROWS),
            uri,
            mode="overwrite",
            storage_options=storage_options(),
            max_rows_per_file=ROWS_PER_FILE,
            max_rows_per_group=ROWS_PER_FILE,
        )
        initial = dataset_stats(uri)
    
        # compact
        ds = lance.dataset(uri, storage_options=storage_options())
        ds.optimize.compact_files(
            target_rows_per_fragment=512,
            max_rows_per_group=512,
            materialize_deletions=True,
            num_threads=2,
        )
        after_compact = dataset_stats(uri)
    
        # create scalar index
        ds = lance.dataset(uri, storage_options=storage_options())
        ds.create_scalar_index("id", "BTREE", name="idx_id", replace=False)
        after_scalar_index = dataset_stats(uri)
    
        # append
        lance.write_dataset(
            make_table(shard, ROWS, APPEND_ROWS),
            uri,
            mode="append",
            storage_options=storage_options(),
            max_rows_per_file=ROWS_PER_FILE,
            max_rows_per_group=ROWS_PER_FILE,
        )
    
        # reindex scalar index
        ds = lance.dataset(uri, storage_options=storage_options())
        ds.create_scalar_index("id", "BTREE", name="idx_id", replace=True)
        scalar_index_stats = lance.dataset(uri, storage_options=storage_options()).index_statistics("idx_id")
    
        # create vector index
        ds = lance.dataset(uri, storage_options=storage_options())
        ds.create_index(
            "vector",
            "IVF_PQ",
            name="idx_vector",
            metric="cosine",
            replace=True,
            num_partitions=16,
            num_sub_vectors=2,
        )
        final = dataset_stats(uri)
    
        # 结果校验
        assert after_compact["rows"] == initial["rows"], (initial, after_compact)
        assert after_compact["fragments"] < initial["fragments"], (initial, after_compact)
        assert "idx_id" in after_scalar_index["indices"], after_scalar_index
        assert scalar_index_stats["num_unindexed_rows"] == 0, scalar_index_stats
        assert "idx_vector" in final["indices"], final
    
        return {
            "shard": shard,
            "node": node,
            "initial": initial,
            "after_compact": after_compact,
            "after_scalar_index": after_scalar_index,
            "scalar_index_stats": scalar_index_stats,
            "final": final,
        }
    
    
    def main() -> None:
        root_uri = os.environ["LANCE_ROOT_URI"]
    
        ray.init(address=os.environ.get("RAY_ADDRESS", "auto"))
    
        live_nodes = [node for node in ray.nodes() if node.get("Alive")]
        print("ray_live_nodes =", len(live_nodes))
        print("ray_cluster_resources =", ray.cluster_resources())
    
        refs = [maintain_one_shard.remote(root_uri, shard) for shard in range(SHARDS)]
        results = ray.get(refs)
    
        hostnames = sorted({result["node"]["hostname"] for result in results})
        print("task_hostnames =", hostnames)
    
        for result in sorted(results, key=lambda item: item["shard"]):
            print("----- shard", result["shard"], "-----")
            print("node =", result["node"])
            print("initial =", result["initial"])
            print("after_compact =", result["after_compact"])
            print("after_scalar_index =", result["after_scalar_index"])
            print("scalar_index_stats =", result["scalar_index_stats"])
            print("final =", result["final"])
    
        if len(hostnames) < 2:
            print("WARN_SINGLE_NODE_TASKS: tasks only ran on one hostname.")
            print("Increase Ray worker count or LANCE_DEMO_SHARDS if you need visible multi-node scheduling.")
    
        print("VERIFY_OK")
    
    
    if __name__ == "__main__":
        main()
  3. 配置OSS环境变量

    由于Ray HeadWorker组件内无法直接使用本地电脑上的环境变量,需要将环境变量写成文件,通过--runtime-env参数提交给Ray应用

    在工作目录下创建runtime_env.yaml文件,填入以下内容:

    env_vars:
      OSS_ENDPOINT: "https://oss-cn-beijing-internal.aliyuncs.com"
      OSS_REGION: "cn-beijing"
      OSS_ACCESS_KEY_ID: "XXXX"
      OSS_ACCESS_KEY_SECRET: "XXXX"
      LANCE_ROOT_URI: "oss://BUCKET/DATASET"

    环境变量说明如下:

    变量

    示例

    说明

    OSS_ENDPOINT

    https://oss-cn-beijing-internal.aliyuncs.com

    OSS访问地址。Ray应用OSS在同一地域时,建议使用VPC内网访问地址,减少公网访问成本和延迟。

    OSS_REGION

    cn-beijing

    OSS Bucket所在地域,需要和Endpoint匹配。

    OSS_ACCESS_KEY_ID

    XXXX

    访问OSSAccessKey ID。生产环境建议使用最小权限账号或STS临时凭证。

    OSS_ACCESS_KEY_SECRET

    XXXX

    访问OSSAccessKey Secret。不要把真实密钥提交到代码仓库。

    OSS_SECURITY_TOKEN

    XXXX

    (可选)使用STS临时凭证时需要配置。

    LANCE_ROOT_URI

    oss://BUCKET/DATASET

    Lance dataset根路径。BUCKETOSS Bucket名称,DATASETLance dataset的目录前缀。脚本会在该路径下创建多个shard=xx.lance子目录。

  4. 提交作业并查看结果

    1. JupyterLab中,通过File > New Launcher打开一个新的Terminalimage

    2. JupyterLabTerminal中执行以下命令,提交作业并查看日志。

      # 1. 返回上层目录,为演示提交作业中的 --working-dir 参数。
      cd ..
      
      # 2. 设置环境变量。请替换为您的真实信息。
      # 作业提交匿名密钥:secret.jwt.anonKey
      ANON_KEY="<YOUR_ANON_KEY>"
      
      # 3. 提交任务,并等待执行结果。
      ray job submit --headers "{\"Authorization\": \"Bearer $ANON_KEY\"}" --working-dir ./src --runtime-env ./src/runtime_env.yaml -- python oss_ray_run.py

      作业执行成功后,输出中会包含以下关键信息:image

      • task_hostnames:显示任务实际运行的节点。如果出现多个hostname,说明任务已经分散到多个Ray节点。

      • 每个shardinitialafter_compact对比:可以看到compactfragment数量下降,但行数不变。

      • VERIFY_OK:表示所有维护操作和校验均通过。

      说明

      如果输出中出现WARN_SINGLE_NODE_TASKS警告,表示所有任务只落在了一个节点上。常见原因是Worker数不足、LANCE_DEMO_SHARDS设置过小或每个Task资源需求较低。可以增加Ray Worker数量或调大LANCE_DEMO_SHARDS环境变量的值来实现多节点调度。

代码块说明

以下分别说明脚本中各Lance维护操作对应的核心代码及含义。

compact

compact操作将多个小fragment合并成更少的fragment,减少后续读取开销。

ds = lance.dataset(uri, storage_options=storage_options())
ds.optimize.compact_files(
    target_rows_per_fragment=512,
    max_rows_per_group=512,
    materialize_deletions=True,
    num_threads=2,
)
  • target_rows_per_fragment=512:控制compact后每个fragment的目标行数。

  • num_threads=2:Lance在当前Ray Worker进程内部使用的线程数,不是RayWorker数量。

说明

num_threads不是RayWorker数。假设有4Ray Task,每个Tasknum_threads=2,则最多出现4dataset并行维护,每个dataset内部再使用2个本地线程做compact。

create scalar index

为结构化列创建标量索引。首次创建使用replace=False,append数据后重建索引使用replace=True

  • 首次创建标量索引:

    ds = lance.dataset(uri, storage_options=storage_options())
    ds.create_scalar_index("id", "BTREE", name="idx_id", replace=False)
  • append之后重建同名索引(reindex):

    ds = lance.dataset(uri, storage_options=storage_options())
    ds.create_scalar_index("id", "BTREE", name="idx_id", replace=True)
    scalar_index_stats = lance.dataset(uri, storage_options=storage_options()).index_statistics("idx_id")

    参数说明

    • "id":要建索引的列名。

    • "BTREE":索引类型,适合数值、字符串等结构化列的过滤查询。

    • replace=False:如果同名索引已存在则不覆盖,适合首次创建。

    • replace=True:重建同名索引,即reindex操作。

    • index_statistics("idx_id"):用于检查索引覆盖情况,num_unindexed_rows == 0表示索引已覆盖全部行。

create vector index

为向量列创建向量索引,用于近似最近邻(ANN)搜索。

ds = lance.dataset(uri, storage_options=storage_options())
ds.create_index(
    "vector",
    "IVF_PQ",
    name="idx_vector",
    metric="cosine",
    replace=True,
    num_partitions=16,
    num_sub_vectors=2,
)

参数说明

  • "vector":向量列名。

  • "IVF_PQ":向量索引类型。

  • metric="cosine":使用cosine距离。

  • num_partitions=16:IVF分区数。

  • num_sub_vectors=2:PQ子向量数,需要和向量维度匹配。

  • replace=True:适合Demo或重建索引。生产环境应结合版本和任务状态谨慎使用。

RayWorker并行维护的实践与约束

Ray并行维护多个dataset的核心代码如下:

refs = [maintain_one_shard.remote(root_uri, shard) for shard in range(SHARDS)]
results = ray.get(refs)

推荐实践

每个Ray Task负责一个独立的datasetshard,同一个dataset内按顺序执行维护操作:

Ray task 1 -> compact oss://BUCKET/DATASET/shard=00.lance
Ray task 2 -> compact oss://BUCKET/DATASET/shard=01.lance
Ray task 3 -> compact oss://BUCKET/DATASET/shard=02.lance
Ray task 4 -> compact oss://BUCKET/DATASET/shard=03.lance
  • 一个Ray Task负责一个独立datasetshard。

  • 同一个dataset内按顺序执行compact > create index > append > reindex > verify

  • 多个dataset之间交给Ray并行调度。

不推荐实践

不要让多个Ray Task同时对同一个dataset执行不同的维护操作:

Ray task 1 -> compact oss://BUCKET/DATASET/main.lance
Ray task 2 -> create index oss://BUCKET/DATASET/main.lance
Ray task 3 -> append oss://BUCKET/DATASET/main.lance

同一个Lance datasetcompact、append、create index都可能提交新版本。多个任务同时对同一个dataset提交版本,容易产生冲突或不可控的重试成本。

更多信息

除了通过JupyterLab Terminal提交作业外,您也可以通过本地环境提交Ray任务。具体操作方法请参见提交作业到Ray应用中关于Ray Python SDKRay CLI的说明。