开放数据格式:Lance/Iceberg/Parquet读写、恢复与开发建议

更新时间:
复制为 MD 格式

Daft支持多种开放文件格式和开放表格式的读取、写入与恢复。本文介绍各格式的支持情况,提供Lance、IcebergParquet的可执行代码示例,并说明崩溃恢复与开发建议。

核心结论

  • Parquet:最适合做最通用的列式交换格式,也是最稳妥的阶段产物格式。

  • Lance:更适合做可版本化的数据资产,尤其是向量、多模态特征和实验数据集,支持append-and-snapshot语义。

  • Iceberg:更适合放到有catalog管理的表层,强调快照隔离、Schema治理和表级一致性。

  • Daft当前没有Spark风格的“显式checkpoint API + lineage截断”模型。更常见的工程实践是把关键阶段物化成ParquetLance文件,再从阶段边界恢复。

  • LanceBranch生命周期主要由Lance SDK管理。Daft更适合消费某个稳定版本或快照的数据集。

运行前提

下面的代码示例已在Python 3.11下验证通过。为了让LanceIceberg示例在本地单机直接可跑,请安装以下依赖:

python3.11 -m pip install "daft[lance,iceberg]"
python3.11 -m pip install "pyiceberg[sql-sqlite]"
python3.11 -m pip install pylance

如果后续还要验证Delta LakeHudi连接器,可以按需额外安装:

python3.11 -m pip install "daft[deltalake,hudi]"

其中pylanceLance Python SDK。本文后面的Lance Branch与向量检索示例都直接依赖这个SDK,而不只是Daftread_lance/write_lance封装。

首次写入前先准备目录

在生产环境里,Parquet、Lance、Iceberg的目标路径通常是共享存储挂载、数据湖目录或warehouse路径。第一次写入前,请先把目录准备好,不要把目录初始化隐含在业务作业里。

例如,如果数据湖目录挂载在/data/lakehouse/open-formats,可以先执行以下命令:

mkdir -p /data/lakehouse/open-formats/parquet_case
mkdir -p /data/lakehouse/open-formats/lance_case
mkdir -p /data/lakehouse/open-formats/iceberg_case/warehouse
说明

本文中的/tmp/daft_open_formats_doc/...路径只是为了本机验证,代码里会创建本地临时目录。切到真实数据湖路径时,建议先用上面的方式把目录建好,再把路径交给作业。若后端是对象存储,也建议先准备好Bucket/Prefix再开始写入。

开放文件与开放表格式支持列表

下表整理了Daft当前常用的开放文件格式与开放表格式支持情况。本文后续仅重点展开Lance、Iceberg、Parquet三种格式。

格式

读取接口

写入接口

当前支持

典型用途

说明

CSV

read_csv

write_csv

读写

调试导出、轻量交换

行式文本格式,兼容性高但不适合作为大规模分析主存储

JSON

read_json

write_json

读写

事件日志、半结构化交换

当前以line-delimited JSON为主

Parquet

read_parquet

write_parquet

读写

批量交换、阶段产物、列式分析

最通用的列式数据格式

Lance

read_lance

write_lance

读写

向量、多模态、版本化数据资产

支持versionasofmerge

Iceberg

read_iceberg

write_iceberg

读写

Catalog管理的大表、快照治理

需要PyIceberg table对象或metadata路径

Delta Lake

read_deltalake

write_deltalake

读写

湖仓表格式接入

需要对应Delta Lake connector依赖

Hudi

read_hudi

-

只读

接入已有Hudi数据湖

当前以读取现有Hudi表为主

Parquet:最通用的阶段产物与交换格式

下面的示例模拟订单数据先写成Parquet,再重新读回做过滤和聚合。它适合用来验证最基础的列式读写流水线。

from pathlib import Path
import shutil

import daft
from daft import col

root = Path("/tmp/daft_open_formats_doc/parquet_case")
if root.exists():
    shutil.rmtree(root)
root.mkdir(parents=True, exist_ok=True)

parquet_path = str(root / "orders_parquet")

orders = daft.from_pydict(
    {
        "order_id": [1001, 1002, 1003, 1004],
        "city": ["Hangzhou", "Beijing", "Hangzhou", "Shanghai"],
        "channel": ["app", "web", "app", "app"],
        "status": ["paid", "paid", "refunded", "paid"],
        "amount": [128.0, 256.0, 64.0, 512.0],
    }
)

orders.write_parquet(parquet_path, write_mode="overwrite")

result = (
    daft.read_parquet(parquet_path)
    .where(col("status") == "paid")
    .groupby("city")
    .agg(col("amount").sum().alias("paid_gmv"))
    .sort("paid_gmv", desc=True)
)

