OSS提供的分片上传(Multipart Upload)功能,将要上传的较大文件(Object)分成多个分片(Part)来分别上传,上传完成后再调用CompleteMultipartUpload接口将这些Part组合成一个Object来达到断点续传的效果。

分片上传流程

分片上传(Multipart Upload)分为以下三个步骤:

  1. 初始化一个分片上传事件。

    调用bucket.init_multipart_upload方法返回OSS创建的全局唯一的uploadId。

  2. 上传分片。

    调用bucket.upload_part方法上传分片数据。

    说明
    • 对于同一个uploadId,分片号(partNumber)标识了该分片在整个文件内的相对位置。如果使用同一个分片号上传了新的数据,那么OSS上这个分片已有的数据将会被覆盖。
    • OSS将收到的分片数据的MD5值放在ETag头内返回给用户。
    • OSS计算上传数据的MD5值,并与SDK计算的MD5值比较,如果不一致则返回InvalidDigest错误码。
  3. 完成分片上传。

    所有分片上传完成后,调用bucket.complete_multipart_upload方法将所有分片合并成完整的文件。

分片上传完整示例

以下通过一个完整的示例对分片上传的流程进行逐步解析:

# -*- coding: utf-8 -*-
import os
from oss2 import SizedFileAdapter, determine_part_size
from oss2.models import PartInfo
import oss2

# 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
auth = oss2.Auth('yourAccessKeyId', 'yourAccessKeySecret')
# Endpoint以华东1(杭州)为例,其他Region请按实际情况填写。
# 填写Bucket名称,例如examplebucket。
bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'examplebucket')
# 填写不能包含Bucket名称在内的Object完整路径,例如exampledir/exampleobject.txt。
key = 'exampledir/exampleobject.txt'
# 填写本地文件的完整路径,例如D:\\localpath\\examplefile.txt。
filename = 'D:\\localpath\\examplefile.txt'

total_size = os.path.getsize(filename)
# determine_part_size方法用于确定分片大小。
part_size = determine_part_size(total_size, preferred_size=100 * 1024)

# 初始化分片。
# 如需在初始化分片时设置文件存储类型,请在init_multipart_upload中设置相关Headers,参考如下。
# headers = dict()
# 指定该Object的网页缓存行为。
# headers['Cache-Control'] = 'no-cache'
# 指定该Object被下载时的名称。
# headers['Content-Disposition'] = 'oss_MultipartUpload.txt'
# 指定该Object的内容编码格式。
# headers['Content-Encoding'] = 'utf-8'
# 指定过期时间,单位为毫秒。
# headers['Expires'] = '1000'
# 指定初始化分片上传时是否覆盖同名Object。此处设置为true,表示禁止覆盖同名Object。
# headers['x-oss-forbid-overwrite'] = 'true'
# 指定上传该Object的每个Part时使用的服务器端加密方式。
# headers[OSS_SERVER_SIDE_ENCRYPTION] = SERVER_SIDE_ENCRYPTION_KMS
# 指定Object的加密算法。如果未指定此选项,表明Object使用AES256加密算法。
# headers[OSS_SERVER_SIDE_DATA_ENCRYPTION] = SERVER_SIDE_ENCRYPTION_KMS
# 表示KMS托管的用户主密钥。
# headers[OSS_SERVER_SIDE_ENCRYPTION_KEY_ID] = '9468da86-3509-4f8d-a61e-6eab1eac****'
# 指定Object的存储类型。
# headers['x-oss-storage-class'] = oss2.BUCKET_STORAGE_CLASS_STANDARD
# 指定Object的对象标签,可同时设置多个标签。
# headers[OSS_OBJECT_TAGGING] = 'k1=v1&k2=v2&k3=v3'
# upload_id = bucket.init_multipart_upload(key, headers=headers).upload_id
upload_id = bucket.init_multipart_upload(key).upload_id
parts = []

# 逐个上传分片。
with open(filename, 'rb') as fileobj:
    part_number = 1
    offset = 0
    while offset < total_size:
        num_to_upload = min(part_size, total_size - offset)
        # 调用SizedFileAdapter(fileobj, size)方法会生成一个新的文件对象,重新计算起始追加位置。
        result = bucket.upload_part(key, upload_id, part_number,
                                    SizedFileAdapter(fileobj, num_to_upload))
        parts.append(PartInfo(part_number, result.etag))

        offset += num_to_upload
        part_number += 1

# 完成分片上传。
# 如需在完成分片上传时设置相关Headers,请参考如下示例代码。
headers = dict()
# 设置文件访问权限ACL。此处设置为OBJECT_ACL_PRIVATE,表示私有权限。
# headers["x-oss-object-acl"] = oss2.OBJECT_ACL_PRIVATE
# 如果指定了x-oss-complete-all:yes,则OSS会列举当前uploadId已上传的所有Part,然后按照PartNumber的序号排序并执行CompleteMultipartUpload操作。
# 如果指定了x-oss-complete-all:yes,则不允许继续指定body,否则报错。
headers["x-oss-complete-all"] = 'yes'
bucket.complete_multipart_upload(key, upload_id, parts, headers=headers)
# bucket.complete_multipart_upload(key, upload_id, parts)

