本文介绍 DLF Paimon 表的 BLOB(Binary Large Object)存储功能,包括存储模式、表选项及通过 EMR Serverless Spark、实时计算 Flink 和 PyPaimon 的使用方法。
概述
随着 AI 和多模态应用的兴起,数据湖需同时管理结构化数据(数值、文本)与非结构化数据(图片、视频、音频、文档)。传统方案将非结构化数据存放在 OSS 等对象存储中,结构化元数据存于数据库,两者分别管理,难以关联查询,且权限体系独立,无法统一管控。
DLF Paimon 表引入 BLOB(Binary Large Object)类型,将非结构化数据与结构化数据存储在同一张表中,统一管理元数据、权限和生命周期。
核心优势如下:
列级别分离存储:Blob 数据自动存入独立
.blob文件,结构化列存入 Parquet/Avro/ORC,互不影响。高效列裁剪:查询非 Blob 列时不加载 Blob 数据,避免无效 I/O。
灵活存储模式:支持 Blob 存储(默认)和描述符引用两种模式。
统一权限:通过 DLF 权限体系统一管控结构化与非结构化数据。
存储模式
DLF Paimon 为 BLOB 字段提供两种存储模式,适配不同业务需求。
默认 Blob 存储
Blob 原始字节写入 DLF Paimon 管理的 .blob 文件,存储在表路径下,由 DLF 统一管理文件生命周期。
适用场景:直接写入图片、视频等二进制数据,由 DLF 统一管理存储和权限。
描述符引用存储(Descriptor Only)
配置 blob-descriptor-field 后,DLF Paimon 不创建 .blob 文件,而是将序列化的 BlobDescriptor(包含 URI、offset、length)直接存储在 Parquet 等数据文件中。
适用场景:多表关联场景。例如一张表存储 Blob 原始数据,另一张表通过描述符引用关联已有的 Blob 文件,避免重复存储。
Blob 相关表选项
选项 | 必填 | 默认值 | 说明 |
row-tracking.enabled | 是 | false | Blob 表必须开启行追踪。 |
data-evolution.enabled | 是 | false | Blob 表必须开启数据演化。 |
blob-field | 否 | 无 | 指定按 Blob 方式存储的 BYTES 列。引擎层面 BYTES 类型映射为 BLOB。 |
blob-as-descriptor | 否 | false | 读写时返回 BlobDescriptor 字节而非实际 Blob 内容。支持动态修改。 |
blob-descriptor-field | 否 | 无 | 以描述符方式存储的 Blob 字段名,多个字段以逗号分隔。 |
blob.target-file-size | 否 | target-file-size | 单个 .blob 文件的目标大小。 |
通过 EMR Serverless Spark 使用
关于 EMR Serverless Spark 对接 DLF 的基础配置,请参见Serverless Spark 访问 DLF。
为获得完整的 Blob 功能支持,需使用自定义 Spark 包paimon-ali-emr-spark-3.5-1-ali-25.2.jar。配置示例如下:
将JAR包上传到OSS后,配置相应地址,使其可以被访问下载。
spark.emr.serverless.excludedModules paimon
spark.emr.serverless.user.defined.jars oss://my_bucket/blob_test/paimon-ali-emr-spark-3.5-1-ali-25.2.jar建表
-- 创建包含 Blob 列的图片表
CREATE TABLE my_db.image_table (
id INT,
name STRING,
category STRING,
image BINARY
) TBLPROPERTIES (
'row-tracking.enabled' = 'true',
'data-evolution.enabled' = 'true',
'blob-field' = 'image'
);Spark SQL 中使用 BINARY 类型,通过 blob-field 选项声明为 Blob 存储。
写入数据
通过 SQL 直接写入二进制数据:
INSERT INTO my_db.image_table VALUES (1, 'sample', 'photo', X'89504E470D0A1A0A');通过 Notebook 从 OSS 读取图片写入 Blob 表(当前会话需具有对应 OSS 的访问权限):
image_df = spark.read.format("binaryFile").load("oss://my_bucket/path/test.jpg")
image_df.selectExpr(
"1 as id",
"'test.jpg' as name",
"'photo' as category",
"content as image"
).writeTo("my_db.image_table").append()查询数据
仅读取结构化列,不加载 Blob 数据:
SELECT id, name, category FROM my_db.image_table WHERE category = 'photo';读取包含 Blob 的完整行:
SELECT id, name, length(image) as img_size FROM my_db.image_table WHERE id = 1;在 Notebook 中展示图片:
from IPython.display import Image, display
row = spark.sql("SELECT image FROM my_db.image_table WHERE id = 1").first()
display(Image(data=row["image"]))描述符模式写入
当数据已存储在 OSS 上时,可通过 path_to_descriptor 函数直接写入文件引用:
CREATE TABLE my_db.video_table (
id INT,
title STRING,
video BINARY
) TBLPROPERTIES (
'row-tracking.enabled' = 'true',
'data-evolution.enabled' = 'true',
'blob-field' = 'video',
'blob-descriptor-field' = 'video'
);
-- 引用 OSS 上已有的视频文件
INSERT INTO my_db.video_table VALUES
(1, 'demo.mp4', sys.path_to_descriptor('oss://my-bucket/videos/demo.mp4')),
(2, 'intro.mp4', sys.path_to_descriptor('oss://my-bucket/videos/intro.mp4'));描述符模式:关联已有 Blob 表
创建标注结果表,通过描述符引用原始图片,不重复存储:
CREATE TABLE my_db.annotation_table (
annotation_id INT,
image_id INT,
label STRING,
confidence DOUBLE,
image BINARY
) TBLPROPERTIES (
'row-tracking.enabled' = 'true',
'data-evolution.enabled' = 'true',
'blob-field' = 'image',
'blob-descriptor-field' = 'image'
);从原始表读取描述符,写入关联表:
INSERT INTO my_db.annotation_table
SELECT
row_number() OVER (ORDER BY t.id) as annotation_id,
t.id as image_id,
'cat' as label,
0.95 as confidence,
t.image -- 写入的是 BlobDescriptor,指向原始表的 .blob 文件
FROM my_db.image_table /*+ OPTIONS('blob-as-descriptor'='true') */ t;通过实时计算 Flink 使用
关于实时计算 Flink 对接 DLF 的基础配置,请参见实时计算 Flink 版访问 DLF。
说明 为保持功能更新到最新状态,需使用自定义 JAR 包,在数据管理中自定义 Catalog。paimon-ali-vvr-11-vvp-multimodal.jar
添加Catalog
自定义Catalog的type类型默认为paimon-multimodal-test,自定义type时,不可设置为paimon,会与原本的Connector冲突。
CREATE CATALOG my_catalog WITH (
'type' = 'paimon-multimodal-test',
'metastore' = 'rest',
'token.provider' = 'dlf',
'uri' = 'http://cn-hangzhou-vpc.dlf.aliyuncs.com',
'warehouse' = 'dlf_test'
);建表与写入
Flink SQL中使用BYTES类型声明Blob列,通过blob-field选项指定为Blob存储。
-- 创建包含Blob列的图片表(默认Blob存储模式)
CREATE TABLE my_catalog.my_db.image_table (
id INT,
name STRING,
category STRING,
image BYTES
) WITH (
'row-tracking.enabled' = 'true',
'data-evolution.enabled' = 'true',
'blob-field' = 'image',
'blob-as-descriptor' = 'true'
);
-- 写入数据
INSERT INTO my_catalog.my_db.image_table VALUES
(1, 'cat.jpg', 'photo', my_catalog.sys.path_to_descriptor('oss://my-bucket/images/cat.jpg')),
(2, 'dog.jpg', 'photo', my_catalog.sys.path_to_descriptor('oss://my-bucket/images/dog.jpg'));查询数据
-- 仅读取结构化列,不加载 Blob 数据
SELECT id, name, category FROM my_catalog.my_db.image_table;
-- 读取Blob内容,获取文件大小
SELECT id, name, descriptor_to_string(image) as image_desc FROM my_catalog.my_db.image_table;
-- 以描述符形式读取(获取BlobDescriptor而非实际内容)
SELECT id, name, length(image) as img_size
FROM my_catalog.my_db.image_table /*+ OPTIONS('blob-as-descriptor'='false') */;也可以通过 INSERT INTO ... SELECT 从其他已有表导入数据。
通过 PyPaimon 使用
PyPaimon 提供完整的 Blob 读写支持,适合 AI 推理管线中对图片和音视频的处理。
建表与写入 Blob 数据
import pyarrow as pa
from pypaimon import CatalogFactory, Schema
# 连接 DLF Catalog
catalog = CatalogFactory.create({
'metastore': 'rest',
'uri': 'https://${regionId}-vpc.dlf.aliyuncs.com',
'warehouse': 'my_catalog',
'token.provider': 'dlf',
'dlf.access-key-id': '<AK>',
'dlf.access-key-secret': '<SK>',
})
pa_schema = pa.schema([
('id', pa.int64()),
('name', pa.string()),
('picture', pa.large_binary())
])
schema = Schema.from_pyarrow_schema(
pa_schema,
options={
'row-tracking.enabled': 'true',
'data-evolution.enabled': 'true',
'blob-field': 'picture',
}
)
catalog.create_table('my_db.image_table', schema, True)
table = catalog.get_table('my_db.image_table')
# 写入 Blob 数据
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
image_bytes = b'\x89PNG\r\n\x1a\n' + b'\x00' * 100
data = pa.Table.from_pydict({
'id': [1],
'name': ['sample.png'],
'picture': [image_bytes],
}, schema=pa_schema)
table_write.write_arrow(data)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()从 OSS 读取图片写入:
from pypaimon.common.file_io import FileIO
from pypaimon.common.options import Options
oss_io = FileIO.get('oss://my-bucket/', Options({
'fs.oss.accessKeyId': '<YOUR-AK>',
'fs.oss.accessKeySecret': '<YOUR-SK>',
'fs.oss.endpoint': 'https://oss-cn-hangzhou.aliyuncs.com',
}))
with oss_io.new_input_stream('oss://my-bucket/path/test.jpg') as f:
image_bytes = f.read()读取 Blob 数据
read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
result = read_builder.new_read().to_arrow(splits)
# 获取图片二进制数据
for i in range(result.num_rows):
pic_bytes = result.column('picture')[i].as_py()
with open(f'output_{i}.png', 'wb') as f:
f.write(pic_bytes)列裁剪:仅读取结构化列
read_builder = table.new_read_builder().with_projection(['id', 'name'])
splits = read_builder.new_scan().plan().splits()
result = read_builder.new_read().to_pandas(splits)描述符模式:关联已有 Blob 表
from pypaimon.table.row.blob import BlobDescriptor
# 建表:video 列以描述符方式存储
pa_schema = pa.schema([
('id', pa.int32()),
('title', pa.string()),
('video', pa.large_binary()),
])
schema = Schema.from_pyarrow_schema(
pa_schema,
options={
'row-tracking.enabled': 'true',
'data-evolution.enabled': 'true',
'blob-field': 'video',
'blob-descriptor-field': 'video',
}
)
catalog.create_table('my_db.video_table', schema, False)
table = catalog.get_table('my_db.video_table')
# 构造 BlobDescriptor(指向已存在的 OSS 文件)
video_path = 'oss://my-bucket/videos/demo.mp4'
video_size = 1024000 # 文件大小(字节),需提前获取
descriptor = BlobDescriptor(video_path, 0, video_size)
# 写入描述符
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
table_write.write_arrow(pa.Table.from_pydict({
'id': [1],
'title': ['demo video'],
'video': [descriptor.serialize()],
}, schema=pa_schema))
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()读取描述符并还原数据:
from pypaimon.table.row.blob import BlobDescriptor, Blob
from pypaimon.common.uri_reader import FileUriReader
read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
result = read_builder.new_read().to_arrow(splits)
for i in range(result.num_rows):
video_bytes = result.column('video')[i].as_py()
descriptor = BlobDescriptor.deserialize(video_bytes)
print(f"URI: {descriptor.uri}, offset: {descriptor.offset}, length: {descriptor.length}")
# 还原为实际数据
uri_reader = FileUriReader(table.file_io)
blob = Blob.from_descriptor(uri_reader, descriptor)
data = blob.to_data()
print(f"Blob data size: {len(data)} bytes")存储布局
定义包含 Blob 列的表后,DLF Paimon 自动分离存储:
table/
├── bucket-0/
│ ├── data-uuid-0.parquet # 结构化列(id, name 等)
│ ├── data-uuid-1.blob # Blob 数据(picture 列)
│ ├── data-uuid-2.blob # 更多 Blob 数据
│ └── ...
├── manifest/
├── schema/
└── snapshot/使用限制
Blob 类型仅适用于 Append 表(无主键表),不支持主键表。
Blob 列不支持谓词下推,不能在 WHERE 条件中过滤 Blob 列。
Blob 列不支持统计信息收集。
必须同时开启
row-tracking.enabled和data-evolution.enabled。可根据 Blob 数据大小合理设置
blob.target-file-size。