print(result.to_pydict())
  • 这段代码的实际用途如下:

    • orders.write_parquet(...):把上游阶段结果稳定落盘,生成一个所有后续任务都能复用的列式产物。

    • daft.read_parquet(...):重新从阶段产物恢复DataFrame,而不是依赖进程内存。

    • where + groupby + agg:验证写出后再读回的业务逻辑和结果仍然保持一致。

  • Parquet最适合放在以下位置:

    • 原始数据清洗完成后的中间层。

    • 下游多个任务共用的交换层。

    • 故障恢复时的阶段边界。

说明

如果把parquet_path换成生产数据湖路径(如/data/lakehouse/open-formats/parquet_case/orders_parquet),建议先执行mkdir -p创建父目录,再运行写入操作。

Lance:版本化数据资产与多模态特征表

下面的示例演示两个最常用的Lance操作:先写入基础版本,再追加一批新数据,并分别读取最新版本与历史版本。

from pathlib import Path
import shutil

import daft

root = Path("/tmp/daft_open_formats_doc/lance_case")
if root.exists():
    shutil.rmtree(root)
root.mkdir(parents=True, exist_ok=True)

uri = str(root / "orders.lance")

df_v0 = daft.from_pydict(
    {
        "id": [1, 2],
        "city": ["Hangzhou", "Beijing"],
        "score": [0.1, 0.2],
    }
)
df_v1 = daft.from_pydict(
    {
        "id": [3],
        "city": ["Shanghai"],
        "score": [0.9],
    }
)

df_v0.write_lance(uri, mode="overwrite")
df_v1.write_lance(uri, mode="append")

latest = daft.read_lance(uri).sort("id")
snapshot = daft.read_lance(uri, version=1).sort("id")

print("latest =", latest.to_pydict())
print("snapshot =", snapshot.to_pydict())
  • 这段代码的实际用途如下:

    • write_lance(..., mode="overwrite"):创建基础数据集。

    • write_lance(..., mode="append"):在不重写旧数据的前提下新增版本。

    • read_lance(uri):读取最新版本,适合日常分析。

    • read_lance(uri, version=1):回到指定历史版本,适合复现某次实验或对账。

  • Lance更适合以下开发场景:

    • Embedding、标签、打分结果等需要版本管理的数据资产。

    • 图像、文本块、向量特征等多模态分析结果。

    • 需要追加更新,但又希望保留历史快照的实验表。

Lance:向量能力

Lance支持把结构化字段和向量列放在同一个版本化数据集中。更常见的工程分工是:

  • Daft负责上游文本、图像或多模态流水线,以及Embedding的生成。

  • Lance负责把向量列和业务元数据一起落盘,并通过Lance SDK执行原生向量检索。

下面的示例演示一个最小可运行的向量检索流程:

  1. 构造一张包含业务元数据和EmbeddingLance数据集。

  2. 使用Lance原生nearest查询做Top-K检索。

  3. 校验Top-1命中预期文档,并确认返回结果里仍然保留业务元数据列。

from pathlib import Path
import shutil

import lance
import pyarrow as pa

DIM = 4
TOP_K = 2

root = Path("/tmp/daft_open_formats_doc/lance_vector_search_case")
if root.exists():
    shutil.rmtree(root)
root.mkdir(parents=True, exist_ok=True)

uri = str(root / "embeddings.lance")

doc_ids = pa.array(["doc-001", "doc-002", "doc-003"])
categories = pa.array(["faq", "faq", "policy"])
embedding_values = pa.array(
    [
        0.99,
        0.01,
        0.00,
        0.00,
        0.96,
        0.04,
        0.00,
        0.00,
        0.05,
        0.95,
        0.00,
        0.00,
    ],
    type=pa.float32(),
)
embeddings = pa.FixedSizeListArray.from_arrays(embedding_values, DIM)

table = pa.Table.from_arrays(
    [doc_ids, categories, embeddings],
    names=["doc_id", "category", "embedding"],
)

dataset = lance.write_dataset(table, uri, mode="overwrite")

query = pa.array([1.0, 0.0, 0.0, 0.0], type=pa.float32())
result = dataset.to_table(
    nearest={
        "column": "embedding",
        "q": query,
        "k": TOP_K,
        "metric": "cosine",
        "use_index": False,
    }
)

rows = result.to_pylist()

assert (root / "embeddings.lance").exists()
assert dataset.count_rows() == 3
assert len(rows) == TOP_K
assert rows[0]["doc_id"] == "doc-001"
assert "category" in rows[0]
assert "_distance" in rows[0]

