OSS在内网环境中如何实现百Gbps带宽

目前,OSS在上海、深圳、北京和杭州地区已全面支持单账号内网带宽最高可达100 Gbps。本文将深入探讨如何充分发挥OSS的百Gbps带宽性能,包括关键要点分析,以及通过Go语言实现文件下载并测试峰值带宽的实践案例。最后,还将介绍如何利用常用工具通过并发下载提升OSS的下载性能。

适用场景

在对象存储的应用场景中,超过100 Gbps带宽的需求通常出现在需要处理海量数据、支持高并发访问或对实时性要求极高的特殊业务场景中。例如

  • 大数据分析与计算:数据规模大,单次任务可能需要读取TB级甚至PB级的数据,且实时性要求高,为提高计算效率,数据读取速度必须足够快,避免成为瓶颈。

  • AI训练数据集加载:数据规模大,训练数据集可能达到PB级别,有高吞吐量需求,多个训练节点同时加载数据,总带宽需求可能达到百Gbps。

  • 备份与恢复:数据规模大,备份和恢复操作涉及PB级数据,例如全量备份,若规定在有限时间内完成备份和恢复操作,为满足时间要求,备份和恢复的带宽需求可能达到百Gbps。

  • 高性能科学计算:数据规模大,科学计算任务通常需要处理PB级甚至更大的数据集,多个研究团队可能同时访问同一份数据,从而产生高并发访问和高带宽需求。为了支持实时分析和团队协作,确保数据传输速度至关重要。

要点分析

要实现百Gbps宽带,首先需要选择合适ECS实例规格确保带宽上限,其次,若涉及落盘操作,应选用高性能磁盘以提升效率。此外,访问数据时建议通过VPC内网,以提高访问速度,并在需要并发下载时,采用相应的优化技巧,以充分释放下载带宽的潜力。

网络接收能力

OSS的客户端安装于ECS上时,数据下载速度会受ECS网络速度的制约。在阿里云平台,当前网络能力最强的机型已能提供160 Gbps的带宽。采用多机集群模式时,在客户侧进行并发访问的情况下,其总带宽能够达到100 Gbps;而在单机情形下,则优先选用大带宽的网络增强型或者高主频型(高主频在接收大量数据包时具备速度优势)。

说明

需注意基于目前ECS的规格,单个弹性网卡最大支持100 Gbps的带宽。根据ECS的规则,单机带宽超过100 Gbps时需要挂载多个弹性网卡,具体操作请参见弹性网卡

image.png

磁盘IO

数据下载至本地后,若执行数据落盘操作,下载速度便会受磁盘性能所限。例如ossutil、ossfs等工具在默认设置下均存在数据落盘行为,此时可考虑选用更高性能的磁盘或者内存盘来改善状况,如图所示。

说明

尽管阿里云ESSD云盘已可实现32 Gbps的单盘吞吐量,然而与内存相比,仍存在显著差距。为实现最大下载带宽,应尽量避免数据落盘操作。有关ossfs如何避免数据落盘,提高读取性能,请参见只读场景性能调优

image

使用VPC网络

阿里云内网针对网络数据请求进行了优化,在使用VPC内网域名时,能够获取比公网更稳定的网络服务。若要达到最大带宽,需采用VPC内网网络服务。

客户的ECS运行于客户的VPC网络之中,OSS提供了可供任意客户VPC访问的统一内网域名(例如:oss-cn-beijing-internal.aliyuncs.com)。ECSOSS之间的数据流会经过SLB负载均衡,请求被发送至后端的分布式集群里,从而将数据请求的压力均匀分散到整个集群,使OSS具备了强大的高并发处理能力。

image

并发下载

OSS采用HTTP协议传输数据,鉴于单个HTTP请求的性能存在局限,对OSS实施数据下载加速的一项基本策略为并发下载。即把一个文件分割成多个range,让每个请求仅访问其中一个range,通过这种方式来获取最大的下载带宽。由于OSS依据API调用次数计费,所以并非将文件切分得越细小越好。range越小,API调用次数就会越多,并且更小的range其单流下载速度也无法达到峰值。有关常用工具的并发下载技巧,请参见常用工具并发下载技巧

image

实践案例

