Blob存储

更新时间:
复制为 MD 格式

本文介绍 DLF Paimon 表的 BLOB(Binary Large Object)存储功能,包括存储模式、表选项及通过 EMR Serverless Spark、实时计算 Flink 和 PyPaimon 的使用方法。

概述

随着 AI 和多模态应用的兴起,数据湖需同时管理结构化数据(数值、文本)与非结构化数据(图片、视频、音频、文档)。传统方案将非结构化数据存放在 OSS 等对象存储中,结构化元数据存于数据库,两者分别管理,难以关联查询,且权限体系独立,无法统一管控。

DLF Paimon 表引入 BLOB(Binary Large Object)类型,将非结构化数据与结构化数据存储在同一张表中,统一管理元数据、权限和生命周期。

核心优势如下:

  • 列级别分离存储:Blob 数据自动存入独立 .blob 文件,结构化列存入 Parquet/Avro/ORC,互不影响。

  • 高效列裁剪:查询非 Blob 列时不加载 Blob 数据,避免无效 I/O。

  • 灵活存储模式:支持 Blob 存储(默认)和描述符引用两种模式。

  • 统一权限:通过 DLF 权限体系统一管控结构化与非结构化数据。

存储模式

DLF Paimon 为 BLOB 字段提供两种存储模式,适配不同业务需求。

默认 Blob 存储

Blob 原始字节写入 DLF Paimon 管理的 .blob 文件,存储在表路径下,由 DLF 统一管理文件生命周期。

适用场景:直接写入图片、视频等二进制数据,由 DLF 统一管理存储和权限。

描述符引用存储(Descriptor Only)

配置 blob-descriptor-field 后,DLF Paimon 不创建 .blob 文件,而是将序列化的 BlobDescriptor(包含 URI、offset、length)直接存储在 Parquet 等数据文件中。

适用场景:多表关联场景。例如一张表存储 Blob 原始数据,另一张表通过描述符引用关联已有的 Blob 文件,避免重复存储。

Blob 相关表选项

选项

必填

默认值

说明

row-tracking.enabled

false

Blob 表必须开启行追踪。

data-evolution.enabled

false

Blob 表必须开启数据演化。

blob-field

指定按 Blob 方式存储的 BYTES 列。引擎层面 BYTES 类型映射为 BLOB。

blob-as-descriptor

false

读写时返回 BlobDescriptor 字节而非实际 Blob 内容。支持动态修改。

blob-descriptor-field

以描述符方式存储的 Blob 字段名,多个字段以逗号分隔。

blob.target-file-size

target-file-size

单个 .blob 文件的目标大小。

通过 EMR Serverless Spark 使用

关于 EMR Serverless Spark 对接 DLF 的基础配置,请参见Serverless Spark 访问 DLF

为获得完整的 Blob 功能支持,需使用自定义 Spark 包paimon-ali-emr-spark-3.5-1-ali-25.2.jar。配置示例如下:

JAR包上传到OSS后,配置相应地址,使其可以被访问下载。
spark.emr.serverless.excludedModules    paimon
spark.emr.serverless.user.defined.jars  oss://my_bucket/blob_test/paimon-ali-emr-spark-3.5-1-ali-25.2.jar

建表

-- 创建包含 Blob 列的图片表
CREATE TABLE my_db.image_table (
    id INT,
    name STRING,
    category STRING,
    image BINARY
) TBLPROPERTIES (
    'row-tracking.enabled' = 'true',
    'data-evolution.enabled' = 'true',
    'blob-field' = 'image'
);

Spark SQL 中使用 BINARY 类型,通过 blob-field 选项声明为 Blob 存储。

写入数据

通过 SQL 直接写入二进制数据:

INSERT INTO my_db.image_table VALUES (1, 'sample', 'photo', X'89504E470D0A1A0A');

通过 Notebook 从 OSS 读取图片写入 Blob 表(当前会话需具有对应 OSS 的访问权限):

image_df = spark.read.format("binaryFile").load("oss://my_bucket/path/test.jpg")

image_df.selectExpr(
    "1 as id",
    "'test.jpg' as name",
    "'photo' as category",
    "content as image"
).writeTo("my_db.image_table").append()

查询数据

仅读取结构化列,不加载 Blob 数据:

SELECT id, name, category FROM my_db.image_table WHERE category = 'photo';

读取包含 Blob 的完整行:

SELECT id, name, length(image) as img_size FROM my_db.image_table WHERE id = 1;

在 Notebook 中展示图片:

