Blob storage

DLF Paimon tables support blob storage for unstructured data such as images, videos, and documents. You can configure storage modes and table options, and access blob data through EMR Serverless Spark, Realtime Compute for Apache Flink, or PyPaimon.

Overview

As AI and multimodal applications grow, data lakes must manage both structured data (such as numbers and text) and unstructured data (such as images, videos, audio, and documents). Traditional solutions store unstructured data in object storage services like OSS and structured metadata in databases. Managing these sources separately makes join queries difficult and creates independent permission systems, preventing unified governance.

DLF Paimon tables introduce the blob (Binary Large Object) type to store unstructured and structured data in the same table. This approach unifies the management of metadata, permissions, and lifecycles.

Key advantages include:

  • Column-level separate storage: Blob data is automatically written to separate .blob files, while structured columns are stored in Parquet, Avro, or ORC files. Large binary values therefore do not inflate the files that hold structured data.

  • Efficient column pruning: When you query non-blob columns, the system does not load blob data, which avoids unnecessary I/O.

  • Flexible storage modes: Choose between default blob storage and descriptor-only storage.

  • Unified permissions: You can manage both structured and unstructured data through the unified DLF permission system.
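The following toy, in-memory Python model illustrates column separation and pruning. It is not Paimon's file format or API. `ToyBlobTable`, its fields, and the `image` column name are hypothetical. Structured values live in one store, and blob bytes are appended to a separate buffer that stands in for a .blob file. A scan reads that buffer only when the projection includes the blob column.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterator, List, Tuple


@dataclass
class ToyBlobTable:
    """Hypothetical toy model: structured rows and blob bytes kept in separate stores."""
    rows: List[Dict] = field(default_factory=list)           # stands in for Parquet data files
    blob_file: bytearray = field(default_factory=bytearray)  # stands in for a .blob file
    blob_reads: int = 0                                      # how often the blob store was read

    def append(self, row_id: int, name: str, image: bytes) -> None:
        # Append the blob bytes and remember where they landed (offset, length).
        position: Tuple[int, int] = (len(self.blob_file), len(image))
        self.blob_file.extend(image)
        self.rows.append({"id": row_id, "name": name, "_image_pos": position})

    def scan(self, columns: List[str]) -> Iterator[Dict]:
        for row in self.rows:
            out = {c: row[c] for c in columns if c != "image"}
            if "image" in columns:
                # Only projections that include the blob column touch the blob store.
                self.blob_reads += 1
                offset, length = row["_image_pos"]
                out["image"] = bytes(self.blob_file[offset:offset + length])
            yield out
```

Scanning `["id", "name"]` leaves `blob_reads` at 0. Scanning `["id", "image"]` reads the blob store once per row.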

Storage modes

DLF Paimon provides two storage modes for blob fields.

Default blob storage

The raw bytes of a blob are written to .blob files managed by DLF Paimon. These files are stored within the table's path, and DLF manages their lifecycle.

Use cases: Suitable for directly writing binary data, such as images and videos, where DLF uniformly manages storage and permissions.

Descriptor-only storage

When you configure the blob-descriptor-field option, DLF Paimon does not create .blob files. Instead, it stores a serialized BlobDescriptor, which includes a URI, offset, and length, directly within data files such as Parquet.

Use cases: Ideal for multi-table join scenarios. For example, one table can store the original blob data, while another table references the existing blob files using descriptors to avoid data duplication.
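A descriptor is a pointer: a URI plus a byte range. The sketch below resolves such a pointer against a local file, so it runs without OSS. `RangeRef` and `resolve` are hypothetical names, not the PyPaimon `BlobDescriptor` API, and local paths stand in for oss:// URIs. Because two references can point into the same file and share its bytes, a second table can reference existing blobs without duplicating them.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RangeRef:
    """Hypothetical stand-in for a blob descriptor: a location plus a byte range."""
    uri: str     # a local file path here; DLF Paimon would use a URI such as oss://...
    offset: int  # first byte of the blob within the file
    length: int  # number of bytes that belong to the blob


def resolve(ref: RangeRef) -> bytes:
    """Read exactly ref.length bytes starting at ref.offset, or fail loudly."""
    if ref.offset < 0 or ref.length < 0:
        raise ValueError(f"negative range in {ref}")
    with open(ref.uri, "rb") as f:
        f.seek(ref.offset)
        data = f.read(ref.length)
    if len(data) != ref.length:
        raise ValueError(f"range {ref.offset}+{ref.length} runs past the end of {ref.uri}")
    return data
```

For example, suppose a file holds two images back to back. `RangeRef(path, 0, n1)` and `RangeRef(path, n1, n2)` can reference them, and a linked table can store copies of these small references instead of the image bytes.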

Blob table options

| Parameter | Required | Default | Description |
|---|---|---|---|
| row-tracking.enabled | Yes | false | Must be set to true for tables with blob columns. |
| data-evolution.enabled | Yes | false | Must be set to true for tables with blob columns. |
| blob-field | No | None | The column to store as a blob (BYTES in Flink SQL, BINARY in Spark SQL). The engine maps this type to the blob type. |
| blob-as-descriptor | No | false | When set to true, reads return serialized BlobDescriptor bytes instead of the blob content, and writes accept BlobDescriptor bytes as input. You can change this option dynamically, for example per query with a table hint. |
| blob-descriptor-field | No | None | The names of blob fields to store as descriptors only. Separate multiple field names with commas (,). |
| blob.target-file-size | No | Value of target-file-size | The target size of a single .blob file. |
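If you create blob tables from code, a small helper can keep the required options consistent. The sketch below is a hypothetical convenience function, not part of PyPaimon. It only assembles option keys from the table above, and it follows this page's examples, where a descriptor-only column is also listed in blob-field.

```python
from typing import Dict, Optional


def blob_table_options(blob_field: str, descriptor_only: bool = False,
                       target_file_size: Optional[str] = None) -> Dict[str, str]:
    """Hypothetical helper: options for a table with a single blob column."""
    options = {
        "row-tracking.enabled": "true",    # required for tables with blob columns
        "data-evolution.enabled": "true",  # required for tables with blob columns
        "blob-field": blob_field,          # BYTES/BINARY column stored as a blob
    }
    if descriptor_only:
        # Keep only serialized BlobDescriptors in the data files; no .blob files are created.
        options["blob-descriptor-field"] = blob_field
    if target_file_size is not None:
        # When omitted, blob.target-file-size falls back to target-file-size.
        options["blob.target-file-size"] = target_file_size
    return options
```

You could pass the result as the options argument of Schema.from_pyarrow_schema in the PyPaimon examples, for example `options=blob_table_options('picture')`.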

Using EMR Serverless Spark

For basic configuration of EMR Serverless Spark with DLF, see Access DLF from Serverless Spark.

For full blob feature support, you must use the custom Spark package paimon-ali-emr-spark-3.5-1-ali-25.2.jar. Upload the JAR package to OSS, then exclude the built-in paimon module and point spark.emr.serverless.user.defined.jars at the uploaded JAR, as in the following example configuration:
spark.emr.serverless.excludedModules    paimon
spark.emr.serverless.user.defined.jars  oss://my_bucket/blob_test/paimon-ali-emr-spark-3.5-1-ali-25.2.jar

Create table

-- Create an image table with a blob column.
CREATE TABLE my_db.image_table (
    id INT,
    name STRING,
    category STRING,
    image BINARY
) TBLPROPERTIES (
    'row-tracking.enabled' = 'true',
    'data-evolution.enabled' = 'true',
    'blob-field' = 'image'
);

In Spark SQL, use the BINARY type for the blob column and declare it as blob storage with the blob-field table property.

Write data

Write binary data directly using SQL:

INSERT INTO my_db.image_table VALUES (1, 'sample', 'photo', X'89504E470D0A1A0A');
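To build such a literal from a small local file, hex-encode its bytes. The helpers below are hypothetical. The 64 KiB cap is an arbitrary guard: larger blobs are better written through a DataFrame, as in the notebook approach.

```python
from pathlib import Path


def to_sql_binary_literal(data: bytes) -> str:
    """Render bytes as a Spark SQL hexadecimal binary literal, e.g. X'0AFF'."""
    return "X'" + data.hex().upper() + "'"


def file_to_sql_binary_literal(path: str, max_bytes: int = 64 * 1024) -> str:
    """Hypothetical helper: inline a small local file as a binary literal."""
    data = Path(path).read_bytes()
    if len(data) > max_bytes:
        # Arbitrary guard; very long SQL literals are impractical for real images.
        raise ValueError(f"{path} is {len(data)} bytes; write it through a DataFrame instead")
    return to_sql_binary_literal(data)
```

For example, `to_sql_binary_literal(b'\x89PNG\r\n\x1a\n')` returns the PNG signature literal used in the statement above, `X'89504E470D0A1A0A'`.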

You can also use a notebook to read an image from OSS and write it to the blob table. The current session must have permission to access the specified OSS location.

image_df = spark.read.format("binaryFile").load("oss://my_bucket/path/test.jpg")

image_df.selectExpr(
    "1 as id",
    "'test.jpg' as name",
    "'photo' as category",
    "content as image"
).writeTo("my_db.image_table").append()

Query data

Read only the structured columns without loading blob data:

SELECT id, name, category FROM my_db.image_table WHERE category = 'photo';

Read the blob column (here, to compute the size of each image in bytes):

SELECT id, name, length(image) as img_size FROM my_db.image_table WHERE id = 1;

Display the image in a notebook:

from IPython.display import Image, display

row = spark.sql("SELECT image FROM my_db.image_table WHERE id = 1").first()
display(Image(data=bytes(row["image"])))  # PySpark returns BINARY values as bytearray

Writing in descriptor mode

When the data already exists in OSS, you can write a reference to each file, instead of copying its bytes, by using the sys.path_to_descriptor function:

CREATE TABLE my_db.video_table (
    id INT,
    title STRING,
    video BINARY
) TBLPROPERTIES (
    'row-tracking.enabled' = 'true',
    'data-evolution.enabled' = 'true',
    'blob-field' = 'video',
    'blob-descriptor-field' = 'video'
);

-- Reference existing video files on OSS.
INSERT INTO my_db.video_table VALUES
    (1, 'demo.mp4',  sys.path_to_descriptor('oss://my-bucket/videos/demo.mp4')),
    (2, 'intro.mp4', sys.path_to_descriptor('oss://my-bucket/videos/intro.mp4'));
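For many files, you may prefer to generate the INSERT statement. This hypothetical helper emits the same VALUES shape as above from (id, title, URI) tuples. Spark's string-literal escaping depends on session settings, so instead of escaping quotes it rejects values that contain a single quote or a backslash.

```python
from typing import Iterable, Tuple


def descriptor_insert_sql(table: str, rows: Iterable[Tuple[int, str, str]]) -> str:
    """Hypothetical helper: INSERT ... VALUES that references existing OSS files."""
    values = []
    for row_id, title, uri in rows:
        for text in (title, uri):
            # Refuse characters whose escaping depends on Spark's string-literal settings.
            if "'" in text or "\\" in text:
                raise ValueError(f"unsupported quote or backslash in {text!r}")
        values.append(f"    ({int(row_id)}, '{title}', sys.path_to_descriptor('{uri}'))")
    if not values:
        raise ValueError("no rows to insert")
    return f"INSERT INTO {table} VALUES\n" + ",\n".join(values) + ";"
```

You could then run the result in a notebook with `spark.sql(descriptor_insert_sql("my_db.video_table", rows))`.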

Descriptor mode: Link to a blob table

Create a table for annotation results that uses a descriptor to reference the original image, avoiding data duplication:

CREATE TABLE my_db.annotation_table (
    annotation_id INT,
    image_id INT,
    label STRING,
    confidence DOUBLE,
    image BINARY
) TBLPROPERTIES (
    'row-tracking.enabled' = 'true',
    'data-evolution.enabled' = 'true',
    'blob-field' = 'image',
    'blob-descriptor-field' = 'image'
);

Read descriptors from the original table and write them to the linked table:

INSERT INTO my_db.annotation_table
SELECT
    row_number() OVER (ORDER BY t.id) as annotation_id,
    t.id as image_id,
    'cat' as label,
    0.95 as confidence,
    t.image  -- Writes the BlobDescriptor, which points to the .blob file in the source table.
FROM my_db.image_table /*+ OPTIONS('blob-as-descriptor'='true') */ t;

Using Realtime Compute for Apache Flink

For basic configuration of Realtime Compute for Apache Flink with DLF, see Access DLF from Realtime Compute for Apache Flink.

Note: To use the latest features, use the custom JAR package paimon-ali-vvr-11-vvp-multimodal.jar and create a custom catalog from it in Data Management.

Add a catalog

The custom catalog type defaults to paimon-multimodal-test. When you create the custom catalog, do not set the type to paimon, because that conflicts with the original connector.
CREATE CATALOG my_catalog WITH (
  'type' = 'paimon-multimodal-test',
  'metastore' = 'rest',
  'token.provider' = 'dlf',
  'uri' = 'http://cn-hangzhou-vpc.dlf.aliyuncs.com',
  'warehouse' = 'dlf_test'
);
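You can also generate the catalog DDL, which makes the type rule above easy to enforce. In this hypothetical helper, the endpoint pattern `http://<region>-vpc.dlf.aliyuncs.com` is extrapolated from the examples on this page. Confirm the endpoint for your region before use.

```python
def flink_catalog_ddl(name: str, region: str, warehouse: str,
                      catalog_type: str = "paimon-multimodal-test") -> str:
    """Hypothetical helper: CREATE CATALOG DDL for the custom multimodal connector."""
    if catalog_type == "paimon":
        # 'paimon' would clash with the original connector.
        raise ValueError("choose a custom catalog type other than 'paimon'")
    options = {
        "type": catalog_type,
        "metastore": "rest",
        "token.provider": "dlf",
        # Assumed endpoint pattern, extrapolated from this page's examples.
        "uri": f"http://{region}-vpc.dlf.aliyuncs.com",
        "warehouse": warehouse,
    }
    body = ",\n".join(f"  '{key}' = '{value}'" for key, value in options.items())
    return f"CREATE CATALOG {name} WITH (\n{body}\n);"
```

`flink_catalog_ddl("my_catalog", "cn-hangzhou", "dlf_test")` reproduces the statement above.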

Create table and write data

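In Flink SQL, use the BYTES type to declare the blob column and specify it as blob storage by using the blob-field option. The following table also sets blob-as-descriptor to true. As a result, INSERT statements supply BlobDescriptor values (here, from sys.path_to_descriptor), and queries return descriptors unless a hint overrides the option.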

-- Create an image table with a blob column (default blob storage mode).
CREATE TABLE my_catalog.my_db.image_table (
    id INT,
    name STRING,
    category STRING,
    image BYTES
) WITH (
    'row-tracking.enabled' = 'true',
    'data-evolution.enabled' = 'true',
    'blob-field' = 'image',
    'blob-as-descriptor' = 'true'
);

-- Write data.
INSERT INTO my_catalog.my_db.image_table VALUES
    (1, 'cat.jpg',  'photo', my_catalog.sys.path_to_descriptor('oss://my-bucket/images/cat.jpg')),
    (2, 'dog.jpg',  'photo', my_catalog.sys.path_to_descriptor('oss://my-bucket/images/dog.jpg'));

Query data

-- Read only the structured columns without loading blob data.
SELECT id, name, category FROM my_catalog.my_db.image_table;

-- Read the blob descriptor and view its string representation.
SELECT id, name, descriptor_to_string(image) as image_desc FROM my_catalog.my_db.image_table;

-- Read the actual blob content and get its size.
SELECT id, name, length(image) as img_size
FROM my_catalog.my_db.image_table /*+ OPTIONS('blob-as-descriptor'='false') */;

You can also import data from other existing tables using INSERT INTO ... SELECT.

Using PyPaimon

PyPaimon fully supports reading and writing blobs, making it suitable for processing images and videos in AI inference pipelines.

Create table and write blob data

import pyarrow as pa
from pypaimon import CatalogFactory, Schema

# Connect to the DLF Catalog.
catalog = CatalogFactory.create({
    'metastore': 'rest',
    'uri': 'https://${regionId}-vpc.dlf.aliyuncs.com',
    'warehouse': 'my_catalog',
    'token.provider': 'dlf',
    'dlf.access-key-id': '<YOUR_ACCESS_KEY_ID>',
    'dlf.access-key-secret': '<YOUR_ACCESS_KEY_SECRET>',
})

pa_schema = pa.schema([
    ('id', pa.int64()),
    ('name', pa.string()),
    ('picture', pa.large_binary())
])

schema = Schema.from_pyarrow_schema(
    pa_schema,
    options={
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
        'blob-field': 'picture',
    }
)
catalog.create_table('my_db.image_table', schema, True)
table = catalog.get_table('my_db.image_table')

# Write blob data.
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

image_bytes = b'\x89PNG\r\n\x1a\n' + b'\x00' * 100

data = pa.Table.from_pydict({
    'id': [1],
    'name': ['sample.png'],
    'picture': [image_bytes],
}, schema=pa_schema)

table_write.write_arrow(data)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()
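To load several local images in one batch, assemble the column dict that pa.Table.from_pydict expects. `image_columns` is a hypothetical helper. It reads every file into memory, so keep batches small when files are large (for example, videos).

```python
from pathlib import Path
from typing import Dict, List, Sequence


def image_columns(paths: Sequence[str], first_id: int = 1) -> Dict[str, List]:
    """Hypothetical helper: read local files into columns matching pa_schema (id, name, picture)."""
    ids: List[int] = []
    names: List[str] = []
    pictures: List[bytes] = []
    for position, p in enumerate(paths):
        path = Path(p)
        ids.append(first_id + position)
        names.append(path.name)             # file name, e.g. "sample.png"
        pictures.append(path.read_bytes())  # whole file held in memory
    return {"id": ids, "name": names, "picture": pictures}
```

You could then build the batch with `pa.Table.from_pydict(image_columns(['a.png', 'b.png']), schema=pa_schema)` and write it with the same write_arrow and commit calls as above.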

Read an image from OSS and write it to the table:

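from pypaimon.common.file_io import FileIO
from pypaimon.common.options import Options

oss_io = FileIO.get('oss://my-bucket/', Options({
    'fs.oss.accessKeyId': '<YOUR_ACCESS_KEY_ID>',
    'fs.oss.accessKeySecret': '<YOUR_ACCESS_KEY_SECRET>',
    'fs.oss.endpoint': 'https://oss-cn-hangzhou.aliyuncs.com',
}))
with oss_io.new_input_stream('oss://my-bucket/path/test.jpg') as f:
    image_bytes = f.read()

# Write the image with the same batch-write flow used above.
write_builder = table.new_batch_write_builder()
table_write, table_commit = write_builder.new_write(), write_builder.new_commit()
table_write.write_arrow(pa.Table.from_pydict(
    {'id': [2], 'name': ['test.jpg'], 'picture': [image_bytes]}, schema=pa_schema))
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()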

Read blob data

read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
result = read_builder.new_read().to_arrow(splits)

# Get the binary data of the image.
for i in range(result.num_rows):
    pic_bytes = result.column('picture')[i].as_py()
    with open(f'output_{i}.png', 'wb') as f:
        f.write(pic_bytes)
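The loop above always uses a .png extension, but a table can mix formats (the OSS example writes a JPEG). You can instead choose the extension from the file signature. This hypothetical helper recognizes only PNG, JPEG, and GIF signatures and falls back to .bin.

```python
def guess_extension(data: bytes) -> str:
    """Hypothetical helper: pick a file extension from well-known image signatures."""
    if data.startswith(b"\x89PNG\r\n\x1a\n"):  # 8-byte PNG signature
        return ".png"
    if data.startswith(b"\xff\xd8\xff"):       # JPEG start-of-image marker
        return ".jpg"
    if data[:6] in (b"GIF87a", b"GIF89a"):     # GIF header versions
        return ".gif"
    return ".bin"                              # unknown format
```

In the loop, you could then open `f'output_{i}{guess_extension(pic_bytes)}'` instead of a fixed .png name.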

Column pruning: Read structured columns

read_builder = table.new_read_builder().with_projection(['id', 'name'])
splits = read_builder.new_scan().plan().splits()
result = read_builder.new_read().to_pandas(splits)

Descriptor mode: Link to a blob table

from pypaimon.table.row.blob import BlobDescriptor

# Create a table that stores the 'video' column as a descriptor.
pa_schema = pa.schema([
    ('id', pa.int32()),
    ('title', pa.string()),
    ('video', pa.large_binary()),
])
schema = Schema.from_pyarrow_schema(
    pa_schema,
    options={
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
        'blob-field': 'video',
        'blob-descriptor-field': 'video',
    }
)
catalog.create_table('my_db.video_table', schema, False)
table = catalog.get_table('my_db.video_table')

# Construct a BlobDescriptor that points to an existing OSS file.
video_path = 'oss://my-bucket/videos/demo.mp4'
video_size = 1024000  # File size in bytes. You must obtain this value in advance.
descriptor = BlobDescriptor(video_path, 0, video_size)

# Write the descriptor.
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

table_write.write_arrow(pa.Table.from_pydict({
    'id': [1],
    'title': ['demo video'],
    'video': [descriptor.serialize()],
}, schema=pa_schema))

table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

Read the descriptor and retrieve the data:

from pypaimon.table.row.blob import BlobDescriptor, Blob
from pypaimon.common.uri_reader import FileUriReader

read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
result = read_builder.new_read().to_arrow(splits)

# Create the URI reader once and reuse it for every row.
uri_reader = FileUriReader(table.file_io)

for i in range(result.num_rows):
    video_bytes = result.column('video')[i].as_py()
    descriptor = BlobDescriptor.deserialize(video_bytes)
    print(f"URI: {descriptor.uri}, offset: {descriptor.offset}, length: {descriptor.length}")

    # Retrieve the data that the descriptor points to.
    blob = Blob.from_descriptor(uri_reader, descriptor)
    data = blob.to_data()
    print(f"Blob data size: {len(data)} bytes")

Storage layout

In default blob storage mode, DLF Paimon automatically writes structured columns and blob columns to separate files:

table/
├── bucket-0/
│   ├── data-uuid-0.parquet      # Structured columns (id, name, etc.)
│   ├── data-uuid-1.blob         # Blob data (the picture column)
│   ├── data-uuid-2.blob         # More blob data
│   └── ...
├── manifest/
├── schema/
└── snapshot/
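To check the separation on real data, you can total file sizes by extension in a local copy of a table directory. This is a hypothetical standard-library sketch. Files without an extension (typically snapshot, schema, and manifest files) are grouped under "(none)".

```python
import os
from collections import Counter
from typing import Dict


def summarize_table_dir(root: str) -> Dict[str, int]:
    """Hypothetical helper: total bytes per file extension under a local table copy."""
    totals: Counter = Counter()
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            ext = os.path.splitext(name)[1] or "(none)"
            totals[ext] += os.path.getsize(os.path.join(dirpath, name))
    return dict(totals)
```

In a table with a blob column, the ".blob" total would usually dominate, while ".parquet" files stay small.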

Limitations

  • The blob type is supported only for append-only tables (tables without primary keys).

  • Blob columns do not support predicate pushdown. You cannot use a blob column in a WHERE clause for filtering.

  • Blob columns do not support statistics collection.

  • Both row-tracking.enabled and data-evolution.enabled must be set to true.

  • Set blob.target-file-size to suit the typical size of your blob values. If it is not set, the value of target-file-size is used.