在AI/ML训练中,百万级小文件(如ImageNet图片)的随机I/O是GPU利用率的主要瓶颈。OSS Connector for AI/ML结合OSS加速器,可大幅提升数据加载速度。
加速原理
OSS Connector for AI/ML与OSS加速器分别在应用层和存储层进行优化,两者结合产生协同加速效果:
OSS Connector for AI/ML(应用层):内置异步I/O模型和多线程预取机制,将串行的文件请求转化为高并发的并行流,在后台持续预取下一个Batch所需的数据,消除CPU/GPU等待I/O的空闲时间。
OSS加速器(存储层):将热点数据缓存至高性能存储介质,采用冷热缓存机制工作。首次读取数据时(冷缓存),需要从OSS源站加载到缓存,性能与直接访问OSS相当;后续读取相同数据时(热缓存),数据直接从缓存返回,相比OSS源站的P50延迟可降低约2.8倍(约12 ms vs 35 ms)。在模型训练中,由于多个Epoch会重复读取相同数据集,从第二个Epoch开始即可享受加速。
协同效果:Connector发起的海量并发请求被加速器以毫秒级速度消化,不再受限于OSS源站延迟。训练启动或Epoch切换时的突发流量被加速器作为缓冲池吸收,确保稳定的高吞吐输出。
前提条件
已创建OSS Bucket并上传训练数据集。
已获取AccessKey ID和AccessKey Secret。具体操作,请参见创建AccessKey。
已创建ECS实例。推荐计算增强型或网络增强型实例(如ecs.g7.32xlarge),操作系统推荐Alibaba Cloud Linux 3/4。具体操作,请参见自定义购买实例。
为Bucket开通OSS加速器
登录OSS管理控制台,单击目标Bucket名称。
在左侧导航栏,选择Bucket配置 > OSS加速器。
单击创建OSS加速器,配置以下关键参数:
配置项
说明
可用区
选择与ECS实例相同的可用区。本文示例为华北2(北京)可用区H 。
容量
需大于等于数据集总大小。本文示例为20 TB。
加速策略
选择指定路径加速并填写数据集前缀,或选择加速整个Bucket。
重要ECS实例与OSS加速器建议位于同一可用区。跨可用区访问会引入额外网络延迟,影响加速效果。
创建完成后,在OSS Connector for AI/ML中使用时,Endpoint格式为
oss-cache-<可用区>.aliyuncs.com。例如,可用区为cn-beijing-h时,对应Endpoint为oss-cache-cn-beijing-h.aliyuncs.com。
安装和配置OSS Connector for AI/ML
登录ECS实例,安装OSS Connector for AI/ML(PyTorch版)。
pip install osstorchconnector配置访问凭证。将
<yourAccessKeyId>和<yourAccessKeySecret>替换为您的AccessKey信息。mkdir -p /root/.alibabacloud cat > /root/.alibabacloud/credentials << 'EOF' { "AccessKeyId": "<yourAccessKeyId>", "AccessKeySecret": "<yourAccessKeySecret>" } EOF凭证文件必须使用JSON格式。更多配置方式,请参见配置OSS Connector for AI/ML。
创建OSS Connector配置文件,调整预取参数以优化小文件读取性能。
mkdir -p /etc/oss-connector/ cat > /etc/oss-connector/config.json << 'EOF' { "logLevel": 1, "logPath": "/var/log/oss-connector/connector.log", "auditPath": "/var/log/oss-connector/audit.log", "datasetConfig": { "prefetchMB": 1024, "prefetchConcurrency": 16, "prefetchWorker": 2, "prefetchUnitMB": 1, "timeoutMs": 10000 }, "checkpointConfig": { "prefetchConcurrency": 24, "prefetchWorker": 4, "uploadConcurrency": 64 } } EOF关键参数说明:
参数
推荐值
说明
prefetchMB1024
预取缓冲区大小(MB)。1 GB可缓存约8,700个115 KB小文件。文件较大时可适当增大。
prefetchConcurrency16
预取并发数。充分利用高带宽实例的网络能力。
prefetchUnitMB1
单次预取单元大小(MB)。小文件场景设为1 MB,大文件应增大以匹配文件大小。
执行性能对比测试
创建测试脚本test_accelerator.py,分别使用OSS内网Endpoint和OSS加速器Endpoint读取数据集,对比性能差异。
from osstorchconnector import OssMapDataset
import torch
from torch.utils.data import DataLoader
import time
import numpy as np
# === 配置 ===
# 第一轮使用OSS内网Endpoint,第二轮切换为OSS加速器Endpoint
ENDPOINT = "http://oss-cn-beijing-internal.aliyuncs.com"
# ENDPOINT = "http://oss-cache-cn-beijing-h.aliyuncs.com"
CONFIG_PATH = "/etc/oss-connector/config.json"
CRED_PATH = "/root/.alibabacloud/credentials"
OSS_URI = "oss://<yourBucketName>/<yourDatasetPrefix>/"
REGION = "cn-beijing"
def collate_fn(batch):
results = []
for item in batch:
start = time.perf_counter()
content = item.read()
read_time = time.perf_counter() - start
results.append({'size': item.size, 'read_time': read_time})
return results
dataset = OssMapDataset.from_prefix(
OSS_URI,
endpoint=ENDPOINT,
cred_path=CRED_PATH,
config_path=CONFIG_PATH,
region=REGION
)
NUM_WORKERS = 8
BATCH_SIZE = 256
dataloader = DataLoader(
dataset,
batch_size=BATCH_SIZE,
num_workers=NUM_WORKERS,
collate_fn=collate_fn,
pin_memory=False,
shuffle=True
)
# === 执行测试 ===
all_batch_durations = []
total_size = 0
file_count = 0
start_time = time.perf_counter()
last_receive_time = start_time
print(f"开始测试: {NUM_WORKERS} workers, batch_size={BATCH_SIZE}")
print(f"Endpoint: {ENDPOINT}\n")
try:
for batch in dataloader:
current_receive_time = time.perf_counter()
batch_duration = current_receive_time - last_receive_time
all_batch_durations.append(batch_duration)
last_receive_time = current_receive_time
batch_total_size = sum(data['size'] for data in batch)
total_size += batch_total_size
file_count += len(batch)
if file_count % 10000 == 0:
elapsed = current_receive_time - start_time
throughput = total_size / elapsed / (1024 ** 2)
print(f" 累计 {file_count:,} 文件 | 吞吐: {throughput:.2f} MB/s")
except KeyboardInterrupt:
print("\n测试中断")
# === 输出结果 ===
end_time = time.perf_counter()
total_elapsed = end_time - start_time
num_batches = len(all_batch_durations)
if num_batches > 0:
durations_ms = np.array(all_batch_durations) * 1000
print(f"\n{'='*50}")
print(f"测试结果")
print(f"{'='*50}")
print(f"总文件数: {file_count:,}")
print(f"总数据量: {total_size / (1024**2):,.2f} MB")
print(f"总耗时: {total_elapsed:.2f} 秒")
print(f"平均吞吐: {total_size / total_elapsed / (1024**2):,.2f} MB/s")
print(f"\nBatch延迟 (ms): Avg={np.mean(durations_ms):.2f} P50={np.percentile(durations_ms, 50):.2f} P95={np.percentile(durations_ms, 95):.2f}")
else:
print("未接收到任何batch")将脚本中的OSS_URI和REGION替换为您的实际信息后,按以下步骤执行测试:
1. 使用OSS内网Endpoint运行(基线)
确认脚本中ENDPOINT为OSS内网地址,执行测试并记录总耗时:
python test_accelerator.py2. 切换为OSS加速器Endpoint运行
将ENDPOINT改为OSS加速器地址:
ENDPOINT = "http://oss-cache-cn-beijing-h.aliyuncs.com"加速器首次读取数据时需要从OSS源站加载到缓存(冷缓存),性能与直接访问OSS相当。需要执行两次:第一次预热缓存,第二次才能体现加速效果。
# 第一次:预热缓存
python test_accelerator.py
# 第二次:命中缓存,体现加速效果
python test_accelerator.py对比基线与第二次执行的总耗时,即可看到加速器对小文件场景的加速效果。
性能对比结果
以下为ImageNet数据集(128万文件,共137 GB,平均文件大小约115 KB)在ecs.g7.32xlarge实例上的实测结果:
指标 | 无加速器 | 有加速器 |
总耗时 | 246.89秒 | 101.49秒 |
平均吞吐 | 567.44 MB/s | 1,380.41 MB/s |
速度提升 | — | 2.44倍 |
实际性能受实例规格、数据集规模和文件大小等因素影响,建议以您的实际测试结果为准。