Python分片拷贝

更新时间:2025-03-07 06:13:44

本文介绍如何使用Python SDK V2UploadPartCopy方法,将源Bucket中的多个分片文件拷贝到同一地域下相同或不同目标Bucket中,然后合并成一个完整的文件对象。

注意事项

  • 本文示例代码以华东1(杭州)的地域IDcn-hangzhou为例,默认使用外网Endpoint,如果您希望通过与OSS同地域的其他阿里云产品访问OSS,请使用内网Endpoint。关于OSS支持的RegionEndpoint的对应关系,请参见OSS地域和访问域名

  • 要进行拷贝文件,您必须拥有源文件的读权限及目标Bucket的读写权限。

  • 不支持跨地域拷贝。例如不能将华东1(杭州)地域存储空间中的文件拷贝到华北1(青岛)地域。

  • 拷贝文件时,您需要确保源Bucket和目标Bucket均未设置合规保留策略,否则报错The object you specified is immutable.

方法定义

upload_part_copy(request: UploadPartCopyRequest, **kwargs) → UploadPartCopyResult

请求参数列表

参数名

类型

说明

参数名

类型

说明

request

UploadPartCopyRequest

设置请求参数,具体请参见UploadPartCopyRequest

返回值列表

类型

说明

类型

说明

UploadPartCopyResult

返回值,具体请参见UploadPartCopyResult

关于分片拷贝方法的完整定义,请参见upload_part_copy

分片拷贝流程

分片拷贝分为以下三个步骤:

  1. 初始化一个分片上传事件。

    调用client.initiate_multipart_upload方法返回OSS创建的全局唯一的uploadID。

  2. 上传分片。

    调用client.upload_part_copy方法上传分片数据。

    说明
    • 对于同一个uploadID,分片号(partNumber)标识了该分片在整个文件内的相对位置。如果使用同一个分片号上传了新的数据,那么OSS上该分片已有的数据将会被覆盖。

    • OSS将收到的分片数据的MD5值放在ETag头内返回给用户。

    • OSS计算上传数据的MD5值,并与SDK计算的MD5值比较,如果不一致则返回InvalidDigest错误码。

  3. 完成分片上传。

    所有分片上传完成后,调用client.complete_multipart_upload方法将所有分片合并成完整的文件。

示例代码

您可以使用以下代码将多个分片文件从源存储空间拷贝到目标存储空间,然后合并成完整的文件对象。

import argparse
import alibabacloud_oss_v2 as oss

# 创建命令行参数解析器,并描述脚本用途:同步分片拷贝上传示例
parser = argparse.ArgumentParser(description="upload part copy synchronously sample")

# 添加命令行参数 --region,表示存储空间所在的区域,必需参数
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
# 添加命令行参数 --bucket,表示要上传对象的目标存储空间名称,必需参数
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
# 添加命令行参数 --endpoint,表示其他服务可用来访问OSS的域名,非必需参数
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
# 添加命令行参数 --key,表示目标对象在OSS中的键名,必需参数
parser.add_argument('--key', help='The name of the object.', required=True)
# 添加命令行参数 --source_bucket,表示源对象所在存储空间的名称,必需参数
parser.add_argument('--source_bucket', help='The name of the source bucket.', required=True)
# 添加命令行参数 --source_key,表示源对象在OSS中的键名,必需参数
parser.add_argument('--source_key', help='The name of the source object.', required=True)

def main():
    # 解析命令行提供的参数,获取用户输入的值
    args = parser.parse_args()

    # 从环境变量中加载访问OSS所需的认证信息,用于身份验证
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK的默认配置创建配置对象,并设置认证提供者
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    cfg.region = args.region

    # 如果提供了自定义endpoint,则更新配置对象中的endpoint属性
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用上述配置初始化OSS客户端,准备与OSS交互
    client = oss.Client(cfg)

    # 获取源对象的元数据
    result_meta = client.get_object_meta(oss.GetObjectMetaRequest(
        bucket=args.source_bucket,
        key=args.source_key,
    ))

    # 初始化一个多部分上传请求,返回一个UploadId用于标识这个过程
    result = client.initiate_multipart_upload(oss.InitiateMultipartUploadRequest(
        bucket=args.bucket,
        key=args.key,
    ))

    # 定义每个分片的大小(此处设置为1MB)
    part_size = 1024 * 1024
    total_size = result_meta.content_length  # 源文件总大小
    part_number = 1  # 分片编号从1开始
    upload_parts = []  # 用来存储已上传分片的信息
    offset = 0  # 当前处理到的字节偏移量

    # 循环处理直到所有数据都被上传
    while offset < total_size:
        num_to_upload = min(part_size, total_size - offset)  # 计算本次要上传的数据量
        end = offset + num_to_upload - 1  # 确定结束位置

        # 执行实际的分片拷贝上传操作
        up_result = client.upload_part_copy(oss.UploadPartCopyRequest(
            bucket=args.bucket,
            key=args.key,
            upload_id=result.upload_id,
            part_number=part_number,
            source_bucket=args.source_bucket,
            source_key=args.source_key,
            source_range=f'bytes={offset}-{end}',  # 指定源对象中的范围
        ))

        # 输出该分片上传的状态信息
        print(f'status code: {up_result.status_code},'
              f' request id: {up_result.request_id},'
              f' part number: {part_number},'
              f' last modified: {up_result.last_modified},'
              f' etag: {up_result.etag},'
              f' source version id: {up_result.source_version_id}'
        )

        # 将成功上传的分片信息记录下来
        upload_parts.append(oss.UploadPart(part_number=part_number, etag=up_result.etag))
        offset += num_to_upload  # 更新偏移量
        part_number += 1  # 更新分片编号

    # 对所有已上传的分片按其编号排序
    parts = sorted(upload_parts, key=lambda p: p.part_number)

    # 向OSS服务发送请求,通知其完成多部分上传
    result = client.complete_multipart_upload(oss.CompleteMultipartUploadRequest(
        bucket=args.bucket,
        key=args.key,
        upload_id=result.upload_id,
        complete_multipart_upload=oss.CompleteMultipartUpload(
            parts=parts
        )
    ))

    # 输出最终完成上传后的详细结果
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' bucket: {result.bucket},'
          f' key: {result.key},'
          f' location: {result.location},'
          f' etag: {result.etag},'
          f' encoding type: {result.encoding_type},'
          f' hash crc64: {result.hash_crc64},'
          f' version id: {result.version_id}'
    )

# 当此脚本被直接执行时,调用main函数开始处理逻辑
if __name__ == "__main__":
    main()  # 脚本入口点,控制程序流程从这里开始

相关文档

  • 本页导读 (1)
  • 注意事项
  • 方法定义
  • 分片拷贝流程
  • 示例代码
  • 相关文档
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等