# 验证分片上传。
with open(filename, 'rb') as fileobj:
    assert bucket.get_object(key).read() == fileobj.read()            
说明 网络情况较好时,建议增大分片大小。反之,减小分片大小。

取消分片上传事件

您可以调用bucket.abort_multipart_upload方法来取消分片上传事件。当一个分片上传事件被取消后,无法再使用此uploadId做任何操作,已经上传的分片数据会被删除。

以下代码用于取消分片上传事件。

# -*- coding: utf-8 -*-
import os
import oss2

# 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
auth = oss2.Auth('yourAccessKeyId', 'yourAccessKeySecret')
# Endpoint以华东1(杭州)为例,其它Region请按实际情况填写。
# 填写Bucket名称,例如examplebucket。
bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'examplebucket')
# 填写不能包含Bucket名称在内的Object完整路径,例如exampledir/exampleobject.txt。
key = 'exampledir/exampleobject.txt'
# 由init_multipart_upload接口返回的upload_id。
upload_id = 'yourUploadId'

# 取消指定upload_id的分片上传事件,已上传的分片会被删除。
bucket.abort_multipart_upload(key, upload_id)

列举已上传的分片信息

以下代码用于列举已上传的分片信息:

# -*- coding: utf-8 -*-
import os
import oss2

# 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
auth = oss2.Auth('yourAccessKeyId', 'yourAccessKeySecret')
# Endpoint以杭州为例,其它Region请按实际情况填写。
bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'yourBucketName')
# 填写不能包含Bucket名称在内的Object完整路径,例如exampledir/exampleobject.txt。
key = 'exampledir/exampleobject.txt'
# 由init_multipart_upload接口返回的upload_id。
upload_id = 'yourUploadId'

# 列举指定upload_id对应的已上传分片信息。
for part_info in oss2.PartIterator(bucket, key, upload_id):
    print('part_number:', part_info.part_number)
    print('etag:', part_info.etag)
    print('size:', part_info.size)

列举分片上传事件

  • 列举指定Object的分片上传事件

    以下代码用于列举指定Object的分片上传事件:

    # -*- coding: utf-8 -*-
    import os
    import oss2
    
    # 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
    auth = oss2.Auth('yourAccessKeyId', 'yourAccessKeySecret')
    # Endpoint以杭州为例,其它Region请按实际情况填写。
    # 填写Bucket名称,例如examplebucket。
    bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'examplebucket')
    # 填写不能包含Bucket名称在内的Object完整路径,例如exampledir/exampleobject.txt。
    key = 'exampledir/exampleobject.txt'
    
    # 列举Object的所有分片上传事件。对于同一个Object,每次调用init_multipart_upload均会返回不同的upload_id。
    # 一个upload_id对应一个分片上传事件。
    for upload_info in oss2.ObjectUploadIterator(bucket, key):
        print('key:', upload_info.key)
        print('upload_id:', upload_info.upload_id)
  • 列举存储空间下的所有分片事件

    以下代码用于列举存储空间下的所有分片上传事件:

    # -*- coding: utf-8 -*-
    import os
    import oss2
    
    # 阿里云账号AccessKey拥有所有API的访问权限,风险很高阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
    auth = oss2.Auth('yourAccessKeyId', 'yourAccessKeySecret')
    # Endpoint以杭州为例,其它Region请按实际情况填写。
    # 填写Bucket名称,例如examplebucket。
    bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'examplebucket')
    
    # 列举存储空间下的所有分片上传事件。
    for upload_info in oss2.MultipartUploadIterator(bucket):
        print('key:', upload_info.key)
        print('upload_id:', upload_info.upload_id)
  • 列举存储空间下指定前缀Object的分片上传事件

    以下代码用于列举存储空间下指定前缀Object的分片上传事件:

    # -*- coding: utf-8 -*-
    import os
    import oss2
    # 阿里云账号AccessKey拥有所有API的访问权限,风险很高阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
    auth = oss2.Auth('yourAccessKeyId', 'yourAccessKeySecret')
    # Endpoint以杭州为例,其它Region请按实际情况填写。
    # 填写Bucket名称,例如examplebucket。
    bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', 'examplebucket')
    
    # 列举存储空间下以'test'为前缀的Object的分片上传事件。
    for upload_info in oss2.MultipartUploadIterator(bucket, prefix='test'):
        print('key:', upload_info.key)
        print('upload_id:', upload_info.upload_id)

相关文档

  • 分片上传的完整实现涉及三个API接口,详情如下:
  • 关于取消分片上传事件的API接口说明,请参见AbortMultipartUpload
  • 关于列举已上传分片的API接口说明,请参见ListParts
  • 关于列举所有执行中的分片上传事件(即已初始化但尚未完成或已取消的分片上传事件)的API接口说明,请参见ListMultipartUploads