print("vector_search_rows =", rows)
  • 这段代码的实际用途如下:

    • FixedSizeListArray:把Embedding列显式写成定长向量列,便于Lance识别为向量数据。

    • lance.write_dataset(...):把向量和业务元数据一起写成可版本化的数据集。

    • to_table(nearest=...):直接使用Lance原生向量检索能力,而不是把数据读回后再手动算相似度。

    • use_index=False:固定走精确KNN,方便在小样本验证时稳定复现结果。大表可按需补向量索引。

  • 这类写法特别适合以下场景:

    • 文档、图片或音频片段的Embedding资产管理。

    • RAG 入库前的离线验证和回归测试。

    • 需要把检索结果和原始业务字段一起返回给下游系统。

Lance:Branch能力

LanceBranch更像“数据资产的实验分叉”。当前更合理的工程分工是:

  • Daft负责主数据集的读写和分析。

  • Lance SDK负责Branch的创建、切换和隔离写入。

下面的示例演示一个完整可运行的Branch流程:

  1. Daft写出主表。

  2. Lance SDK从主表创建实验分支。

  3. 只在分支里追加一条新数据,不影响主分支。

  4. 把分支结果转成Arrow,再回到Daft继续分析。

from pathlib import Path
import shutil

import daft
import lance
import pyarrow as pa

root = Path("/tmp/daft_open_formats_doc/lance_branch_case")
if root.exists():
    shutil.rmtree(root)
root.mkdir(parents=True, exist_ok=True)

uri = str(root / "features.lance")

daft.from_pydict({"id": [1, 2], "score": [0.1, 0.2]}).write_lance(uri, mode="overwrite")

branch_ds = lance.dataset(uri).create_branch("exp_branch")
lance.write_dataset(pa.table({"id": [3], "score": [0.9]}), uri=branch_ds, mode="append")

main_rows = lance.dataset(uri).to_table().to_pylist()
branch_rows = lance.dataset(uri).checkout_branch("exp_branch").to_table().to_pylist()
branch_df = daft.from_arrow(
    lance.dataset(uri).checkout_branch("exp_branch").to_table()
).sort("id")

print("main_rows =", main_rows)
print("branch_rows =", branch_rows)
print("branch_df =", branch_df.to_pydict())
  • 这段代码的实际用途如下:

    • create_Branch("exp_Branch"):从主版本拉出一个实验分支。

    • lance.write_dataset(..., uri=Branch_ds, mode="append"):只把新数据写到分支,不污染主线。

    • checkout_Branch("exp_Branch"):读取分支最新状态。

    • daft.from_arrow(...):把Branch数据重新接回Daft流水线继续处理。

  • 使用Lance Branch时需要注意以下事项:

    • read_lancewrite_lance负责主数据集、版本和快照的常规读写。

    • Branch生命周期管理更偏Lance SDK能力。

    • 如果团队对Branch依赖很重,最好把Lance版本固定住,因为Branch API与底层版本绑定更强。

Iceberg:带Catalog的表治理与快照读取

Iceberg侧更接近“表治理”而不是“单纯文件读写”。下面的示例使用本地SQLite catalog和本地warehouse创建一张Iceberg表,然后做两次写入,并分别读取最新快照和第一次写入后的快照。

from pathlib import Path
import shutil

import daft
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.schema import Schema
from pyiceberg.types import DoubleType, LongType, NestedField, StringType

root = Path("/tmp/daft_open_formats_doc/iceberg_case")
if root.exists():
    shutil.rmtree(root)

warehouse = root / "warehouse"
warehouse.mkdir(parents=True, exist_ok=True)

catalog = SqlCatalog(
    "local",
    uri=f"sqlite:///{root / 'catalog.db'}",
    warehouse=str(warehouse),
)
catalog.create_namespace("demo")

schema = Schema(
    NestedField(1, "order_id", LongType(), required=True),
    NestedField(2, "city", StringType(), required=False),
    NestedField(3, "amount", DoubleType(), required=False),
)

table = catalog.create_table(("demo", "orders"), schema=schema)

daft.from_pydict(
    {
        "order_id": [1, 2],
        "city": ["Hangzhou", "Beijing"],
        "amount": [10.0, 20.0],
    }
).write_iceberg(table, mode="append")

first_snapshot_id = table.refresh().metadata.current_snapshot_id

daft.from_pydict(
    {
        "order_id": [3],
        "city": ["Shanghai"],
        "amount": [30.0],
    }
).write_iceberg(table, mode="append")