from IPython.display import Image, display

row = spark.sql("SELECT image FROM my_db.image_table WHERE id = 1").first()
display(Image(data=row["image"]))

描述符模式写入

当数据已存储在 OSS 上时,可通过 path_to_descriptor 函数直接写入文件引用:

CREATE TABLE my_db.video_table (
    id INT,
    title STRING,
    video BINARY
) TBLPROPERTIES (
    'row-tracking.enabled' = 'true',
    'data-evolution.enabled' = 'true',
    'blob-field' = 'video',
    'blob-descriptor-field' = 'video'
);

-- 引用 OSS 上已有的视频文件
INSERT INTO my_db.video_table VALUES
    (1, 'demo.mp4',  sys.path_to_descriptor('oss://my-bucket/videos/demo.mp4')),
    (2, 'intro.mp4', sys.path_to_descriptor('oss://my-bucket/videos/intro.mp4'));

描述符模式:关联已有 Blob 表

创建标注结果表,通过描述符引用原始图片,不重复存储:

CREATE TABLE my_db.annotation_table (
    annotation_id INT,
    image_id INT,
    label STRING,
    confidence DOUBLE,
    image BINARY
) TBLPROPERTIES (
    'row-tracking.enabled' = 'true',
    'data-evolution.enabled' = 'true',
    'blob-field' = 'image',
    'blob-descriptor-field' = 'image'
);

从原始表读取描述符,写入关联表:

INSERT INTO my_db.annotation_table
SELECT
    row_number() OVER (ORDER BY t.id) as annotation_id,
    t.id as image_id,
    'cat' as label,
    0.95 as confidence,
    t.image  -- 写入的是 BlobDescriptor,指向原始表的 .blob 文件
FROM my_db.image_table /*+ OPTIONS('blob-as-descriptor'='true') */ t;

通过实时计算 Flink 使用

关于实时计算 Flink 对接 DLF 的基础配置,请参见实时计算 Flink 版访问 DLF

说明 为保持功能更新到最新状态,需使用自定义 JAR 包,在数据管理中自定义 Catalog。paimon-ali-vvr-11-vvp-multimodal.jar

添加Catalog

自定义Catalogtype类型默认为paimon-multimodal-test,自定义type时,不可设置为paimon,会与原本的Connector冲突。
CREATE CATALOG my_catalog WITH (
  'type' = 'paimon-multimodal-test',
  'metastore' = 'rest',
  'token.provider' = 'dlf',
  'uri' = 'http://cn-hangzhou-vpc.dlf.aliyuncs.com',
  'warehouse' = 'dlf_test'
);

建表与写入

Flink SQL中使用BYTES类型声明Blob列,通过blob-field选项指定为Blob存储。

-- 创建包含Blob列的图片表(默认Blob存储模式)
CREATE TABLE my_catalog.my_db.image_table (
    id INT,
    name STRING,
    category STRING,
    image BYTES
) WITH (
    'row-tracking.enabled' = 'true',
    'data-evolution.enabled' = 'true',
    'blob-field' = 'image',
    'blob-as-descriptor' = 'true'
);

-- 写入数据
INSERT INTO my_catalog.my_db.image_table VALUES
    (1, 'cat.jpg',  'photo', my_catalog.sys.path_to_descriptor('oss://my-bucket/images/cat.jpg')),
    (2, 'dog.jpg',  'photo', my_catalog.sys.path_to_descriptor('oss://my-bucket/images/dog.jpg'));

查询数据

-- 仅读取结构化列,不加载 Blob 数据
SELECT id, name, category FROM my_catalog.my_db.image_table;

-- 读取Blob内容,获取文件大小
SELECT id, name, descriptor_to_string(image) as image_desc FROM my_catalog.my_db.image_table;

-- 以描述符形式读取(获取BlobDescriptor而非实际内容)
SELECT id, name, length(image) as img_size
FROM my_catalog.my_db.image_table /*+ OPTIONS('blob-as-descriptor'='false') */;

也可以通过 INSERT INTO ... SELECT 从其他已有表导入数据。

通过 PyPaimon 使用

PyPaimon 提供完整的 Blob 读写支持,适合 AI 推理管线中对图片和音视频的处理。

建表与写入 Blob 数据

import pyarrow as pa
from pypaimon import CatalogFactory, Schema

# 连接 DLF Catalog
catalog = CatalogFactory.create({
    'metastore': 'rest',
    'uri': 'https://${regionId}-vpc.dlf.aliyuncs.com',
    'warehouse': 'my_catalog',
    'token.provider': 'dlf',
    'dlf.access-key-id': '<AK>',
    'dlf.access-key-secret': '<SK>',
})