为了测试最大带宽的下载能力,我们专门构建了一个Go语言测试程序,用于从OSS下载一个100 GB大小的二进制bin文件。在该程序的设计中,我们采用了特殊的数据处理策略,数据不进行落盘操作,即数据读取一遍后便直接丢弃。同时,将大文件的读取过程拆分成多个range,并且把range的大小以及并发数据量设置为可调节的参数,这样就能方便地通过调整这些参数来实现最大带宽的下载效果。具体的实验过程如下。

实验环境

实例规格

vCPU

内存(GiB)

网络带宽基础/突发(Gbit/s)

网络收发包PPS

连接数

多队列

弹性网卡

单网卡私有IPv4/IPv6地址数

可挂载的云盘数

云盘IOPS基础/突发

云盘带宽基础/突发(Gbit/s)

ecs.hfg8i.32xlarge

128

512

100/无

3000 万

400 万

64

15

50/50

64

90万/无

64/无

测试步骤

  1. 配置环境变量。

    export OSS_ACCESS_KEY_ID=<ALIBABA_CLOUD_ACCESS_KEY_ID>
    export OSS_ACCESS_KEY_SECRET=<ALIBABA_CLOUD_ACCESS_KEY_SECRET>
  2. 代码示例。

    package main
    
    import (
        "context"
        "flag"
        "fmt"
        "io"
        "log"
        "time"
        "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
        "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
    )
    
    // 定义全局变量,用于存储命令行参数
    var (
        region      string // OSS 存储桶所在的区域
        endpoint    string // OSS 服务的访问域名
        bucketName  string // OSS 存储桶的名称
        objectName  string // OSS 对象的名称
        chunkSize   int64  // 分块大小(字节)
        prefetchNum int    // 预取数量
    )
    
    // 初始化函数,用于解析命令行参数
    func init() {
        flag.StringVar(&region, "region", "", "The region in which the bucket is located.")
        flag.StringVar(&endpoint, "endpoint", "", "The domain names that other services can use to access OSS.")
        flag.StringVar(&bucketName, "bucket", "", "The `name` of the bucket.")
        flag.StringVar(&objectName, "object", "", "The `name` of the object.")
        flag.Int64Var(&chunkSize, "chunk-size", 0, "The chunk size, in bytes")
        flag.IntVar(&prefetchNum, "prefetch-num", 0, "The prefetch number")
    }
    
    func main() {
        // 解析命令行参数
        flag.Parse()
    
        // 检查必要的参数是否提供
        if len(bucketName) == 0 {
            flag.PrintDefaults()
            log.Fatalf("invalid parameters, bucket name required")
        }
    
        if len(region) == 0 {
            flag.PrintDefaults()
            log.Fatalf("invalid parameters, region required")
        }
    
        // 配置 OSS 客户端
        cfg := oss.LoadDefaultConfig().
            WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()). // 使用环境变量中的凭证
            WithRegion(region) // 设置区域
    
        // 如果提供了自定义 endpoint,则设置
        if len(endpoint) > 0 {
            cfg.WithEndpoint(endpoint)
        }
    
        // 创建 OSS 客户端
        client := oss.NewClient(cfg)
    
        // 打开 OSS 文件
        f, err := client.OpenFile(context.TODO(), bucketName, objectName, func(oo *oss.OpenOptions) {
            oo.EnablePrefetch = true      // 启用预取
            oo.ChunkSize = chunkSize      // 设置分块大小
            oo.PrefetchNum = prefetchNum  // 设置预取数量
            oo.PrefetchThreshold = int64(0) // 设置预取阈值
        })
    
        if err != nil {
            log.Fatalf("open fail, err:%v", err)
        }
    
        // 记录开始时间
        startTick := time.Now().UnixNano() / 1000 / 1000
    
        // 将文件内容读取并丢弃(用于测试读取速度)
        written, err := io.Copy(io.Discard, f)
    
        // 记录结束时间
        endTick := time.Now().UnixNano() / 1000 / 1000
    
        if err != nil {
            log.Fatalf("copy fail, err:%v", err)
        }
    
        // 计算读取速度(MiB/s)
        speed := float64(written/1024/1024) / float64((endTick-startTick)/1000)
    
        // 输出平均读取速度
        fmt.Printf("average speed:%.2f(MiB/s)\n", speed)
    }
  3. 启动测试程序。

    go run down_object.go -bucket yourbucket -endpoint oss-cn-hangzhou-internal.aliyuncs.com  -object 100GB.file -region cn-hangzhou -chunk-size 419430400 -prefetch-num 256