latest = daft.read_iceberg(table).sort("order_id")
first = daft.read_iceberg(table, snapshot_id=first_snapshot_id).sort("order_id")

print("first_snapshot_id =", first_snapshot_id)
print("latest =", latest.to_pydict())
print("first =", first.to_pydict())
  • 这段代码的实际用途如下:

    • SqlCatalog(...):在本地构造一个最小可运行的Iceberg catalog,便于单机验证表级写入逻辑。

    • catalog.create_table(...):显式创建受Schema管理的Iceberg表。

    • write_iceberg(table, mode="append"):向Iceberg表追加数据。

    • read_iceberg(table, snapshot_id=...):回放某个快照,复现某个时间点的表状态。

  • Iceberg更适合以下场景:

    • 数据表必须纳入catalog管理。

    • 需要稳定的快照语义和表级治理。

    • 团队希望把读写边界放在“表”而不是“目录中的文件集合”。

Checkpoint与崩溃恢复

Daft当前更常见的恢复模式不是“整条lineage自动截断”,而是自己把关键阶段物化成表或文件,再把这些物化结果当作恢复边界。

下面的示例演示一个最实用的恢复流程:

  1. 原始订单先清洗并写成Parquet。

  2. 下游特征阶段再从该Parquet继续处理,并写成Lance。

  3. 如果第二阶段中途崩溃,重跑时直接从Stage 1Parquet恢复,而不是重新处理原始输入。

from pathlib import Path
import shutil

import daft
from daft import col

root = Path("/tmp/daft_open_formats_doc/checkpoint_case")
if root.exists():
    shutil.rmtree(root)
root.mkdir(parents=True, exist_ok=True)

stage1 = str(root / "stage1_orders_parquet")
stage2 = str(root / "stage2_features.lance")

raw = daft.from_pydict(
    {
        "order_id": [1001, 1002, 1003, 1004],
        "city": ["Hangzhou", "Beijing", "Hangzhou", "Shanghai"],
        "status": ["paid", "paid", "refunded", "paid"],
        "amount": [128.0, 256.0, 64.0, 512.0],
    }
)

cleaned = (
    raw.where(col("status") == "paid")
    .with_column("amount_band", (col("amount") >= 256).cast(daft.DataType.int64()))
)
cleaned.write_parquet(stage1, write_mode="overwrite")

recovered = daft.read_parquet(stage1)
features = recovered.with_column(
    "city_amount_key",
    col("city") + daft.lit("_") + col("amount").cast(daft.DataType.string()),
)
features.write_lance(stage2, mode="overwrite")

print("stage1 =", daft.read_parquet(stage1).sort("order_id").to_pydict())
print("stage2 =", daft.read_lance(stage2).sort("order_id").to_pydict())
  • 这段代码的实际用途如下:

    • stage1:是第一道恢复边界。只要这一步已经成功写出,后续失败就不需要回到原始数据。

    • stage2:表示更靠后的特征结果层,适合继续做实验、向量检索或下游分析。

    • read_parquet(stage1):体现“从阶段物化结果恢复”,而不是依赖失败前的进程状态。

  • checkpoint时建议优先考虑以下边界:

    • 文本清洗和切片完成后。

    • 图像/视频解码完成后。

    • Embedding、特征或打分结果产出后。

    • 下游会被多个任务复用的阶段完成后。

    说明

    不要把checkpoint切得太细,也不要完全不切。切得太细会导致写放大和额外I/O,完全不切会让一次失败把整条流水线都拖回重跑。

开发建议

  • 默认先把Parquet当成通用阶段格式。它最容易调试、最容易交换、也最适合做恢复边界。

  • 当需要版本化特征表、向量表或多模态资产时,优先考虑Lance,而不是把所有中间产物都堆成Parquet。

  • 当已经有catalog、命名空间、快照治理和表级一致性要求时,再把目标表建成Iceberg。不要把Iceberg当成“只是另一个目录格式”。

  • Checkpoint应该切在“下游值得复用、上游代价较大”的位置,而不是每一步都切,也不是完全不切。

  • Lance Branch适合做实验分叉、回归比较和特征试验,但不建议把Branch当成日常批处理主流程的唯一恢复手段。

  • Lance向量能力部分重点验证的是“向量资产可写入、可读回、可检索”。更深的ANN索引调优或服务化检索接口,建议在Lance SDK文档中进一步了解。

  • 本地开发阶段尽量固定Python minor版本和Daft / Lance / PyIceberg版本。开放格式相关问题,很多时候是运行时兼容性问题,而不是DataFrame逻辑本身有问题。