本文介绍如何通过 Daft DataFrame 引擎读写由 DLF Catalog 管理的 Lance 表。Daft 提供 lazy DataFrame API,适合需要查询过滤或批量计算的场景。
如需使用 PyLance 直接读写 Lance 表,请参阅使用 Python 操作 DLF Lance 表。
术语说明
组件 | 作用 |
DLF | Catalog 服务,管理 database/table 元数据,存储 Lance 表路径,发放 OSS 临时凭证 |
Lance/PyLance | 数据格式和底层读写实现,负责 OSS 上 Lance dataset 的实际 I/O |
Daft | DataFrame 计算引擎,提供 |
| 连接器,从 DLF 获取表路径和 OSS 临时凭证,供 Daft 使用。 |
技术架构
用户代码
→ lance_namespace.connect("dlf", CONFIG) # 连接 DLF Catalog
→ DLF 返回 Lance 表路径 + OSS 临时凭证
→ apply_oss_environment(...) # 设置 OSS_* 环境变量
→ daft.read_lance("oss://...") # 读取数据
→ df.write_lance("oss://...", mode="append") # 写入数据概念映射:
DLF database → Lance namespace
DLF table → Lance table
前提条件
安装依赖
python3 -m pip install lance-dlf daftlance-dlf 会自动安装 lance_namespace 和 pyarrow 等依赖。
配置 Catalog 连接
CONFIG = {
"uri": "http://<DLF-ENDPOINT>", # 公网访问须使用HTTPS协议
"warehouse": "<YOUR-CATALOG>",
"token.provider": "dlf",
"dlf.region": "<REGION-ID>",
"dlf.access-key-id": "<ACCESS-KEY-ID>",
"dlf.access-key-secret": "<ACCESS-KEY-SECRET>",
"dlf.oss-endpoint": "<OSS-ENDPOINT>", # VPC访问可选,公网访问必填
}配置参数:
参数名称 | 说明 |
| DLF Paimon REST 访问端点,详见服务接入点。VPC访问支持HTTP和HTTPS协议,公网访问必须使用HTTPS协议 |
| DLF Catalog 名称 |
| AccessKey 认证模式下,填写 |
| DLF 所属地域ID(如 |
| 访问 DLF 的 AccessKey ID |
| 访问 DLF 的 AccessKey Secret |
| (可选)STS 场景下使用的安全令牌 |
| (可选)OSS 公网访问接入点(如 |
AccessKey ID 和 AccessKey Secret 是访问阿里云资源的重要凭证,请妥善保管。请勿将真实 AccessKey 提交到 Git 仓库。建议从环境变量、密钥管理系统或运行时配置读取访问凭证。
连接 DLF
导入 lance_dlf 会自动注册 dlf namespace:
import lance_namespace
import lance_dlf # noqa: F401
ns = lance_namespace.connect("dlf", CONFIG)
print(ns.namespace_id())读取已有表
第一步:获取表路径和凭证
通过Daft 读写表之前,先通过 describe_table() 从 DLF 获取表路径和 OSS 临时凭证。
from lance_namespace import DescribeTableRequest
DATABASE = "<database>"
TABLE = "<table>"
desc = ns.describe_table(DescribeTableRequest(id=[DATABASE, TABLE]))
print(desc.location)
print(sorted((desc.storage_options or {}).keys()))desc 包含两个关键字段:
desc.location:Lance 表存储路径,格式为oss://bucket/path/to/tabledesc.storage_options:OSS 临时凭证字典
第二步:设置并调用 OSS 凭证环境变量
Daft 底层通过 PyLance 读写 OSS。需要在代码里定义 apply_oss_environment,把 DLF 返回的临时凭证设置成 OSS_* 环境变量,然后调用该函数:
import os
def apply_oss_environment(storage_options: dict) -> None:
os.environ["OSS_ENDPOINT"] = storage_options["oss_endpoint"]
os.environ["OSS_ACCESS_KEY_ID"] = storage_options["oss_access_key_id"]
os.environ["OSS_ACCESS_KEY_SECRET"] = storage_options["oss_secret_access_key"]
if storage_options.get("oss_security_token"):
os.environ["OSS_SECURITY_TOKEN"] = storage_options["oss_security_token"]
if storage_options.get("oss_region"):
os.environ["OSS_REGION"] = storage_options["oss_region"]
# 调用函数
apply_oss_environment(desc.storage_options or {})第三步:使用Daft读取表数据
import daft
df = daft.read_lance(desc.location)
df.show()写入数据
追加写入已有表
完成第一步:获取表路径和凭证和第二步:设置并调用 OSS 凭证环境变量后,使用 mode="append" 追加数据:
# 前置:连接Catalog、获取表路径和凭证、设置并调用OSS凭证环境变量
desc = ns.describe_table(DescribeTableRequest(id=[DATABASE, TABLE]))
apply_oss_environment(desc.storage_options or {})
# 追加写入
append_df = daft.from_pydict({
"f0": [204],
"f1": ["daft-d"],
})
append_df.write_lance(desc.location, mode="append")
# 写入后读取验证:
df2 = daft.read_lance(desc.location)
df2.show()创建新表并写入
新表必须先通过 ns.create_table() 创建(该方法同时写入首批数据),再用 Daft 进行后续读写。
# 前置:连接Catalog、获取表路径和凭证、设置并调用OSS凭证环境变量
from datetime import datetime
import pyarrow as pa
from lance_namespace import CreateTableRequest, DescribeTableRequest
# 定义Arrow转换成IPC字节流
def arrow_table_to_ipc_bytes(table: pa.Table) -> bytes:
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
writer.write_table(table)
return sink.getvalue().to_pybytes()
# 创建表并写入首批数据
table_name = "test_lance_daft_" + datetime.now().strftime("%Y%m%d_%H%M%S")
table_id = [DATABASE, table_name]
rows = {
"f0": [201, 202, 203],
"f1": ["daft-a", "daft-b", "daft-c"],
}
arrow_table = pa.table(rows)
create_response = ns.create_table(
CreateTableRequest(id=table_id),
arrow_table_to_ipc_bytes(arrow_table),
)
print(create_response.location)创建完成后,通过 describe_table 获取凭证,再用 Daft 读写:
# 获取凭证并设置环境变量
desc = ns.describe_table(DescribeTableRequest(id=table_id))
apply_oss_environment(desc.storage_options or {})
# 读取验证
df = daft.read_lance(desc.location)
df.show()
# 使用Daft追加数据
append_rows = {
"f0": [204],
"f1": ["daft-d"],
}
append_df = daft.from_pydict(append_rows)
meta = append_df.write_lance(desc.location, mode="append")
meta.show()
# 再次读取确认
appended_df = daft.read_lance(desc.location)
appended_df.show()预期输出:
[
{"f0": 201, "f1": "daft-a"},
{"f0": 202, "f1": "daft-b"},
{"f0": 203, "f1": "daft-c"},
{"f0": 204, "f1": "daft-d"},
]完整示例
以下脚本演示端到端流程:创建新表 → 读取 → 追加 → 验证。
from __future__ import annotations
from datetime import datetime
import os
import daft
import lance_namespace
import pyarrow as pa
from lance_namespace import CreateTableRequest, DescribeTableRequest
import lance_dlf # noqa: F401
CONFIG = {
"uri": "http://<DLF-ENDPOINT>",
"warehouse": "<YOUR-CATALOG>",
"token.provider": "dlf",
"dlf.region": "<REGION-ID>",
"dlf.access-key-id": "<ACCESS-KEY-ID>",
"dlf.access-key-secret": "<ACCESS-KEY-SECRET>",
"dlf.oss-endpoint": "<OSS-ENDPOINT>", # 仅公网访问DLF必填
}
DATABASE = "default"
# 定义 Arrow 表序列化函数
def arrow_table_to_ipc_bytes(table: pa.Table) -> bytes:
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
writer.write_table(table)
return sink.getvalue().to_pybytes()
# 定义OSS环境变量函数
def apply_oss_environment(storage_options: dict) -> None:
os.environ["OSS_ENDPOINT"] = storage_options["oss_endpoint"]
os.environ["OSS_ACCESS_KEY_ID"] = storage_options["oss_access_key_id"]
os.environ["OSS_ACCESS_KEY_SECRET"] = storage_options["oss_secret_access_key"]
if storage_options.get("oss_security_token"):
os.environ["OSS_SECURITY_TOKEN"] = storage_options["oss_security_token"]
if storage_options.get("oss_region"):
os.environ["OSS_REGION"] = storage_options["oss_region"]
def df_to_pydict(df):
try:
return df.to_pydict()
except AttributeError:
return df.collect().to_pydict()
def main() -> None:
ns = lance_namespace.connect("dlf", CONFIG)
# 1. 创建新表
table_name = "test_lance_daft_" + datetime.now().strftime("%Y%m%d_%H%M%S")
table_id = [DATABASE, table_name]
rows = {
"f0": [201, 202, 203],
"f1": ["daft-a", "daft-b", "daft-c"],
}
arrow_table = pa.table(rows)
create_response = ns.create_table(
CreateTableRequest(id=table_id),
arrow_table_to_ipc_bytes(arrow_table),
)
print("created:", ".".join(table_id))
print("location:", create_response.location)
# 2. 获取凭证
desc = ns.describe_table(DescribeTableRequest(id=table_id))
apply_oss_environment(desc.storage_options or {})
# 3. 读取验证
read_df = daft.read_lance(desc.location)
read_df.show()
if df_to_pydict(read_df) != rows:
raise AssertionError("Initial readback mismatch")
# 4. 追加数据
append_rows = {
"f0": [204],
"f1": ["daft-d"],
}
append_df = daft.from_pydict(append_rows)
append_df.write_lance(desc.location, mode="append").show()
# 5. 最终验证
appended_df = daft.read_lance(desc.location)
appended_df.show()
expected = {
"f0": rows["f0"] + append_rows["f0"],
"f1": rows["f1"] + append_rows["f1"],
}
if df_to_pydict(appended_df) != expected:
raise AssertionError("Daft append readback mismatch")
print("daft + dlf + lance: ok")
if __name__ == "__main__":
main()注意事项
新表初始化:使用
ns.create_table(...)创建新表并写入第一批数据,再用 Daft 读写后续数据。日志脱敏:
storage_options的完整值包含临时 AK/SK/token,建议仅打印 key 列表:print(sorted((desc.storage_options or {}).keys()))