测试结论

在测试实际下载耗时的过程中,我们通过调整并发度和chunk大小来观察数据变化。一般来说,并发度设置为core1-4倍较为合适,chunk大小则尽量与FileSize/Concurrency的值保持一致,且不能小于2 MB。按照这样的参数设置,最终获得了最短的下载耗时,测试中的峰值速度达到了100 Gbps。

序号

Concurrency(并发数)

blcokSize(块大小:MB)

峰值带宽(Gbps)

e2e(端到端时间:s)

1

128

800

100

16.321

2

256

400

100

14.881

3

512

200

100

15.349

4

1024

100

100

19.129

常用工具并发下载技巧

以下为您介绍常用工具在优化对象存储(OSS)下载性能方面的具体方法与应用策略。

ossutil

  • 参数说明

    选项名称

    描述

    --bigfile-threshold

    开启大文件断点续传的文件大小阈值,单位为Byte,默认值为100 MByte,取值范围为0~9223372036854775807。

    --range

    下载文件时,可以指定文件内容的字节范围进行下载,字节从0开始编号。

    • 可以指定一个区间,例如3-9表示从第3个字节到第9个字节(包含第3和第9字节)。

    • 可以指定从什么字段开始,例如3-表示从第3个字节开始到文件结尾(包含第3个字节)。

    • 可以指定从什么字段结束,例如-9表示从0字节到第9个字节(包含第9个字节)。

    --parallel

    单文件内部操作的并发任务数,取值范围为1~10000,默认由ossutil根据操作类型和文件大小自行决定。

    --part-size

    分片大小以字节为单位,合适的取值范围是2097152-16777216 B,也就是 2-16 MB。一般来说,当CPU核心数量较多时,可将分片大小设置得小一些;而当CPU核心数量较少时,则可适当调高分片大小。

  • 参考示例

    以下命令用于采用256并发处理以及468435456字节的分块策略,将目标Bucket100GB.file文件复制到本地/dev/shm目录,并统计耗时。

    time ossutil --parallel=256 --part-size=468435456 --endpoint=oss-cn-hangzhou-internal.aliyuncs.com cp oss://cache-test/EcsTest/100GB.file /dev/shm/100GB.file
  • 性能说明

    端到端平均速度为2.94GB/s(约24 Gbps)。

    image.png

