Work with DLF Lance tables using Daft


This guide explains how to read and write Lance tables managed by DLF using the Daft DataFrame engine. Daft's DataFrame API is lazy: it builds a query plan and runs it only when results are requested, which suits filtered queries and batch computation.

Note

To read and write Lance tables directly with PyLance, see Using DLF Lance in Python.

Terminology

Component: Role

  • DLF: Catalog service that manages database/table metadata, stores Lance table paths, and issues temporary OSS credentials

  • Lance/PyLance: Data format and low-level read/write implementation responsible for actual I/O on OSS-hosted Lance datasets

  • Daft: DataFrame compute engine providing read_lance / write_lance interfaces

  • lance-dlf: Connector that retrieves table paths and temporary OSS credentials from DLF for Daft to use. Only exposes tables with type=lance-table

Architecture

User code
  → lance_namespace.connect("dlf", CONFIG)     # Connect to DLF catalog
  → DLF returns Lance table path + temporary OSS credentials
  → apply_oss_environment(...)                  # Set OSS_* environment variables
  → daft.read_lance("oss://...")               # Read data
  → df.write_lance("oss://...", mode="append") # Write data   

Concept mapping:

  • DLF database → Lance namespace

  • DLF table → Lance table

Prerequisites

Install dependencies

python3 -m pip install lance-dlf daft        

lance-dlf automatically installs lance_namespace, pyarrow, and other required dependencies.

Minimal configuration

CONFIG = {
    "uri": "http://<dlf-endpoint>",
    "warehouse": "<warehouse>",
    "token.provider": "dlf",
    "dlf.region": "<region>",
    "dlf.access-key-id": "<access-key-id>",
    "dlf.access-key-secret": "<access-key-secret>",
    "dlf.oss-endpoint": "<oss-endpoint>",  # Required only for public network access to DLF
}
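To keep the AccessKey pair out of source files, the configuration can be assembled from environment variables. The following is a minimal sketch, not part of lance-dlf: the helper name build_config_from_env and the DLF_* variable names are assumptions made for illustration, and dlf.oss-endpoint is treated as optional based on the comment in the complete example.

```python
import os

# Hypothetical environment variable names; pick whatever suits your deployment.
_REQUIRED = {
    "DLF_URI": "uri",
    "DLF_WAREHOUSE": "warehouse",
    "DLF_REGION": "dlf.region",
    "DLF_ACCESS_KEY_ID": "dlf.access-key-id",
    "DLF_ACCESS_KEY_SECRET": "dlf.access-key-secret",
}
_OPTIONAL = {
    "DLF_OSS_ENDPOINT": "dlf.oss-endpoint",
}


def build_config_from_env(environ=None) -> dict:
    """Build the CONFIG dict from environment variables (illustrative helper)."""
    environ = os.environ if environ is None else environ

    # Fail early with the full list of missing variables.
    missing = [name for name in _REQUIRED if not environ.get(name)]
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(missing))

    config = {key: environ[name] for name, key in _REQUIRED.items()}
    for name, key in _OPTIONAL.items():
        if environ.get(name):
            config[key] = environ[name]

    config["token.provider"] = "dlf"  # Fixed value, as in the configuration above
    return config


# Example with an explicit mapping instead of the real process environment.
example_env = {
    "DLF_URI": "http://<dlf-endpoint>",
    "DLF_WAREHOUSE": "<warehouse>",
    "DLF_REGION": "<region>",
    "DLF_ACCESS_KEY_ID": "<access-key-id>",
    "DLF_ACCESS_KEY_SECRET": "<access-key-secret>",
}
CONFIG_FROM_ENV = build_config_from_env(example_env)
print(sorted(CONFIG_FROM_ENV))
```

Calling build_config_from_env() with no argument reads from os.environ, and the result can be passed to lance_namespace.connect("dlf", ...) in place of the literal dictionary.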

Connect to DLF

Importing lance_dlf automatically registers the dlf namespace:

import lance_namespace
import lance_dlf  # noqa: F401

ns = lance_namespace.connect("dlf", CONFIG)
print(ns.namespace_id())        

Read an existing table

Step 1: Get the table path and credentials

Before reading or writing a table with Daft, call describe_table() to retrieve the table path and temporary OSS credentials from DLF.

from lance_namespace import DescribeTableRequest

DATABASE = "<database>"
TABLE = "<table>"

desc = ns.describe_table(DescribeTableRequest(id=[DATABASE, TABLE]))

print(desc.location)
print(sorted((desc.storage_options or {}).keys()))        

desc contains two key fields:

  • desc.location — the Lance table storage path in the format oss://bucket/path/to/table

  • desc.storage_options — a dictionary of temporary OSS credentials
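If the bucket name or object prefix is needed separately (for example, for logging or a sanity check), the location string can be split with the standard library. This is a small sketch; split_oss_location is a hypothetical helper name, and the sample path is the format placeholder from the list above, not a real table.

```python
from urllib.parse import urlparse


def split_oss_location(location: str) -> tuple[str, str]:
    """Split an oss://bucket/path location into (bucket, key prefix)."""
    parsed = urlparse(location)
    if parsed.scheme != "oss" or not parsed.netloc:
        raise ValueError(f"Not an oss:// location: {location!r}")
    # netloc is the bucket; the path keeps a leading slash that we drop.
    return parsed.netloc, parsed.path.lstrip("/")


bucket, key_prefix = split_oss_location("oss://bucket/path/to/table")
print(bucket)      # bucket
print(key_prefix)  # path/to/table
```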

Step 2: Set OSS credential environment variables

Daft uses PyLance under the hood to access OSS. Define an apply_oss_environment helper that maps the DLF-issued credentials to OSS_* environment variables, then call it:

import os


def apply_oss_environment(storage_options: dict) -> None:
    os.environ["OSS_ENDPOINT"] = storage_options["oss_endpoint"]
    os.environ["OSS_ACCESS_KEY_ID"] = storage_options["oss_access_key_id"]
    os.environ["OSS_ACCESS_KEY_SECRET"] = storage_options["oss_secret_access_key"]
    if storage_options.get("oss_security_token"):
        os.environ["OSS_SECURITY_TOKEN"] = storage_options["oss_security_token"]
    if storage_options.get("oss_region"):
        os.environ["OSS_REGION"] = storage_options["oss_region"]

# Apply credentials
apply_oss_environment(desc.storage_options or {})        
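Note that the helper above raises a bare KeyError if storage_options is empty or lacks a required key. A slightly more defensive variant is sketched below. The function name apply_oss_environment_checked is an assumption for illustration, and removing optional variables that DLF did not return (so a stale token from an earlier table is not reused) is a design choice of this sketch, not documented connector behavior.

```python
import os

# Mapping from DLF storage option keys to the OSS_* variables used above.
_OPTION_TO_ENV = {
    "oss_endpoint": "OSS_ENDPOINT",
    "oss_access_key_id": "OSS_ACCESS_KEY_ID",
    "oss_secret_access_key": "OSS_ACCESS_KEY_SECRET",
    "oss_security_token": "OSS_SECURITY_TOKEN",
    "oss_region": "OSS_REGION",
}
_REQUIRED_OPTIONS = ("oss_endpoint", "oss_access_key_id", "oss_secret_access_key")


def apply_oss_environment_checked(storage_options) -> None:
    """Set OSS_* variables, failing with a readable error if keys are missing."""
    options = storage_options or {}

    missing = [key for key in _REQUIRED_OPTIONS if not options.get(key)]
    if missing:
        # Report key names only; never include credential values in errors.
        raise ValueError("storage_options is missing: " + ", ".join(missing))

    for option_key, env_name in _OPTION_TO_ENV.items():
        value = options.get(option_key)
        if value:
            os.environ[env_name] = value
        else:
            # Optional value absent: drop any leftover from a previous call.
            os.environ.pop(env_name, None)
```

It is called the same way as the original helper: apply_oss_environment_checked(desc.storage_options).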

Step 3: Read table data with Daft

import daft

df = daft.read_lance(desc.location)
df.show()        

Write data

Append to an existing table

After completing Step 1 (Get the table path and credentials) and Step 2 (Set OSS credential environment variables) above, append data by calling write_lance with mode="append":

# Prerequisites: ns is connected to the catalog. Refresh the table path and
# temporary credentials, then set the OSS env vars before writing.
desc = ns.describe_table(DescribeTableRequest(id=[DATABASE, TABLE]))
apply_oss_environment(desc.storage_options or {})

# Append data
append_df = daft.from_pydict({
    "f0": [204],
    "f1": ["daft-d"],
})

append_df.write_lance(desc.location, mode="append")

# Verify the write
df2 = daft.read_lance(desc.location)
df2.show()        

Create a new table and write data

New tables must first be created with ns.create_table() (which also writes the initial batch of data). Use Daft for subsequent reads and writes.

# Prerequisites: ns is connected to the DLF catalog and DATABASE is defined (see above)
from datetime import datetime
import pyarrow as pa
from lance_namespace import CreateTableRequest, DescribeTableRequest

# Serialize an Arrow table to IPC bytes
def arrow_table_to_ipc_bytes(table: pa.Table) -> bytes:
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    return sink.getvalue().to_pybytes()


# Create the table with initial data
table_name = "test_lance_daft_" + datetime.now().strftime("%Y%m%d_%H%M%S")
table_id = [DATABASE, table_name]

rows = {
    "f0": [201, 202, 203],
    "f1": ["daft-a", "daft-b", "daft-c"],
}
arrow_table = pa.table(rows)

create_response = ns.create_table(
    CreateTableRequest(id=table_id),
    arrow_table_to_ipc_bytes(arrow_table),
)
print(create_response.location)      

After creation, retrieve credentials with describe_table and use Daft to read and write:

# Get credentials and set environment variables
desc = ns.describe_table(DescribeTableRequest(id=table_id))
apply_oss_environment(desc.storage_options or {})

# Read and verify
df = daft.read_lance(desc.location)
df.show()

# Append data with Daft
append_rows = {
    "f0": [204],
    "f1": ["daft-d"],
}
append_df = daft.from_pydict(append_rows)

meta = append_df.write_lance(desc.location, mode="append")
meta.show()

# Read again to confirm
appended_df = daft.read_lance(desc.location)
appended_df.show()
          

Expected table contents after the append, shown as records (df.show() prints the same rows as a table):

[
    {"f0": 201, "f1": "daft-a"},
    {"f0": 202, "f1": "daft-b"},
    {"f0": 203, "f1": "daft-c"},
    {"f0": 204, "f1": "daft-d"}
]
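The rows above are listed as records, whereas Daft's to_pydict() (used in the complete example below) returns a column-oriented dictionary. The following sketch shows one way to convert between the two shapes for comparison; columns_to_records is a hypothetical helper, and the sample data simply repeats the values from this guide.

```python
def columns_to_records(columns: dict) -> list[dict]:
    """Turn {"col": [v1, v2, ...]} into [{"col": v1}, {"col": v2}, ...]."""
    names = list(columns)
    # zip(*...) walks all columns in parallel, yielding one tuple per row.
    rows = zip(*(columns[name] for name in names))
    return [dict(zip(names, row)) for row in rows]


columns = {
    "f0": [201, 202, 203, 204],
    "f1": ["daft-a", "daft-b", "daft-c", "daft-d"],
}
records = columns_to_records(columns)
for record in records:
    print(record)
```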
          

Complete example

The following script demonstrates the end-to-end workflow: create a new table → read → append → verify.

from __future__ import annotations

from datetime import datetime
import os

import daft
import lance_namespace
import pyarrow as pa
from lance_namespace import CreateTableRequest, DescribeTableRequest

import lance_dlf  # noqa: F401


CONFIG = {
    "uri": "http://<DLF-ENDPOINT>",
    "warehouse": "<YOUR-CATALOG>",
    "token.provider": "dlf",
    "dlf.region": "<REGION-ID>",
    "dlf.access-key-id": "<ACCESS-KEY-ID>",
    "dlf.access-key-secret": "<ACCESS-KEY-SECRET>",
    "dlf.oss-endpoint": "<OSS-ENDPOINT>",  # Required only for public network access to DLF
}

DATABASE = "default"

# Serialize an Arrow table to IPC bytes
def arrow_table_to_ipc_bytes(table: pa.Table) -> bytes:
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    return sink.getvalue().to_pybytes()

# Set OSS credential environment variables
def apply_oss_environment(storage_options: dict) -> None:
    os.environ["OSS_ENDPOINT"] = storage_options["oss_endpoint"]
    os.environ["OSS_ACCESS_KEY_ID"] = storage_options["oss_access_key_id"]
    os.environ["OSS_ACCESS_KEY_SECRET"] = storage_options["oss_secret_access_key"]
    if storage_options.get("oss_security_token"):
        os.environ["OSS_SECURITY_TOKEN"] = storage_options["oss_security_token"]
    if storage_options.get("oss_region"):
        os.environ["OSS_REGION"] = storage_options["oss_region"]


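# Materialize a Daft DataFrame as a {column: values} dict.
# Falls back to collect() first if the object has no to_pydict() method.
def df_to_pydict(df):
    try:
        return df.to_pydict()
    except AttributeError:
        return df.collect().to_pydict()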


def main() -> None:
    ns = lance_namespace.connect("dlf", CONFIG)

    # 1. Create a new table
    table_name = "test_lance_daft_" + datetime.now().strftime("%Y%m%d_%H%M%S")
    table_id = [DATABASE, table_name]

    rows = {
        "f0": [201, 202, 203],
        "f1": ["daft-a", "daft-b", "daft-c"],
    }
    arrow_table = pa.table(rows)

    create_response = ns.create_table(
        CreateTableRequest(id=table_id),
        arrow_table_to_ipc_bytes(arrow_table),
    )
    print("created:", ".".join(table_id))
    print("location:", create_response.location)

    # 2. Get credentials
    desc = ns.describe_table(DescribeTableRequest(id=table_id))
    apply_oss_environment(desc.storage_options or {})

    # 3. Read and verify
    read_df = daft.read_lance(desc.location)
    read_df.show()
    if df_to_pydict(read_df) != rows:
        raise AssertionError("Initial readback mismatch")

    # 4. Append data
    append_rows = {
        "f0": [204],
        "f1": ["daft-d"],
    }
    append_df = daft.from_pydict(append_rows)
    append_df.write_lance(desc.location, mode="append").show()


    # 5. Final verification
    appended_df = daft.read_lance(desc.location)
    appended_df.show()
    expected = {
        "f0": rows["f0"] + append_rows["f0"],
        "f1": rows["f1"] + append_rows["f1"],
    }
    if df_to_pydict(appended_df) != expected:
        raise AssertionError("Daft append readback mismatch")

    print("daft + dlf + lance: ok")


if __name__ == "__main__":
    main()
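The equality checks in the script compare column dictionaries directly, so they assume rows are read back in insertion order. If that does not hold in your environment, an order-insensitive comparison is an option. The sketch below is one possible approach; same_rows is a hypothetical helper and it assumes all cell values are hashable.

```python
from collections import Counter


def same_rows(actual: dict, expected: dict) -> bool:
    """Compare two column dicts as multisets of rows, ignoring row order."""
    if set(actual) != set(expected):
        return False  # Different column names
    names = sorted(expected)
    # Count each row tuple so duplicate rows are still compared correctly.
    actual_rows = Counter(zip(*(actual[name] for name in names)))
    expected_rows = Counter(zip(*(expected[name] for name in names)))
    return actual_rows == expected_rows


expected = {"f0": [201, 202, 203], "f1": ["daft-a", "daft-b", "daft-c"]}
shuffled = {"f0": [203, 201, 202], "f1": ["daft-c", "daft-a", "daft-b"]}
print(same_rows(shuffled, expected))  # True
```

In the script, a check such as df_to_pydict(read_df) != rows could then become not same_rows(df_to_pydict(read_df), rows).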
        

Important notes

  • New table initialization: Use ns.create_table(...) to create a new table and write the first batch of data. Use Daft for all subsequent reads and writes.

  • Log sanitization: The full storage_options dictionary contains temporary AK/SK/token values. Print only the key list for safety: print(sorted((desc.storage_options or {}).keys()))
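When more than the key list is useful in logs, the values can be masked instead. The following is a minimal sketch of that idea; redact_storage_options is a hypothetical helper, and the choice of which keys are safe to show unmasked (here the endpoint and region) is an assumption you should adjust to your own policy.

```python
def redact_storage_options(storage_options, keep=("oss_endpoint", "oss_region")):
    """Return a copy of storage_options with secret values masked."""
    options = storage_options or {}
    return {
        key: (value if key in keep else "***")
        for key, value in sorted(options.items())
    }


# Placeholder values only; real credentials come from describe_table().
sample = {
    "oss_endpoint": "<oss-endpoint>",
    "oss_access_key_id": "<temporary-ak>",
    "oss_secret_access_key": "<temporary-sk>",
    "oss_security_token": "<temporary-token>",
}
print(redact_storage_options(sample))
```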