DLF Paimon tables support blob storage for unstructured data such as images, videos, and documents. You can configure storage modes and table options, and access blob data through EMR Serverless Spark, Realtime Compute for Apache Flink, or PyPaimon.
Overview
As AI and multimodal applications grow, data lakes must manage both structured data (such as numbers and text) and unstructured data (such as images, videos, audio, and documents). Traditional solutions store unstructured data in object storage services like OSS and structured metadata in databases. Managing these sources separately makes join queries difficult and creates independent permission systems, preventing unified governance.
DLF Paimon tables introduce the blob (Binary Large Object) type to store unstructured and structured data in the same table. This approach unifies the management of metadata, permissions, and lifecycles.
Key advantages include:
-
Column-level separate storage: Blob data is automatically stored in separate
.blobfiles, while structured columns are stored in Parquet, Avro, or ORC files. This separation prevents interference. -
Efficient column pruning: When you query non-blob columns, the system does not load blob data, which avoids unnecessary I/O.
-
Flexible storage modes: Supports two modes: default blob storage and descriptor-only storage.
-
Unified permissions: You can manage both structured and unstructured data through the unified DLF permission system.
Storage modes
DLF Paimon provides two storage modes for blob fields.
Default blob storage
The raw bytes of a blob are written to .blob files managed by DLF Paimon. These files are stored within the table's path, and DLF manages their lifecycle.
Use cases: Suitable for directly writing binary data, such as images and videos, where DLF uniformly manages storage and permissions.
Descriptor-only storage
When you configure the blob-descriptor-field option, DLF Paimon does not create .blob files. Instead, it stores a serialized BlobDescriptor, which includes a URI, offset, and length, directly within data files such as Parquet.
Use cases: Ideal for multi-table join scenarios. For example, one table can store the original blob data, while another table references the existing blob files using descriptors to avoid data duplication.
Blob table options
|
Parameter |
Required |
Default |
Description |
|
row-tracking.enabled |
Yes |
false |
Row tracking must be enabled for tables with blob columns. |
|
data-evolution.enabled |
Yes |
false |
Data evolution must be enabled for tables with blob columns. |
|
blob-field |
No |
None |
Specifies the BYTES column to be stored as a blob. The engine maps the BYTES type to the blob type. |
|
blob-as-descriptor |
No |
false |
When enabled, read and write operations return the |
|
blob-descriptor-field |
No |
None |
The names of blob fields to be stored as descriptors. Separate multiple field names with a comma (,). |
|
blob.target-file-size |
No |
target-file-size |
The target size for a single |
Using EMR Serverless Spark
For basic configuration of EMR Serverless Spark with DLF, see Access DLF from Serverless Spark.
For full blob feature support, you must use a custom Spark package: paimon-ali-emr-spark-3.5-1-ali-25.2.jar. The following is an example configuration:
Upload the JAR package to OSS and configure its path.
spark.emr.serverless.excludedModules paimon
spark.emr.serverless.user.defined.jars oss://my_bucket/blob_test/paimon-ali-emr-spark-3.5-1-ali-25.2.jar
Create table
-- Create an image table with a blob column.
CREATE TABLE my_db.image_table (
id INT,
name STRING,
category STRING,
image BINARY
) TBLPROPERTIES (
'row-tracking.enabled' = 'true',
'data-evolution.enabled' = 'true',
'blob-field' = 'image'
);
In Spark SQL, use the BINARY type for the blob column and declare it as blob storage with the blob-field table property.
Write data
Write binary data directly using SQL:
INSERT INTO my_db.image_table VALUES (1, 'sample', 'photo', X'89504E470D0A1A0A');
You can also use a notebook to read an image from OSS and write it to the blob table. The current session must have permission to access the specified OSS location.
image_df = spark.read.format("binaryFile").load("oss://my_bucket/path/test.jpg")
image_df.selectExpr(
"1 as id",
"'test.jpg' as name",
"'photo' as category",
"content as image"
).writeTo("my_db.image_table").append()
Query data
Read only the structured columns without loading blob data:
SELECT id, name, category FROM my_db.image_table WHERE category = 'photo';
Read a full row, including the blob data:
SELECT id, name, length(image) as img_size FROM my_db.image_table WHERE id = 1;
Display the image in a notebook:
from IPython.display import Image, display
row = spark.sql("SELECT image FROM my_db.image_table WHERE id = 1").first()
display(Image(data=row["image"]))
Writing in descriptor mode
When data is already stored in OSS, you can directly write a file reference by using the path_to_descriptor function:
CREATE TABLE my_db.video_table (
id INT,
title STRING,
video BINARY
) TBLPROPERTIES (
'row-tracking.enabled' = 'true',
'data-evolution.enabled' = 'true',
'blob-field' = 'video',
'blob-descriptor-field' = 'video'
);
-- Reference existing video files on OSS.
INSERT INTO my_db.video_table VALUES
(1, 'demo.mp4', sys.path_to_descriptor('oss://my-bucket/videos/demo.mp4')),
(2, 'intro.mp4', sys.path_to_descriptor('oss://my-bucket/videos/intro.mp4'));
Descriptor mode: Link to a blob table
Create a table for annotation results that uses a descriptor to reference the original image, avoiding data duplication:
CREATE TABLE my_db.annotation_table (
annotation_id INT,
image_id INT,
label STRING,
confidence DOUBLE,
image BINARY
) TBLPROPERTIES (
'row-tracking.enabled' = 'true',
'data-evolution.enabled' = 'true',
'blob-field' = 'image',
'blob-descriptor-field' = 'image'
);
Read descriptors from the original table and write them to the linked table:
INSERT INTO my_db.annotation_table
SELECT
row_number() OVER (ORDER BY t.id) as annotation_id,
t.id as image_id,
'cat' as label,
0.95 as confidence,
t.image -- Writes the BlobDescriptor, which points to the .blob file in the source table.
FROM my_db.image_table /*+ OPTIONS('blob-as-descriptor'='true') */ t;
Using Realtime Compute for Apache Flink
For basic configuration of Realtime Compute for Apache Flink with DLF, see Access DLF from Realtime Compute for Apache Flink.
Note To use the latest features, use a custom JAR package and create a custom catalog in Data Management: paimon-ali-vvr-11-vvp-multimodal.jar
Add a catalog
The default type for a custom catalog is paimon-multimodal-test. When creating a custom catalog, do not set the type to paimon to avoid conflicts with the original connector.
CREATE CATALOG my_catalog WITH (
'type' = 'paimon-multimodal-test',
'metastore' = 'rest',
'token.provider' = 'dlf',
'uri' = 'http://cn-hangzhou-vpc.dlf.aliyuncs.com',
'warehouse' = 'dlf_test'
);
Create table and write data
In Flink SQL, use the BYTES type to declare the blob column and specify it as blob storage by using the blob-field option.
-- Create an image table with a blob column (default blob storage mode).
CREATE TABLE my_catalog.my_db.image_table (
id INT,
name STRING,
category STRING,
image BYTES
) WITH (
'row-tracking.enabled' = 'true',
'data-evolution.enabled' = 'true',
'blob-field' = 'image',
'blob-as-descriptor' = 'true'
);
-- Write data.
INSERT INTO my_catalog.my_db.image_table VALUES
(1, 'cat.jpg', 'photo', my_catalog.sys.path_to_descriptor('oss://my-bucket/images/cat.jpg')),
(2, 'dog.jpg', 'photo', my_catalog.sys.path_to_descriptor('oss://my-bucket/images/dog.jpg'));
Query data
-- Read only the structured columns without loading blob data.
SELECT id, name, category FROM my_catalog.my_db.image_table;
-- Read the blob descriptor and view its string representation.
SELECT id, name, descriptor_to_string(image) as image_desc FROM my_catalog.my_db.image_table;
-- Read the actual blob content and get its size.
SELECT id, name, length(image) as img_size
FROM my_catalog.my_db.image_table /*+ OPTIONS('blob-as-descriptor'='false') */;
You can also import data from other existing tables using INSERT INTO ... SELECT.
Using PyPaimon
PyPaimon fully supports reading and writing blobs, making it suitable for processing images and videos in AI inference pipelines.
Create table and write blob data
import pyarrow as pa
from pypaimon import CatalogFactory, Schema
# Connect to the DLF Catalog.
catalog = CatalogFactory.create({
'metastore': 'rest',
'uri': 'https://${regionId}-vpc.dlf.aliyuncs.com',
'warehouse': 'my_catalog',
'token.provider': 'dlf',
'dlf.access-key-id': '<YOUR_ACCESS_KEY_ID>',
'dlf.access-key-secret': '<YOUR_ACCESS_KEY_SECRET>',
})
pa_schema = pa.schema([
('id', pa.int64()),
('name', pa.string()),
('picture', pa.large_binary())
])
schema = Schema.from_pyarrow_schema(
pa_schema,
options={
'row-tracking.enabled': 'true',
'data-evolution.enabled': 'true',
'blob-field': 'picture',
}
)
catalog.create_table('my_db.image_table', schema, True)
table = catalog.get_table('my_db.image_table')
# Write blob data.
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
image_bytes = b'\x89PNG\r\n\x1a\n' + b'\x00' * 100
data = pa.Table.from_pydict({
'id': [1],
'name': ['sample.png'],
'picture': [image_bytes],
}, schema=pa_schema)
table_write.write_arrow(data)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()
Read an image from OSS and write it to the table:
from pypaimon.common.file_io import FileIO
from pypaimon.common.options import Options
oss_io = FileIO.get('oss://my-bucket/', Options({
'fs.oss.accessKeyId': '<YOUR_ACCESS_KEY_ID>',
'fs.oss.accessKeySecret': '<YOUR_ACCESS_KEY_SECRET>',
'fs.oss.endpoint': 'https://oss-cn-hangzhou.aliyuncs.com',
}))
with oss_io.new_input_stream('oss://my-bucket/path/test.jpg') as f:
image_bytes = f.read()
Read blob data
read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
result = read_builder.new_read().to_arrow(splits)
# Get the binary data of the image.
for i in range(result.num_rows):
pic_bytes = result.column('picture')[i].as_py()
with open(f'output_{i}.png', 'wb') as f:
f.write(pic_bytes)
Column pruning: Read structured columns
read_builder = table.new_read_builder().with_projection(['id', 'name'])
splits = read_builder.new_scan().plan().splits()
result = read_builder.new_read().to_pandas(splits)
Descriptor mode: Link to a blob table
from pypaimon.table.row.blob import BlobDescriptor
# Create a table that stores the 'video' column as a descriptor.
pa_schema = pa.schema([
('id', pa.int32()),
('title', pa.string()),
('video', pa.large_binary()),
])
schema = Schema.from_pyarrow_schema(
pa_schema,
options={
'row-tracking.enabled': 'true',
'data-evolution.enabled': 'true',
'blob-field': 'video',
'blob-descriptor-field': 'video',
}
)
catalog.create_table('my_db.video_table', schema, False)
table = catalog.get_table('my_db.video_table')
# Construct a BlobDescriptor that points to an existing OSS file.
video_path = 'oss://my-bucket/videos/demo.mp4'
video_size = 1024000 # File size in bytes. You must obtain this value in advance.
descriptor = BlobDescriptor(video_path, 0, video_size)
# Write the descriptor.
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
table_write.write_arrow(pa.Table.from_pydict({
'id': [1],
'title': ['demo video'],
'video': [descriptor.serialize()],
}, schema=pa_schema))
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()
Read the descriptor and retrieve the data:
from pypaimon.table.row.blob import BlobDescriptor, Blob
from pypaimon.common.uri_reader import FileUriReader
read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
result = read_builder.new_read().to_arrow(splits)
for i in range(result.num_rows):
video_bytes = result.column('video')[i].as_py()
descriptor = BlobDescriptor.deserialize(video_bytes)
print(f"URI: {descriptor.uri}, offset: {descriptor.offset}, length: {descriptor.length}")
# Retrieve the actual data.
uri_reader = FileUriReader(table.file_io)
blob = Blob.from_descriptor(uri_reader, descriptor)
data = blob.to_data()
print(f"Blob data size: {len(data)} bytes")
Storage layout
After you define a table with a blob column, DLF Paimon automatically separates the storage:
table/
├── bucket-0/
│ ├── data-uuid-0.parquet # Structured columns (id, name, etc.)
│ ├── data-uuid-1.blob # Blob data (the picture column)
│ ├── data-uuid-2.blob # More blob data
│ └── ...
├── manifest/
├── schema/
└── snapshot/
Limitations
-
The blob type is supported only for append-only tables (tables without primary keys).
-
Blob columns do not support predicate pushdown. You cannot use a blob column in a
WHEREclause for filtering. -
Blob columns do not support statistics collection.
-
Both
row-tracking.enabledanddata-evolution.enabledmust be enabled. -
The
blob.target-file-sizeoption should be set to a value appropriate for the size of your blob data.