使用 Daft 操作 DLF Lance 表

更新时间:
复制 MD 格式

本文介绍如何通过 Daft DataFrame 引擎读写由 DLF Catalog 管理的 Lance 表。Daft 提供 lazy DataFrame API,适合需要查询过滤或批量计算的场景。

说明

如需使用 PyLance 直接读写 Lance 表,请参阅使用 Python 操作 DLF Lance 表

术语说明

组件

作用

DLF

Catalog 服务,管理 database/table 元数据,存储 Lance 表路径,发放 OSS 临时凭证

Lance/PyLance

数据格式和底层读写实现,负责 OSS 上 Lance dataset 的实际 I/O

Daft

DataFrame 计算引擎,提供 read_lance / write_lance 接口

lance-dlf

连接器,从 DLF 获取表路径和 OSS 临时凭证,供 Daft 使用。lance-dlf 仅暴露 type=lance-table 的表

技术架构

用户代码
  → lance_namespace.connect("dlf", CONFIG)     # 连接 DLF Catalog
  → DLF 返回 Lance 表路径 + OSS 临时凭证
  → apply_oss_environment(...)                  # 设置 OSS_* 环境变量
  → daft.read_lance("oss://...")               # 读取数据
  → df.write_lance("oss://...", mode="append") # 写入数据

概念映射:

  • DLF database → Lance namespace

  • DLF table → Lance table

前提条件

安装依赖

python3 -m pip install lance-dlf daft

lance-dlf 会自动安装 lance_namespacepyarrow 等依赖。

配置 Catalog 连接

CONFIG = {
    "uri": "http://<DLF-ENDPOINT>", # 公网访问须使用HTTPS协议
    "warehouse": "<YOUR-CATALOG>",
    "token.provider": "dlf",
    "dlf.region": "<REGION-ID>",
    "dlf.access-key-id": "<ACCESS-KEY-ID>",
    "dlf.access-key-secret": "<ACCESS-KEY-SECRET>",
    "dlf.oss-endpoint": "<OSS-ENDPOINT>", # VPC访问可选,公网访问必填
}

配置参数:

参数名称

说明

uri

DLF Paimon REST 访问端点,详见服务接入点。VPC访问支持HTTPHTTPS协议,公网访问必须使用HTTPS协议

warehouse

DLF Catalog 名称

token.provider

AccessKey 认证模式下,填写 dlf

dlf.region

DLF 所属地域ID(如 cn-hangzhou),详见服务接入点

dlf.access-key-id

访问 DLF 的 AccessKey ID

dlf.access-key-secret

访问 DLF 的 AccessKey Secret

dlf.security-token

(可选)STS 场景下使用的安全令牌

dlf.oss-endpoint

(可选)OSS 公网访问接入点(如 oss-cn-hangzhou.aliyuncs.com),仅公网访问DLF必填。DLF默认关闭公网访问,如需开通,见公网访问DLF Paimon REST公测说明

重要

AccessKey ID 和 AccessKey Secret 是访问阿里云资源的重要凭证,请妥善保管。请勿将真实 AccessKey 提交到 Git 仓库。建议从环境变量、密钥管理系统或运行时配置读取访问凭证。

连接 DLF

导入 lance_dlf 会自动注册 dlf namespace:

import lance_namespace
import lance_dlf  # noqa: F401

ns = lance_namespace.connect("dlf", CONFIG)
print(ns.namespace_id())

读取已有表

第一步:获取表路径和凭证

通过Daft 读写表之前,先通过 describe_table() 从 DLF 获取表路径和 OSS 临时凭证。

from lance_namespace import DescribeTableRequest

DATABASE = "<database>"
TABLE = "<table>"

desc = ns.describe_table(DescribeTableRequest(id=[DATABASE, TABLE]))

print(desc.location)
print(sorted((desc.storage_options or {}).keys()))

desc 包含两个关键字段:

  • desc.location:Lance 表存储路径,格式为 oss://bucket/path/to/table

  • desc.storage_options:OSS 临时凭证字典

第二步:设置并调用 OSS 凭证环境变量

Daft 底层通过 PyLance 读写 OSS。需要在代码里定义 apply_oss_environment,把 DLF 返回的临时凭证设置成 OSS_* 环境变量,然后调用该函数:

import os


def apply_oss_environment(storage_options: dict) -> None:
    os.environ["OSS_ENDPOINT"] = storage_options["oss_endpoint"]
    os.environ["OSS_ACCESS_KEY_ID"] = storage_options["oss_access_key_id"]
    os.environ["OSS_ACCESS_KEY_SECRET"] = storage_options["oss_secret_access_key"]
    if storage_options.get("oss_security_token"):
        os.environ["OSS_SECURITY_TOKEN"] = storage_options["oss_security_token"]
    if storage_options.get("oss_region"):
        os.environ["OSS_REGION"] = storage_options["oss_region"]

# 调用函数
apply_oss_environment(desc.storage_options or {})

第三步:使用Daft读取表数据

import daft

df = daft.read_lance(desc.location)
df.show()

写入数据

追加写入已有表

完成第一步:获取表路径和凭证第二步:设置并调用 OSS 凭证环境变量后,使用 mode="append" 追加数据:

# 前置:连接Catalog、获取表路径和凭证、设置并调用OSS凭证环境变量
desc = ns.describe_table(DescribeTableRequest(id=[DATABASE, TABLE]))
apply_oss_environment(desc.storage_options or {})

# 追加写入
append_df = daft.from_pydict({
    "f0": [204],
    "f1": ["daft-d"],
})

append_df.write_lance(desc.location, mode="append")

