Python拷贝文件

本文介绍如何在开启版本控制的存储空间(Bucket)中拷贝文件(Object)。您可以通过CopyObject的方法拷贝小于1 GB的文件,通过分片拷贝(UploadPartCopy)的方法拷贝大于1 GB的文件。

注意事项

  • 本文示例代码以华东1(杭州)的地域IDcn-hangzhou为例,默认使用外网Endpoint,如果您希望通过与OSS同地域的其他阿里云产品访问OSS,请使用内网Endpoint。关于OSS支持的RegionEndpoint的对应关系,请参见OSS地域和访问域名

  • 要拷贝文件,您必须有oss:GetObjectoss:PutObject权限。具体操作,请参见RAM用户授权自定义的权限策略

示例代码

拷贝对象

说明

对于小于1 GB的文件,您可以通过CopyObject方法将文件从一个存储空间(源存储空间)复制到同一地域的另一个存储空间(目标存储空间)。

  • x-oss-copy-source默认拷贝Object的当前版本。如果当前版本是删除标记,则返回404表示该Object不存在。您可以在x-oss-copy-source中加入versionId来拷贝指定的Object版本,删除标记不能被拷贝。

  • 您可以将Object的早期版本拷贝到同一个Bucket中,拷贝Object的历史版本将会成为一个新的当前版本,达到恢复Object早期版本的目的。

  • 如果目标Bucket已开启版本控制,OSS将会为新拷贝出来的Object自动生成唯一的versionId,此versionId将会在响应headerx-oss-version-id中返回。如果目标Bucket未曾开启或者暂停了版本控制,OSS将会为新拷贝的Object自动生成versionId为“null”的版本,且会覆盖原先versionId为“null”的版本。

  • 目标Bucket在开启或暂停版本控制状态下,不支持对Appendable类型Object执行拷贝操作。

您可以使用以下代码进行拷贝对象。

import argparse
import alibabacloud_oss_v2 as oss

# 创建命令行参数解析器,并描述脚本用途:复制存储空间中的对象
parser = argparse.ArgumentParser(description="copy object sample")

# 定义命令行参数,包括必需的区域、目标存储空间名称、源存储空间名称、目标对象名称、源对象名称、源对象版本ID以及可选的endpoint
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the destination bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
parser.add_argument('--key', help='The name of the destination object.', required=True)
parser.add_argument('--source_key', help='The name of the source object.', required=True)
parser.add_argument('--source_bucket', help='The name of the source bucket.', required=True)
parser.add_argument('--source_version_id', help='The version ID of the source object.',required=True)

def main():
    # 解析命令行参数,获取用户输入的值
    args = parser.parse_args()

    # 从环境变量中加载访问凭证信息,用于身份验证
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK默认配置创建配置对象,并设置认证提供者
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider

    # 设置配置对象的区域属性,根据用户提供的命令行参数
    cfg.region = args.region

    # 如果提供了自定义endpoint,则更新配置对象中的endpoint属性
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用上述配置初始化OSS客户端,准备与OSS交互
    client = oss.Client(cfg)

    # 发送请求以复制指定的对象
    result = client.copy_object(oss.CopyObjectRequest(
        bucket=args.bucket,  # 目标存储空间名
        key=args.key,  # 目标对象名
        source_key=args.source_key,  # 源对象名
        source_bucket=args.source_bucket,  # 源存储空间名
        source_version_id=args.source_version_id,  # 源对象版本ID
    ))

    # 打印操作结果的各种信息,以便确认请求状态
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' version id: {result.version_id},'
          f' hash crc64: {result.hash_crc64},'
          f' source version id: {result.source_version_id},'
          f' server side encryption: {result.server_side_encryption},'
          f' server side data encryption: {result.server_side_data_encryption},'
          f' last modified: {result.last_modified},'
          f' etag: {result.etag},'
          )

# 当此脚本被直接执行时,调用main函数开始处理逻辑
if __name__ == "__main__":
    main()  # 脚本入口点,控制程序流程从这里开始

分片拷贝

说明

对于大于1GB的文件,需要使用分片拷贝(UploadPartCopy)。

  • UploadPartCopy默认从一个已存在的Object的当前版本中拷贝数据来上传一个Part。允许通过在UploadPartCopyRequest中附带SourceVersionId参数,实现从Object的指定版本进行拷贝。

  • 如果未指定versionId且拷贝Object的当前版本为删除标记,OSS将返回404 Not Found。通过指定versionId来拷贝删除标记时,OSS将返回400 Bad Request。

