使用 Python 操作 DLF Lance 表

更新时间:
复制 MD 格式

本文介绍如何在 Python 中使用 lance-dlf 连接 DLF Catalog,通过 PyLance 直接读写 Lance 表。适用于需要精细控制 Arrow Table 级别操作的场景。

说明

如需使用 Daft DataFrame 引擎进行查询过滤或批量计算,请参阅使用 Daft 操作 DLF Lance 表

功能说明

lance-dlf 提供以下核心功能:

  • 连接 DLF Catalog

  • 将 DLF Database 映射为 Lance Namespace

  • 仅暴露 type=lance-table 类型的表

  • 通过 DLF load_table_token 获取 OSS 临时访问凭证

  • 将 OSS 临时凭证转换为 PyLance 可用的 storage_options

数据的实际读写操作由 PyLance 完成:

# 写入数据
lance.write_dataset(table, location, storage_options=storage_options)

# 读取数据
lance.dataset(location, storage_options=storage_options)

快速开始

安装 lance-dlf

python3 -m pip install lance-dlf

配置 Catalog 连接

CONFIG = {
    "uri": "http://<DLF-ENDPOINT>", # 公网访问须使用HTTPS协议
    "warehouse": "<YOUR-CATALOG>",
    "token.provider": "dlf",
    "dlf.region": "<REGION-ID>",
    "dlf.access-key-id": "<ACCESS-KEY-ID>",
    "dlf.access-key-secret": "<ACCESS-KEY-SECRET>",
    "dlf.oss-endpoint": "<OSS-ENDPOINT>", # VPC访问可选,公网访问必填
}

配置参数:

参数名称

说明

uri

DLF Paimon REST 访问端点,详见服务接入点。VPC访问支持HTTPHTTPS协议,公网访问必须使用HTTPS协议

warehouse

DLF Catalog 名称

token.provider

AccessKey 认证模式下,填写 dlf

dlf.region

DLF 所属地域ID(如 cn-hangzhou),详见服务接入点

dlf.access-key-id

访问 DLF 的 AccessKey ID

dlf.access-key-secret

访问 DLF 的 AccessKey Secret

dlf.security-token

(可选)STS 场景下使用的安全令牌

dlf.oss-endpoint

(可选)OSS 公网访问接入点(如 oss-cn-hangzhou.aliyuncs.com),仅公网访问DLF必填。DLF默认关闭公网访问,如需开通,见公网访问DLF Paimon REST公测说明

重要

AccessKey ID 和 AccessKey Secret 是访问阿里云资源的重要凭证,请妥善保管。请勿将真实 AccessKey 提交到 Git 仓库。建议从环境变量、密钥管理系统或运行时配置读取访问凭证。

连接 DLF Catalog

导入 lance_dlf 模块会自动注册 dlf namespace 实现:

import lance_namespace
import lance_dlf  # noqa: F401

# 连接 DLF Catalog
ns = lance_namespace.connect("dlf", CONFIG)

# 验证连接
print(ns.namespace_id())

namespace_id() 方法返回当前连接的 Catalog 信息,用于确认连接的 DLF 访问端点和 Catalog名称。

查看 Namespace 和 Table

from lance_namespace import (
    DescribeNamespaceRequest,
    DescribeTableRequest,
    ListNamespacesRequest,
    ListTablesRequest,
)

DATABASE = "<database>"
TABLE = "<table>"

# 列出所有 Namespace
namespaces = ns.list_namespaces(ListNamespacesRequest(id=[]))
print(namespaces)

# 查看指定 Namespace 元数据
namespace = ns.describe_namespace(DescribeNamespaceRequest(id=[DATABASE]))
print(namespace)

# 列出 Namespace 下的所有 Table
tables = ns.list_tables(ListTablesRequest(id=[DATABASE]))
print(tables)

# 查看 Table 详情
table = ns.describe_table(DescribeTableRequest(id=[DATABASE, TABLE]))
print(table.location)
print(table.properties)
print(table.storage_options)

describe_table 返回字段:

字段名称

说明

location

Lance 数据的实际存储路径,通常为 oss://bucket/path 格式

properties

DLF Table 的 Schema 选项,type 字段必须为 lance-table

storage_options

PyLance 读写 OSS 所需的临时访问凭证

基础操作

创建 Lance 表并写入数据