# 写入后读取验证:
df2 = daft.read_lance(desc.location)
df2.show()

创建新表并写入

新表必须先通过 ns.create_table() 创建(该方法同时写入首批数据),再用 Daft 进行后续读写。

# 前置:连接Catalog、获取表路径和凭证、设置并调用OSS凭证环境变量
from datetime import datetime
import pyarrow as pa
from lance_namespace import CreateTableRequest, DescribeTableRequest

# 定义Arrow转换成IPC字节流
def arrow_table_to_ipc_bytes(table: pa.Table) -> bytes:
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    return sink.getvalue().to_pybytes()


# 创建表并写入首批数据
table_name = "test_lance_daft_" + datetime.now().strftime("%Y%m%d_%H%M%S")
table_id = [DATABASE, table_name]

rows = {
    "f0": [201, 202, 203],
    "f1": ["daft-a", "daft-b", "daft-c"],
}
arrow_table = pa.table(rows)

create_response = ns.create_table(
    CreateTableRequest(id=table_id),
    arrow_table_to_ipc_bytes(arrow_table),
)
print(create_response.location)

创建完成后,通过 describe_table 获取凭证,再用 Daft 读写:

# 获取凭证并设置环境变量
desc = ns.describe_table(DescribeTableRequest(id=table_id))
apply_oss_environment(desc.storage_options or {})

# 读取验证
df = daft.read_lance(desc.location)
df.show()

# 使用Daft追加数据
append_rows = {
    "f0": [204],
    "f1": ["daft-d"],
}
append_df = daft.from_pydict(append_rows)

meta = append_df.write_lance(desc.location, mode="append")
meta.show()

# 再次读取确认
appended_df = daft.read_lance(desc.location)
appended_df.show()

预期输出:

[
    {"f0": 201, "f1": "daft-a"},
    {"f0": 202, "f1": "daft-b"},
    {"f0": 203, "f1": "daft-c"},
    {"f0": 204, "f1": "daft-d"},
]

完整示例

以下脚本演示端到端流程:创建新表 → 读取 → 追加 → 验证。

from __future__ import annotations

from datetime import datetime
import os

import daft
import lance_namespace
import pyarrow as pa
from lance_namespace import CreateTableRequest, DescribeTableRequest

import lance_dlf  # noqa: F401


CONFIG = {
    "uri": "http://<DLF-ENDPOINT>",
    "warehouse": "<YOUR-CATALOG>",
    "token.provider": "dlf",
    "dlf.region": "<REGION-ID>",
    "dlf.access-key-id": "<ACCESS-KEY-ID>",
    "dlf.access-key-secret": "<ACCESS-KEY-SECRET>",
    "dlf.oss-endpoint": "<OSS-ENDPOINT>", # 仅公网访问DLF必填
}

DATABASE = "default"

# 定义 Arrow 表序列化函数
def arrow_table_to_ipc_bytes(table: pa.Table) -> bytes:
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    return sink.getvalue().to_pybytes()

# 定义OSS环境变量函数
def apply_oss_environment(storage_options: dict) -> None:
    os.environ["OSS_ENDPOINT"] = storage_options["oss_endpoint"]
    os.environ["OSS_ACCESS_KEY_ID"] = storage_options["oss_access_key_id"]
    os.environ["OSS_ACCESS_KEY_SECRET"] = storage_options["oss_secret_access_key"]
    if storage_options.get("oss_security_token"):
        os.environ["OSS_SECURITY_TOKEN"] = storage_options["oss_security_token"]
    if storage_options.get("oss_region"):
        os.environ["OSS_REGION"] = storage_options["oss_region"]


def df_to_pydict(df):
    try:
        return df.to_pydict()
    except AttributeError:
        return df.collect().to_pydict()


def main() -> None:
    ns = lance_namespace.connect("dlf", CONFIG)

    # 1. 创建新表
    table_name = "test_lance_daft_" + datetime.now().strftime("%Y%m%d_%H%M%S")
    table_id = [DATABASE, table_name]

    rows = {
        "f0": [201, 202, 203],
        "f1": ["daft-a", "daft-b", "daft-c"],
    }
    arrow_table = pa.table(rows)

    create_response = ns.create_table(
        CreateTableRequest(id=table_id),
        arrow_table_to_ipc_bytes(arrow_table),
    )
    print("created:", ".".join(table_id))
    print("location:", create_response.location)

    # 2. 获取凭证
    desc = ns.describe_table(DescribeTableRequest(id=table_id))
    apply_oss_environment(desc.storage_options or {})

    # 3. 读取验证
    read_df = daft.read_lance(desc.location)
    read_df.show()
    if df_to_pydict(read_df) != rows:
        raise AssertionError("Initial readback mismatch")

    # 4. 追加数据
    append_rows = {
        "f0": [204],
        "f1": ["daft-d"],
    }
    append_df = daft.from_pydict(append_rows)
    append_df.write_lance(desc.location, mode="append").show()


    # 5. 最终验证
    appended_df = daft.read_lance(desc.location)
    appended_df.show()
    expected = {
        "f0": rows["f0"] + append_rows["f0"],
        "f1": rows["f1"] + append_rows["f1"],
    }
    if df_to_pydict(appended_df) != expected:
        raise AssertionError("Daft append readback mismatch")

    print("daft + dlf + lance: ok")


if __name__ == "__main__":
    main()

注意事项

  • 新表初始化:使用 ns.create_table(...) 创建新表并写入第一批数据,再用 Daft 读写后续数据。

  • 日志脱敏storage_options 的完整值包含临时 AK/SK/token,建议仅打印 key 列表:

    print(sorted((desc.storage_options or {}).keys()))