您可以使用以下代码进行分片拷贝对象。

import argparse
import alibabacloud_oss_v2 as oss

# 创建命令行参数解析器,并描述脚本用途:同步执行分片拷贝上传
parser = argparse.ArgumentParser(description="upload part copy synchronously sample")

# 添加命令行参数 --region,表示存储空间所在的区域,必需参数
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
# 添加命令行参数 --bucket,表示目标存储空间的名称,必需参数
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
# 添加命令行参数 --endpoint,表示其他服务可用来访问OSS的域名,非必需参数
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
# 添加命令行参数 --key,表示目标对象的名称,必需参数
parser.add_argument('--key', help='The name of the object.', required=True)
# 添加命令行参数 --source_bucket,表示源存储空间的名称,必需参数
parser.add_argument('--source_bucket', help='The name of the source bucket.', required=True)
# 添加命令行参数 --source_key,表示源对象的名称,必需参数
parser.add_argument('--source_key', help='The name of the source object.', required=True)
# 添加命令行参数 --source_version_id,表示源对象的版本ID,必需参数
parser.add_argument('--source_version_id', help='The version id of the source object.', required=True)

def main():
    # 解析命令行提供的参数,获取用户输入的值
    args = parser.parse_args()

    # 从环境变量中加载访问OSS所需的认证信息,用于身份验证
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK的默认配置创建配置对象,并设置认证提供者
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    cfg.region = args.region

    # 如果提供了自定义endpoint,则更新配置对象中的endpoint属性
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用上述配置初始化OSS客户端,准备与OSS交互
    client = oss.Client(cfg)

    # 获取源对象的元数据,包括文件大小等信息
    result_meta = client.get_object_meta(oss.GetObjectMetaRequest(
        bucket=args.source_bucket,
        key=args.source_key,
    ))

    # 初始化一个多部分上传任务,返回一个UploadId用于标识这个过程
    result = client.initiate_multipart_upload(oss.InitiateMultipartUploadRequest(
        bucket=args.bucket,
        key=args.key,
    ))

    # 定义每个分片的大小(此处设置为1MB)
    part_size = 1024 * 1024
    total_size = result_meta.content_length  # 源文件总大小
    part_number = 1  # 分片编号从1开始
    upload_parts = []  # 用来存储已上传分片的信息
    offset = 0  # 当前处理到的字节偏移量

    # 循环处理直到所有数据都被上传
    while offset < total_size:
        num_to_upload = min(part_size, total_size - offset)  # 计算本次要上传的数据量
        end = offset + num_to_upload - 1  # 确定结束位置
        # 执行实际的分片拷贝上传操作
        up_result = client.upload_part_copy(oss.UploadPartCopyRequest(
            bucket=args.bucket,
            key=args.key,
            upload_id=result.upload_id,
            part_number=part_number,
            source_bucket=args.source_bucket,
            source_key=args.source_key,
            source_version_id=args.source_version_id,  # 需替换为实际的版本ID
            source_range=f'bytes={offset}-{end}',  # 指定源对象中的范围
        ))
        # 输出该分片上传的状态信息
        print(f'status code: {up_result.status_code},'
              f' request id: {up_result.request_id},'
              f' part number: {part_number},'
              f' last modified: {up_result.last_modified},'
              f' etag: {up_result.etag},'
              f' source version id: {up_result.source_version_id},'
              )
        # 将成功上传的分片信息记录下来
        upload_parts.append(oss.UploadPart(part_number=part_number, etag=up_result.etag))
        offset += num_to_upload  # 更新偏移量
        part_number += 1  # 更新分片编号

    # 对所有已上传的分片按其编号排序
    parts = sorted(upload_parts, key=lambda p: p.part_number)
    # 向OSS服务发送请求,通知其完成多部分上传
    result = client.complete_multipart_upload(oss.CompleteMultipartUploadRequest(
        bucket=args.bucket,
        key=args.key,
        upload_id=result.upload_id,
        complete_multipart_upload=oss.CompleteMultipartUpload(
            parts=parts
        )
    ))

    # 输出最终完成上传后的详细结果
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' bucket: {result.bucket},'
          f' key: {result.key},'
          f' location: {result.location},'
          f' etag: {result.etag},'
          f' encoding type: {result.encoding_type},'
          f' hash crc64: {result.hash_crc64},'
          f' version id: {result.version_id},'
          )

# 当此脚本被直接执行时,调用main函数开始处理逻辑
if __name__ == "__main__":
    main()  # 脚本入口点,控制程序流程从这里开始

相关文档