ossfs

  • 参数说明

    选项名称

    描述

    parallel_count

    以分片模式上传大文件时,分片的并发数,默认值为5。

    multipart_size

    以分片模式上传数据时分片的大小,单位是MB,默认值为10。该参数会影响最大支持的文件大小。分片模式上传时,最多的分片数为10000,默认值下,最大支持的文件为100 GB。如果需要支持更大的文件,需要根据需求调整这个值。

    direct_read

    临时数据是否落盘。默认情况下该开关为关,临时数据会存储在磁盘。

    direct_read_prefetch_chunks

    direct_read打开时,预取的chunk数量。

    direct_read_chunk_size

    direct_read打开时,一个chunk大小。

    ensure_diskfree

    用于设置ossfs保留的可用磁盘空间大小。为提升性能,默认情况下ossfs会使用磁盘空间来保存上传或下载的临时数据。您可以通过该选项设置保留的可用硬盘空间大小,单位为MB。例如,您需要设置ossfs保留1024 MB的可用磁盘空间,则使用-oensure_diskfree=1024。

    max_stat_cache_size

    指定文件元数据的缓存数量,单位为个,默认值为10W。

    stat_cache_expire

    指定文件元数据缓存的失效时间,单位为秒,默认不失效。

  • 参考示例

    • 直读模式:挂载名为cache-testbucket到本地/mnt/cache-test文件夹下,并设置分片并发数为128,每个分片大小32 MB。

      ossfs cache-test /mnt/cache-test -ourl=http://oss-cn-hangzhou-internal.aliyuncs.com  -oparallel_count=128 -omultipart_size=32 
    • 落盘模式:挂载名为cache-testbucket到本地/mnt/cache-test文件夹下,同时开启落盘模式,设置预取chunk数量为128,每个chunk大小32 MB。

      ossfs cache-test /mnt/cache-test -ourl=http://oss-cn-hangzhou-internal.aliyuncs.com  -odirect_read -odirect_read_prefetch_chunks=128 -odirect_read_chunk_size=32
  • 性能说明

    默认模式中数据会进行落盘。直读模式数据保存在内存中,访问效果更高,但是内存开销更大。

    说明

    在直读模式下,ossfschunk为单位管理下载数据,默认chunk大小为4 MB,可通过direct_read_chunk_size参数自行设置。ossfs会在内存中保留 [当前 chunk - 1, 当前 chunk + direct_read_prefetch_chunks] 区间内的数据。判断是否适合采用直读模式,可依据内存大小,尤其是pagecache空间来确定。一般来说,当pagecache不够大时更适合直读模式。比如,机器总内存为16 GB,pagecache可以使用6 GB,那么直读模式适合6 GB以上的文件。有关直读模式详细介绍,请参见直读模式

    模式

    Concurrency(并发数)

    blcokSize(块大小:MB)

    峰值带宽(Gbps)

    e2e 带宽(Gbps)

    e2e 耗时(s)

    默认

    128

    32

    24

    11.3

    72.01

    直读

    128

    32

    24

    16.1

    50.9

    针对模型读取的优化:

    模型大小(GB)

    默认模式(耗时:s,限制内存6 GB)

    混合直读模式(耗时:s)

    混合直读模式(耗时:s,调整chunk保留窗口[-32, +32])

    1

    8.19

    8.20

    8.56

    2.4

    24.5

    20.43

    20.02

    5

    26.5

    22.3

    19.89

    5.5

    22.8

    23.1

    22.98

    8.5

    106.0

    36.6

    36.00

    12.6

    154.6

    42.1

    41.9

Python SDK

