Daft支持多种开放文件格式和开放表格式的读取、写入与恢复。本文介绍各格式的支持情况,提供Lance、Iceberg和Parquet的可执行代码示例,并说明崩溃恢复与开发建议。
核心结论
-
Parquet:最适合做最通用的列式交换格式,也是最稳妥的阶段产物格式。
-
Lance:更适合做可版本化的数据资产,尤其是向量、多模态特征和实验数据集,支持append-and-snapshot语义。
-
Iceberg:更适合放到有catalog管理的表层,强调快照隔离、Schema治理和表级一致性。
-
Daft当前没有Spark风格的“显式checkpoint API + lineage截断”模型。更常见的工程实践是把关键阶段物化成Parquet或Lance文件,再从阶段边界恢复。
-
Lance的Branch生命周期主要由Lance SDK管理。Daft更适合消费某个稳定版本或快照的数据集。
运行前提
下面的代码示例已在Python 3.11下验证通过。为了让Lance与Iceberg示例在本地单机直接可跑,请安装以下依赖:
python3.11 -m pip install "daft[lance,iceberg]"
python3.11 -m pip install "pyiceberg[sql-sqlite]"
python3.11 -m pip install pylance
如果后续还要验证Delta Lake或Hudi连接器,可以按需额外安装:
python3.11 -m pip install "daft[deltalake,hudi]"
其中pylance是Lance Python SDK。本文后面的Lance Branch与向量检索示例都直接依赖这个SDK,而不只是Daft的read_lance/write_lance封装。
首次写入前先准备目录
在生产环境里,Parquet、Lance、Iceberg的目标路径通常是共享存储挂载、数据湖目录或warehouse路径。第一次写入前,请先把目录准备好,不要把目录初始化隐含在业务作业里。
例如,如果数据湖目录挂载在/data/lakehouse/open-formats,可以先执行以下命令:
mkdir -p /data/lakehouse/open-formats/parquet_case
mkdir -p /data/lakehouse/open-formats/lance_case
mkdir -p /data/lakehouse/open-formats/iceberg_case/warehouse
本文中的/tmp/daft_open_formats_doc/...路径只是为了本机验证,代码里会创建本地临时目录。切到真实数据湖路径时,建议先用上面的方式把目录建好,再把路径交给作业。若后端是对象存储,也建议先准备好Bucket/Prefix再开始写入。
开放文件与开放表格式支持列表
下表整理了Daft当前常用的开放文件格式与开放表格式支持情况。本文后续仅重点展开Lance、Iceberg、Parquet三种格式。
|
格式 |
读取接口 |
写入接口 |
当前支持 |
典型用途 |
说明 |
|
CSV |
|
|
读写 |
调试导出、轻量交换 |
行式文本格式,兼容性高但不适合作为大规模分析主存储 |
|
JSON |
|
|
读写 |
事件日志、半结构化交换 |
当前以line-delimited JSON为主 |
|
Parquet |
|
|
读写 |
批量交换、阶段产物、列式分析 |
最通用的列式数据格式 |
|
Lance |
|
|
读写 |
向量、多模态、版本化数据资产 |
支持 |
|
Iceberg |
|
|
读写 |
Catalog管理的大表、快照治理 |
需要PyIceberg table对象或metadata路径 |
|
Delta Lake |
|
|
读写 |
湖仓表格式接入 |
需要对应Delta Lake connector依赖 |
|
Hudi |
|
- |
只读 |
接入已有Hudi数据湖 |
当前以读取现有Hudi表为主 |
Parquet:最通用的阶段产物与交换格式
下面的示例模拟订单数据先写成Parquet,再重新读回做过滤和聚合。它适合用来验证最基础的列式读写流水线。
from pathlib import Path
import shutil
import daft
from daft import col
root = Path("/tmp/daft_open_formats_doc/parquet_case")
if root.exists():
shutil.rmtree(root)
root.mkdir(parents=True, exist_ok=True)
parquet_path = str(root / "orders_parquet")
orders = daft.from_pydict(
{
"order_id": [1001, 1002, 1003, 1004],
"city": ["Hangzhou", "Beijing", "Hangzhou", "Shanghai"],
"channel": ["app", "web", "app", "app"],
"status": ["paid", "paid", "refunded", "paid"],
"amount": [128.0, 256.0, 64.0, 512.0],
}
)
orders.write_parquet(parquet_path, write_mode="overwrite")
result = (
daft.read_parquet(parquet_path)
.where(col("status") == "paid")
.groupby("city")
.agg(col("amount").sum().alias("paid_gmv"))
.sort("paid_gmv", desc=True)
)
print(result.to_pydict())
-
这段代码的实际用途如下:
-
orders.write_parquet(...):把上游阶段结果稳定落盘,生成一个所有后续任务都能复用的列式产物。 -
daft.read_parquet(...):重新从阶段产物恢复DataFrame,而不是依赖进程内存。 -
where + groupby + agg:验证写出后再读回的业务逻辑和结果仍然保持一致。
-
-
Parquet最适合放在以下位置:
-
原始数据清洗完成后的中间层。
-
下游多个任务共用的交换层。
-
故障恢复时的阶段边界。
-
如果把parquet_path换成生产数据湖路径(如/data/lakehouse/open-formats/parquet_case/orders_parquet),建议先执行mkdir -p创建父目录,再运行写入操作。
Lance:版本化数据资产与多模态特征表
下面的示例演示两个最常用的Lance操作:先写入基础版本,再追加一批新数据,并分别读取最新版本与历史版本。
from pathlib import Path
import shutil
import daft
root = Path("/tmp/daft_open_formats_doc/lance_case")
if root.exists():
shutil.rmtree(root)
root.mkdir(parents=True, exist_ok=True)
uri = str(root / "orders.lance")
df_v0 = daft.from_pydict(
{
"id": [1, 2],
"city": ["Hangzhou", "Beijing"],
"score": [0.1, 0.2],
}
)
df_v1 = daft.from_pydict(
{
"id": [3],
"city": ["Shanghai"],
"score": [0.9],
}
)
df_v0.write_lance(uri, mode="overwrite")
df_v1.write_lance(uri, mode="append")
latest = daft.read_lance(uri).sort("id")
snapshot = daft.read_lance(uri, version=1).sort("id")
print("latest =", latest.to_pydict())
print("snapshot =", snapshot.to_pydict())
-
这段代码的实际用途如下:
-
write_lance(..., mode="overwrite"):创建基础数据集。 -
write_lance(..., mode="append"):在不重写旧数据的前提下新增版本。 -
read_lance(uri):读取最新版本,适合日常分析。 -
read_lance(uri, version=1):回到指定历史版本,适合复现某次实验或对账。
-
-
Lance更适合以下开发场景:
-
Embedding、标签、打分结果等需要版本管理的数据资产。
-
图像、文本块、向量特征等多模态分析结果。
-
需要追加更新,但又希望保留历史快照的实验表。
-
Lance:向量能力
Lance支持把结构化字段和向量列放在同一个版本化数据集中。更常见的工程分工是:
-
Daft负责上游文本、图像或多模态流水线,以及Embedding的生成。
-
Lance负责把向量列和业务元数据一起落盘,并通过Lance SDK执行原生向量检索。
下面的示例演示一个最小可运行的向量检索流程:
-
构造一张包含业务元数据和Embedding的Lance数据集。
-
使用Lance原生
nearest查询做Top-K检索。 -
校验Top-1命中预期文档,并确认返回结果里仍然保留业务元数据列。
from pathlib import Path
import shutil
import lance
import pyarrow as pa
DIM = 4
TOP_K = 2
root = Path("/tmp/daft_open_formats_doc/lance_vector_search_case")
if root.exists():
shutil.rmtree(root)
root.mkdir(parents=True, exist_ok=True)
uri = str(root / "embeddings.lance")
doc_ids = pa.array(["doc-001", "doc-002", "doc-003"])
categories = pa.array(["faq", "faq", "policy"])
embedding_values = pa.array(
[
0.99,
0.01,
0.00,
0.00,
0.96,
0.04,
0.00,
0.00,
0.05,
0.95,
0.00,
0.00,
],
type=pa.float32(),
)
embeddings = pa.FixedSizeListArray.from_arrays(embedding_values, DIM)
table = pa.Table.from_arrays(
[doc_ids, categories, embeddings],
names=["doc_id", "category", "embedding"],
)
dataset = lance.write_dataset(table, uri, mode="overwrite")
query = pa.array([1.0, 0.0, 0.0, 0.0], type=pa.float32())
result = dataset.to_table(
nearest={
"column": "embedding",
"q": query,
"k": TOP_K,
"metric": "cosine",
"use_index": False,
}
)
rows = result.to_pylist()
assert (root / "embeddings.lance").exists()
assert dataset.count_rows() == 3
assert len(rows) == TOP_K
assert rows[0]["doc_id"] == "doc-001"
assert "category" in rows[0]
assert "_distance" in rows[0]
print("vector_search_rows =", rows)
-
这段代码的实际用途如下:
-
FixedSizeListArray:把Embedding列显式写成定长向量列,便于Lance识别为向量数据。 -
lance.write_dataset(...):把向量和业务元数据一起写成可版本化的数据集。 -
to_table(nearest=...):直接使用Lance原生向量检索能力,而不是把数据读回后再手动算相似度。 -
use_index=False:固定走精确KNN,方便在小样本验证时稳定复现结果。大表可按需补向量索引。
-
-
这类写法特别适合以下场景:
-
文档、图片或音频片段的Embedding资产管理。
-
RAG 入库前的离线验证和回归测试。
-
需要把检索结果和原始业务字段一起返回给下游系统。
-
Lance:Branch能力
Lance的Branch更像“数据资产的实验分叉”。当前更合理的工程分工是:
-
用Daft负责主数据集的读写和分析。
-
用Lance SDK负责Branch的创建、切换和隔离写入。
下面的示例演示一个完整可运行的Branch流程:
-
用Daft写出主表。
-
用Lance SDK从主表创建实验分支。
-
只在分支里追加一条新数据,不影响主分支。
-
把分支结果转成Arrow,再回到Daft继续分析。
from pathlib import Path
import shutil
import daft
import lance
import pyarrow as pa
root = Path("/tmp/daft_open_formats_doc/lance_branch_case")
if root.exists():
shutil.rmtree(root)
root.mkdir(parents=True, exist_ok=True)
uri = str(root / "features.lance")
daft.from_pydict({"id": [1, 2], "score": [0.1, 0.2]}).write_lance(uri, mode="overwrite")
branch_ds = lance.dataset(uri).create_branch("exp_branch")
lance.write_dataset(pa.table({"id": [3], "score": [0.9]}), uri=branch_ds, mode="append")
main_rows = lance.dataset(uri).to_table().to_pylist()
branch_rows = lance.dataset(uri).checkout_branch("exp_branch").to_table().to_pylist()
branch_df = daft.from_arrow(
lance.dataset(uri).checkout_branch("exp_branch").to_table()
).sort("id")
print("main_rows =", main_rows)
print("branch_rows =", branch_rows)
print("branch_df =", branch_df.to_pydict())
-
这段代码的实际用途如下:
-
create_Branch("exp_Branch"):从主版本拉出一个实验分支。 -
lance.write_dataset(..., uri=Branch_ds, mode="append"):只把新数据写到分支,不污染主线。 -
checkout_Branch("exp_Branch"):读取分支最新状态。 -
daft.from_arrow(...):把Branch数据重新接回Daft流水线继续处理。
-
-
使用Lance Branch时需要注意以下事项:
-
read_lance和write_lance负责主数据集、版本和快照的常规读写。 -
Branch生命周期管理更偏Lance SDK能力。
-
如果团队对Branch依赖很重,最好把Lance版本固定住,因为Branch API与底层版本绑定更强。
-
Iceberg:带Catalog的表治理与快照读取
Iceberg侧更接近“表治理”而不是“单纯文件读写”。下面的示例使用本地SQLite catalog和本地warehouse创建一张Iceberg表,然后做两次写入,并分别读取最新快照和第一次写入后的快照。
from pathlib import Path
import shutil
import daft
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.schema import Schema
from pyiceberg.types import DoubleType, LongType, NestedField, StringType
root = Path("/tmp/daft_open_formats_doc/iceberg_case")
if root.exists():
shutil.rmtree(root)
warehouse = root / "warehouse"
warehouse.mkdir(parents=True, exist_ok=True)
catalog = SqlCatalog(
"local",
uri=f"sqlite:///{root / 'catalog.db'}",
warehouse=str(warehouse),
)
catalog.create_namespace("demo")
schema = Schema(
NestedField(1, "order_id", LongType(), required=True),
NestedField(2, "city", StringType(), required=False),
NestedField(3, "amount", DoubleType(), required=False),
)
table = catalog.create_table(("demo", "orders"), schema=schema)
daft.from_pydict(
{
"order_id": [1, 2],
"city": ["Hangzhou", "Beijing"],
"amount": [10.0, 20.0],
}
).write_iceberg(table, mode="append")
first_snapshot_id = table.refresh().metadata.current_snapshot_id
daft.from_pydict(
{
"order_id": [3],
"city": ["Shanghai"],
"amount": [30.0],
}
).write_iceberg(table, mode="append")
latest = daft.read_iceberg(table).sort("order_id")
first = daft.read_iceberg(table, snapshot_id=first_snapshot_id).sort("order_id")
print("first_snapshot_id =", first_snapshot_id)
print("latest =", latest.to_pydict())
print("first =", first.to_pydict())
-
这段代码的实际用途如下:
-
SqlCatalog(...):在本地构造一个最小可运行的Iceberg catalog,便于单机验证表级写入逻辑。 -
catalog.create_table(...):显式创建受Schema管理的Iceberg表。 -
write_iceberg(table, mode="append"):向Iceberg表追加数据。 -
read_iceberg(table, snapshot_id=...):回放某个快照,复现某个时间点的表状态。
-
-
Iceberg更适合以下场景:
-
数据表必须纳入catalog管理。
-
需要稳定的快照语义和表级治理。
-
团队希望把读写边界放在“表”而不是“目录中的文件集合”。
-
Checkpoint与崩溃恢复
Daft当前更常见的恢复模式不是“整条lineage自动截断”,而是自己把关键阶段物化成表或文件,再把这些物化结果当作恢复边界。
下面的示例演示一个最实用的恢复流程:
-
原始订单先清洗并写成Parquet。
-
下游特征阶段再从该Parquet继续处理,并写成Lance。
-
如果第二阶段中途崩溃,重跑时直接从Stage 1的Parquet恢复,而不是重新处理原始输入。
from pathlib import Path
import shutil
import daft
from daft import col
root = Path("/tmp/daft_open_formats_doc/checkpoint_case")
if root.exists():
shutil.rmtree(root)
root.mkdir(parents=True, exist_ok=True)
stage1 = str(root / "stage1_orders_parquet")
stage2 = str(root / "stage2_features.lance")
raw = daft.from_pydict(
{
"order_id": [1001, 1002, 1003, 1004],
"city": ["Hangzhou", "Beijing", "Hangzhou", "Shanghai"],
"status": ["paid", "paid", "refunded", "paid"],
"amount": [128.0, 256.0, 64.0, 512.0],
}
)
cleaned = (
raw.where(col("status") == "paid")
.with_column("amount_band", (col("amount") >= 256).cast(daft.DataType.int64()))
)
cleaned.write_parquet(stage1, write_mode="overwrite")
recovered = daft.read_parquet(stage1)
features = recovered.with_column(
"city_amount_key",
col("city") + daft.lit("_") + col("amount").cast(daft.DataType.string()),
)
features.write_lance(stage2, mode="overwrite")
print("stage1 =", daft.read_parquet(stage1).sort("order_id").to_pydict())
print("stage2 =", daft.read_lance(stage2).sort("order_id").to_pydict())
-
这段代码的实际用途如下:
-
stage1:是第一道恢复边界。只要这一步已经成功写出,后续失败就不需要回到原始数据。 -
stage2:表示更靠后的特征结果层,适合继续做实验、向量检索或下游分析。 -
read_parquet(stage1):体现“从阶段物化结果恢复”,而不是依赖失败前的进程状态。
-
-
切checkpoint时建议优先考虑以下边界:
-
文本清洗和切片完成后。
-
图像/视频解码完成后。
-
Embedding、特征或打分结果产出后。
-
下游会被多个任务复用的阶段完成后。
说明不要把checkpoint切得太细,也不要完全不切。切得太细会导致写放大和额外I/O,完全不切会让一次失败把整条流水线都拖回重跑。
-
开发建议
-
默认先把Parquet当成通用阶段格式。它最容易调试、最容易交换、也最适合做恢复边界。
-
当需要版本化特征表、向量表或多模态资产时,优先考虑Lance,而不是把所有中间产物都堆成Parquet。
-
当已经有catalog、命名空间、快照治理和表级一致性要求时,再把目标表建成Iceberg。不要把Iceberg当成“只是另一个目录格式”。
-
Checkpoint应该切在“下游值得复用、上游代价较大”的位置,而不是每一步都切,也不是完全不切。
-
Lance Branch适合做实验分叉、回归比较和特征试验,但不建议把Branch当成日常批处理主流程的唯一恢复手段。
-
Lance向量能力部分重点验证的是“向量资产可写入、可读回、可检索”。更深的ANN索引调优或服务化检索接口,建议在Lance SDK文档中进一步了解。
-
本地开发阶段尽量固定Python minor版本和Daft / Lance / PyIceberg版本。开放格式相关问题,很多时候是运行时兼容性问题,而不是DataFrame逻辑本身有问题。