如果表不存在,需要先创建表然后写入数据。原理:

  1. 使用 ns.create_table() 在 DLF 中创建表

  2. DLF 返回 Lance 表的存储位置(location

  3. 获取 OSS 临时访问凭证(storage_options

  4. 使用PyLance 将 Arrow 数据写入指定的存储位置。 arrow_table_to_ipc_bytes 函数的定义见数据序列化工具函数

import lance
import pyarrow as pa
from lance_namespace import CreateTableRequest, DescribeTableRequest

DATABASE = "default"
TABLE = "test_lance_create_001"

table_id = [DATABASE, TABLE]

# 构造测试数据
data = pa.table({
    "f0": pa.array([101, 102, 103], type=pa.int64()),
    "f1": pa.array(["create-a", "create-b", "create-c"], type=pa.string()),
})

# 将 Arrow Table 转为 IPC bytes
def arrow_table_to_ipc_bytes(table):
    sink = pa.BufferOutputStream()
    writer = pa.ipc.new_stream(sink, table.schema)
    writer.write_table(table)
    writer.close()
    return sink.getvalue().to_pybytes()

# 创建表并写入数据
create_response = ns.create_table(
    CreateTableRequest(id=table_id),
    arrow_table_to_ipc_bytes(data),
)

print(create_response.location)
print(create_response.storage_options.keys())

# 读取验证
desc = ns.describe_table(DescribeTableRequest(id=table_id))
dataset = lance.dataset(desc.location, storage_options=desc.storage_options)
result = dataset.to_table()
print(result)

预期输出:

pyarrow.Table
f0: int64
f1: string
----
f0: [[101,102,103]]
f1: [["create-a","create-b","create-c"]]

写入已有表

如果 DLF 中已存在 Lance 表,可以先通过 describe_table 获取表的存储位置和访问凭证,然后使用 PyLance 写入数据。

import lance
import pyarrow as pa
from lance_namespace import DescribeTableRequest

DATABASE = "default"
TABLE = "test_lance_table"

table_id = [DATABASE, TABLE]

# 获取表详情
desc = ns.describe_table(DescribeTableRequest(id=table_id))

# 构造测试数据
data = pa.table({
    "f0": pa.array([1, 2, 3], type=pa.int64()),
    "f1": pa.array(["value-1", "value-2", "value-3"], type=pa.string()),
})

# 写入数据
lance.write_dataset(
    data,
    desc.location,
    mode="overwrite",
    storage_options=desc.storage_options,
)

# 读取验证
dataset = lance.dataset(desc.location, storage_options=desc.storage_options)
print(dataset.to_table())

写入模式说明:

模式

说明

overwrite

覆盖写入,适用于初始化空表或测试表

append

追加写入,要求 Schema 兼容

警告

overwrite 模式会覆盖已有的数据,请谨慎操作。

读取数据

如果 DLF 中已存在 type=lance-table 的表且已包含数据,可以通过 describe_table 获取存储位置和临时凭证,然后使用 PyLance 读取数据。详见创建 Lance 表并写入数据代码块末端的“读取验证”部分。

完整示例

以下示例演示了完整的工作流程:连接 DLF、创建新表、写入数据、列出表、读取验证。

from datetime import datetime

import lance
import lance_namespace
import pyarrow as pa
from lance_namespace import CreateTableRequest, DescribeTableRequest, ListTablesRequest

import lance_dlf  # noqa: F401


CONFIG = {
    "uri": "http://<DLF-ENDPOINT>",
    "warehouse": "<YOUR-CATALOG>",
    "token.provider": "dlf",
    "dlf.region": "<REGION-ID>",
    "dlf.access-key-id": "<ACCESS-KEY-ID>",
    "dlf.access-key-secret": "<ACCESS-KEY-SECRET>",
    "dlf.oss-endpoint": "<OSS-ENDPOINT>", # 可选
}

DATABASE = "default"


def arrow_table_to_ipc_bytes(table: pa.Table) -> bytes:
    """将 PyArrow 表转换为 IPC 字节流"""
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    return sink.getvalue().to_pybytes()


def main():
    # 连接 DLF Catalog
    ns = lance_namespace.connect("dlf", CONFIG)

    # 生成唯一表名
    table_name = "test_lance_create_" + datetime.now().strftime("%Y%m%d_%H%M%S")
    table_id = [DATABASE, table_name]

    # 构造测试数据
    data = pa.table({
        "f0": pa.array([101, 102, 103], type=pa.int64()),
        "f1": pa.array(["create-a", "create-b", "create-c"], type=pa.string()),
    })

    # 创建表并写入数据
    create_response = ns.create_table(
        CreateTableRequest(id=table_id),
        arrow_table_to_ipc_bytes(data),
    )

    print("created:", ".".join(table_id))
    print("location:", create_response.location)

    # 列出表并验证
    tables = ns.list_tables(ListTablesRequest(id=[DATABASE]))
    print("listed_after_create:", table_name in tables.tables)

    # 读取数据并验证
    desc = ns.describe_table(DescribeTableRequest(id=table_id))
    dataset = lance.dataset(desc.location, storage_options=desc.storage_options)
    result = dataset.to_table()

    print(result)

    # 数据完整性校验
    expected = data.to_pylist()
    actual = result.to_pylist()
    if actual != expected:
        raise AssertionError(f"readback mismatch: expected={expected}, actual={actual}")

    print("create_write_read: ok")


if __name__ == "__main__":
    main()

附录

数据序列化工具函数

lance_namespacecreate_table 接口要求将 Arrow 表序列化为 IPC 字节流。

import pyarrow as pa


def arrow_table_to_ipc_bytes(table: pa.Table) -> bytes:
    """将 PyArrow 表转换为 IPC 字节流"""
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    return sink.getvalue().to_pybytes()

自动构造测试数据

如果不想硬编码列名和类型,可以从 DLF Table Schema 中提取字段信息,自动构造测试数据。

import pyarrow as pa
from lance_dlf.common.identifier import Identifier


def sample_value(field_type: str, row: int):
    """根据字段类型生成示例值"""
    normalized = field_type.lower()
    if "int" in normalized:
        return row + 1
    if "string" in normalized or "char" in normalized or "varchar" in normalized:
        return f"value-{row + 1}"
    raise ValueError(f"Unsupported sample field type: {field_type}")


def build_sample_table(ns, database: str, table: str) -> pa.Table:
    """根据 DLF 表结构构造示例数据"""
    raw_table = ns._api.get_table(Identifier(database, table))
    schema = raw_table.get_schema()
    fields = schema.fields if schema and schema.fields else []

    data = {}
    for field in fields:
        field_type = str(field.type)
        data[field.name] = [sample_value(field_type, row) for row in range(3)]

    return pa.table(data)

使用示例:

desc = ns.describe_table(DescribeTableRequest(id=[DATABASE, TABLE]))
data = build_sample_table(ns, DATABASE, TABLE)

lance.write_dataset(
    data,
    desc.location,
    mode="overwrite",
    storage_options=desc.storage_options,
)