使用OSS Connector for AI/ML结合OSS加速器加速小文件读取

更新时间:
复制为 MD 格式

AI/ML训练中,百万级小文件(如ImageNet图片)的随机I/OGPU利用率的主要瓶颈。OSS Connector for AI/ML结合OSS加速器,可大幅提升数据加载速度。

加速原理

OSS Connector for AI/MLOSS加速器分别在应用层和存储层进行优化,两者结合产生协同加速效果:

  • OSS Connector for AI/ML(应用层):内置异步I/O模型和多线程预取机制,将串行的文件请求转化为高并发的并行流,在后台持续预取下一个Batch所需的数据,消除CPU/GPU等待I/O的空闲时间。

  • OSS加速器(存储层):将热点数据缓存至高性能存储介质,采用冷热缓存机制工作。首次读取数据时(冷缓存),需要从OSS源站加载到缓存,性能与直接访问OSS相当;后续读取相同数据时(热缓存),数据直接从缓存返回,相比OSS源站的P50延迟可降低约2.8倍(约12 ms vs 35 ms)。在模型训练中,由于多个Epoch会重复读取相同数据集,从第二个Epoch开始即可享受加速。

  • 协同效果:Connector发起的海量并发请求被加速器以毫秒级速度消化,不再受限于OSS源站延迟。训练启动或Epoch切换时的突发流量被加速器作为缓冲池吸收,确保稳定的高吞吐输出。

前提条件

  • 已创建OSS Bucket并上传训练数据集。

  • 已获取AccessKey IDAccessKey Secret。具体操作,请参见创建AccessKey

  • 已创建ECS实例。推荐计算增强型或网络增强型实例(如ecs.g7.32xlarge),操作系统推荐Alibaba Cloud Linux 3/4。具体操作,请参见自定义购买实例

Bucket开通OSS加速器

  1. 登录OSS管理控制台,单击目标Bucket名称。

  2. 在左侧导航栏,选择Bucket配置 > OSS加速器

  3. 单击创建OSS加速器,配置以下关键参数:

    配置项

    说明

    可用区

    选择与ECS实例相同的可用区。本文示例为华北2(北京)可用区H

    容量

    需大于等于数据集总大小。本文示例为20 TB

    加速策略

    选择指定路径加速并填写数据集前缀,或选择加速整个Bucket

    重要

    ECS实例与OSS加速器建议位于同一可用区。跨可用区访问会引入额外网络延迟,影响加速效果。

  4. 创建完成后,在OSS Connector for AI/ML中使用时,Endpoint格式为oss-cache-<可用区>.aliyuncs.com。例如,可用区为cn-beijing-h时,对应Endpointoss-cache-cn-beijing-h.aliyuncs.com

安装和配置OSS Connector for AI/ML

  1. 登录ECS实例,安装OSS Connector for AI/ML(PyTorch版)。

    pip install osstorchconnector
  2. 配置访问凭证。将<yourAccessKeyId><yourAccessKeySecret>替换为您的AccessKey信息。

    mkdir -p /root/.alibabacloud
    cat > /root/.alibabacloud/credentials << 'EOF'
    {
        "AccessKeyId": "<yourAccessKeyId>",
        "AccessKeySecret": "<yourAccessKeySecret>"
    }
    EOF

    凭证文件必须使用JSON格式。更多配置方式,请参见配置OSS Connector for AI/ML

  3. 创建OSS Connector配置文件,调整预取参数以优化小文件读取性能。

    mkdir -p /etc/oss-connector/
    cat > /etc/oss-connector/config.json << 'EOF'
    {
        "logLevel": 1,
        "logPath": "/var/log/oss-connector/connector.log",
        "auditPath": "/var/log/oss-connector/audit.log",
        "datasetConfig": {
            "prefetchMB": 1024,
            "prefetchConcurrency": 16,
            "prefetchWorker": 2,
            "prefetchUnitMB": 1,
            "timeoutMs": 10000
        },
        "checkpointConfig": {
            "prefetchConcurrency": 24,
            "prefetchWorker": 4,
            "uploadConcurrency": 64
        }
    }
    EOF

    关键参数说明:

    参数

    推荐值

    说明

    prefetchMB

    1024

    预取缓冲区大小(MB)。1 GB可缓存约8,700115 KB小文件。文件较大时可适当增大。

    prefetchConcurrency

    16

    预取并发数。充分利用高带宽实例的网络能力。

    prefetchUnitMB

    1

    单次预取单元大小(MB)。小文件场景设为1 MB,大文件应增大以匹配文件大小。