pa_schema = pa.schema([
    ('id', pa.int64()),
    ('name', pa.string()),
    ('picture', pa.large_binary())
])

schema = Schema.from_pyarrow_schema(
    pa_schema,
    options={
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
        'blob-field': 'picture',
    }
)
catalog.create_table('my_db.image_table', schema, True)
table = catalog.get_table('my_db.image_table')

# 写入 Blob 数据
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

image_bytes = b'\x89PNG\r\n\x1a\n' + b'\x00' * 100

data = pa.Table.from_pydict({
    'id': [1],
    'name': ['sample.png'],
    'picture': [image_bytes],
}, schema=pa_schema)

table_write.write_arrow(data)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

从 OSS 读取图片写入:

from pypaimon.common.file_io import FileIO
from pypaimon.common.options import Options

oss_io = FileIO.get('oss://my-bucket/', Options({
    'fs.oss.accessKeyId': '<YOUR-AK>',
    'fs.oss.accessKeySecret': '<YOUR-SK>',
    'fs.oss.endpoint': 'https://oss-cn-hangzhou.aliyuncs.com',
}))
with oss_io.new_input_stream('oss://my-bucket/path/test.jpg') as f:
    image_bytes = f.read()

读取 Blob 数据

read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
result = read_builder.new_read().to_arrow(splits)

# 获取图片二进制数据
for i in range(result.num_rows):
    pic_bytes = result.column('picture')[i].as_py()
    with open(f'output_{i}.png', 'wb') as f:
        f.write(pic_bytes)

列裁剪:仅读取结构化列

read_builder = table.new_read_builder().with_projection(['id', 'name'])
splits = read_builder.new_scan().plan().splits()
result = read_builder.new_read().to_pandas(splits)

描述符模式:关联已有 Blob 表

from pypaimon.table.row.blob import BlobDescriptor

# 建表:video 列以描述符方式存储
pa_schema = pa.schema([
    ('id', pa.int32()),
    ('title', pa.string()),
    ('video', pa.large_binary()),
])
schema = Schema.from_pyarrow_schema(
    pa_schema,
    options={
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
        'blob-field': 'video',
        'blob-descriptor-field': 'video',
    }
)
catalog.create_table('my_db.video_table', schema, False)
table = catalog.get_table('my_db.video_table')

# 构造 BlobDescriptor(指向已存在的 OSS 文件)
video_path = 'oss://my-bucket/videos/demo.mp4'
video_size = 1024000  # 文件大小(字节),需提前获取
descriptor = BlobDescriptor(video_path, 0, video_size)

# 写入描述符
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

table_write.write_arrow(pa.Table.from_pydict({
    'id': [1],
    'title': ['demo video'],
    'video': [descriptor.serialize()],
}, schema=pa_schema))

table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

读取描述符并还原数据:

from pypaimon.table.row.blob import BlobDescriptor, Blob
from pypaimon.common.uri_reader import FileUriReader

read_builder = table.new_read_builder()
splits = read_builder.new_scan().plan().splits()
result = read_builder.new_read().to_arrow(splits)

for i in range(result.num_rows):
    video_bytes = result.column('video')[i].as_py()
    descriptor = BlobDescriptor.deserialize(video_bytes)
    print(f"URI: {descriptor.uri}, offset: {descriptor.offset}, length: {descriptor.length}")

    # 还原为实际数据
    uri_reader = FileUriReader(table.file_io)
    blob = Blob.from_descriptor(uri_reader, descriptor)
    data = blob.to_data()
    print(f"Blob data size: {len(data)} bytes")

存储布局

定义包含 Blob 列的表后,DLF Paimon 自动分离存储:

table/
├── bucket-0/
│   ├── data-uuid-0.parquet      # 结构化列(id, name 等)
│   ├── data-uuid-1.blob         # Blob 数据(picture 列)
│   ├── data-uuid-2.blob         # 更多 Blob 数据
│   └── ...
├── manifest/
├── schema/
└── snapshot/

使用限制

  • Blob 类型仅适用于 Append 表(无主键表),不支持主键表。

  • Blob 列不支持谓词下推,不能在 WHERE 条件中过滤 Blob 列。

  • Blob 列不支持统计信息收集。

  • 必须同时开启 row-tracking.enableddata-evolution.enabled

  • 可根据 Blob 数据大小合理设置 blob.target-file-size