AI模型训练场景里,Python SDK默认是以串行方式访问后端存储服务的。 通过Python的并发库进行多线程改造,带宽获得了极为显著的提升。

  • 实验场景

    测试选用的模型文件大小约为5.6 GB,测试机器的规格为ECS48vCPU、具备16 Gbps带宽以及180 GB内存。

  • 测试示例

    import oss2
    import time
    import os
    import threading
    from io import BytesIO
    
    # 配置参数(通过环境变量获取)
    OSS_CONFIG = {
        "bucket_endpoint": os.environ.get('OSS_BUCKET_ENDPOINT', 'oss-cn-hangzhou-internal.aliyuncs.com'),  # 默认Endpoint示例
        "bucket_name": os.environ.get('OSS_BUCKET_NAME', 'bucket_name'),  #Bucket 名称
        "access_key_id": os.environ['ACCESS_KEY_ID'],         # RAM用户ACCESS_KEY_ID
        "access_key_secret": os.environ['ACCESS_KEY_SECRET']  # RAM用户ACCESS_KEY_SECRET
    }
    
    # 初始化 OSS Bucket 对象
    def __bucket__():
        auth = oss2.Auth(OSS_CONFIG["access_key_id"], OSS_CONFIG["access_key_secret"])
        return oss2.Bucket(
            auth, 
            OSS_CONFIG["bucket_endpoint"], 
            OSS_CONFIG["bucket_name"], 
            enable_crc=False
        )
    
    # 获取对象大小
    def __get_object_size(object_name):
        simplifiedmeta = __bucket__().get_object_meta(object_name)
        return int(simplifiedmeta.headers['Content-Length'])
    
    # 获取远程模型最后修改时间
    def get_remote_model_mmtime(model_name):
        return __bucket__().head_object(model_name).last_modified
    
    # 列出远程模型文件
    def list_remote_models(ext_filter=('.ckpt',)):  # 添加默认扩展名过滤
        dir_prefix = ""
        output = []
        
        for obj in oss2.ObjectIteratorV2(
            __bucket__(),
            prefix=dir_prefix,
            delimiter='/',
            start_after=dir_prefix,
            fetch_owner=False
        ):
            if not obj.is_prefix():
                _, ext = os.path.splitext(obj.key)
                if ext.lower() in ext_filter:
                    output.append(obj.key)
        return output
    
    # 分段下载线程函数
    def __range_get(object_name, buffer, offset, start, end, read_chunk_size, progress_callback, total_bytes):
        chunk_size = int(read_chunk_size)
        with __bucket__().get_object(object_name, byte_range=(start, end)) as object_stream:
            s = start
            while True:
                chunk = object_stream.read(chunk_size)
                if not chunk:
                    break
                buffer.seek(s - offset)
                buffer.write(chunk)
                s += len(chunk)
                # 计算已下载字节数并调用进度回调
                if progress_callback:
                    progress_callback(s - start, total_bytes)
    
    # 读取远程模型(增加进度回调可选参数)
    def read_remote_model(
        checkpoint_file, 
        start=0, 
        size=-1, 
        read_chunk_size=2*1024*1024,  # 2MB
        part_size=256*1024*1024,      # 256MB
        progress_callback=None        # 进度回调
    ):
        time_start = time.time()
        buffer = BytesIO()
        obj_size = __get_object_size(checkpoint_file)
        
        end = (obj_size if size == -1 else start + size) - 1
        s = start
        tasks = []
    
        # 进度计算
        total_bytes = end - start + 1
        downloaded_bytes = 0
    
        while s <= end:
            current_end = min(s + part_size - 1, end)
            task = threading.Thread(
                target=__range_get,
                args=(checkpoint_file, buffer, start, s, current_end, read_chunk_size, progress_callback, total_bytes)
            )
            tasks.append(task)
            task.start()
            s += part_size
    
        for task in tasks:
            task.join()
    
        time_end = time.time()
        # 显示总耗时
        print(f"Downloaded {checkpoint_file} in {time_end - time_start:.2f} seconds.")
    
         # 计算并打印下载文件的大小(单位:GB)
        file_size_gb = obj_size / (1024 * 1024 * 1024)
        print(f"Total downloaded file size: {file_size_gb:.2f} GB")
    
        buffer.seek(0)
        return buffer
    
    # 进度回调函数
    def show_progress(downloaded, total):
        progress = (downloaded / total) * 100
        print(f"Progress: {progress:.2f}%", end="\r")
    
    # 调用示例
    if __name__ == "__main__":
        # 调用 list_remote_models 方法列出远程模型文件
        models = list_remote_models()
        print("Remote models:", models)
    
        if models:
            # 选择第一个模型文件进行下载
            first_model = models[0]
            buffer = read_remote_model(first_model, progress_callback=show_progress)
            print(f"\nDownloaded {first_model} to buffer.")
  • 实验结论

    版本

    OSS耗时 /s

    OSS平均带宽 MB/s

    OSS峰值带宽MB/s

    OSS pythonSDK

    109

    53

    100

    OSS pythonSDK(并发下载)

    11.1

    516

    600

    从实验结果数据能够明显看出,相较于串行模式下的OSS Python SDK,并发下载模式下的耗时仅为前者的约0.2%,平均带宽是前者的约9.7倍,峰值带宽达到了前者的6倍。由此可见,在使用Python SDK进行AI模型训练时,采用并发下载的方式能够极大地提高处理效率,并大幅提升带宽性能。

  • 其他方案

    除了单独直接调用 SDK 这种方式,阿里云还专门提供了一个名为osstorchconnectorPython库,它主要用于在PyTorch训练任务中高效地访问和存储OSS数据。该库已经为用户完成了并发的二次封装,用户可以直接使用。下面是有关使用osstorchconnector进行AI模型加载的测试结果。更多有关osstorchconnector的性能测试内容,请参见性能测试

    项目

    详细信息

    实验场景

    模型加载与聊天问答

    模型名称

    gpt3-finnish-3B

    模型大小

    11 GB

    应用场景

    聊天问答

    硬件配置

    高规格ECS:96核(vCPU)、384 GiB内存、30 Gbps内网带宽

    实验结论

    平均带宽约为10 Gbps,OSS服务端可支持10个任务同时加载模型