执行性能对比测试

创建测试脚本test_accelerator.py,分别使用OSS内网EndpointOSS加速器Endpoint读取数据集,对比性能差异。

from osstorchconnector import OssMapDataset
import torch
from torch.utils.data import DataLoader
import time
import numpy as np

# === 配置 ===
# 第一轮使用OSS内网Endpoint,第二轮切换为OSS加速器Endpoint
ENDPOINT = "http://oss-cn-beijing-internal.aliyuncs.com"
# ENDPOINT = "http://oss-cache-cn-beijing-h.aliyuncs.com"

CONFIG_PATH = "/etc/oss-connector/config.json"
CRED_PATH = "/root/.alibabacloud/credentials"
OSS_URI = "oss://<yourBucketName>/<yourDatasetPrefix>/"
REGION = "cn-beijing"

def collate_fn(batch):
    results = []
    for item in batch:
        start = time.perf_counter()
        content = item.read()
        read_time = time.perf_counter() - start
        results.append({'size': item.size, 'read_time': read_time})
    return results

dataset = OssMapDataset.from_prefix(
    OSS_URI,
    endpoint=ENDPOINT,
    cred_path=CRED_PATH,
    config_path=CONFIG_PATH,
    region=REGION
)

NUM_WORKERS = 8
BATCH_SIZE = 256
dataloader = DataLoader(
    dataset,
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    collate_fn=collate_fn,
    pin_memory=False,
    shuffle=True
)

# === 执行测试 ===
all_batch_durations = []
total_size = 0
file_count = 0
start_time = time.perf_counter()
last_receive_time = start_time

print(f"开始测试: {NUM_WORKERS} workers, batch_size={BATCH_SIZE}")
print(f"Endpoint: {ENDPOINT}\n")

try:
    for batch in dataloader:
        current_receive_time = time.perf_counter()
        batch_duration = current_receive_time - last_receive_time
        all_batch_durations.append(batch_duration)
        last_receive_time = current_receive_time

        batch_total_size = sum(data['size'] for data in batch)
        total_size += batch_total_size
        file_count += len(batch)

        if file_count % 10000 == 0:
            elapsed = current_receive_time - start_time
            throughput = total_size / elapsed / (1024 ** 2)
            print(f"  累计 {file_count:,} 文件 | 吞吐: {throughput:.2f} MB/s")

except KeyboardInterrupt:
    print("\n测试中断")

# === 输出结果 ===
end_time = time.perf_counter()
total_elapsed = end_time - start_time
num_batches = len(all_batch_durations)

if num_batches > 0:
    durations_ms = np.array(all_batch_durations) * 1000
    print(f"\n{'='*50}")
    print(f"测试结果")
    print(f"{'='*50}")
    print(f"总文件数:     {file_count:,}")
    print(f"总数据量:     {total_size / (1024**2):,.2f} MB")
    print(f"总耗时:       {total_elapsed:.2f} 秒")
    print(f"平均吞吐:     {total_size / total_elapsed / (1024**2):,.2f} MB/s")
    print(f"\nBatch延迟 (ms): Avg={np.mean(durations_ms):.2f}  P50={np.percentile(durations_ms, 50):.2f}  P95={np.percentile(durations_ms, 95):.2f}")
else:
    print("未接收到任何batch")

将脚本中的OSS_URIREGION替换为您的实际信息后,按以下步骤执行测试:

1. 使用OSS内网Endpoint运行(基线)

确认脚本中ENDPOINTOSS内网地址,执行测试并记录总耗时:

python test_accelerator.py

2. 切换为OSS加速器Endpoint运行

ENDPOINT改为OSS加速器地址:

ENDPOINT = "http://oss-cache-cn-beijing-h.aliyuncs.com"

加速器首次读取数据时需要从OSS源站加载到缓存(冷缓存),性能与直接访问OSS相当。需要执行两次:第一次预热缓存,第二次才能体现加速效果。

# 第一次:预热缓存
python test_accelerator.py

# 第二次:命中缓存,体现加速效果
python test_accelerator.py

对比基线与第二次执行的总耗时,即可看到加速器对小文件场景的加速效果。

性能对比结果

以下为ImageNet数据集(128万文件,共137 GB,平均文件大小约115 KB)在ecs.g7.32xlarge实例上的实测结果:

指标

无加速器

有加速器

总耗时

246.89

101.49

平均吞吐

567.44 MB/s

1,380.41 MB/s

速度提升

2.44

实际性能受实例规格、数据集规模和文件大小等因素影响,建议以您的实际测试结果为准。