Python上传文件

更新时间:2025-03-07 07:52:44

本文介绍如何使用Python SDK V2在开启版本控制的存储空间(Bucket)中上传文件(Object)。

注意事项

  • 本文示例代码以华东1(杭州)的地域IDcn-hangzhou为例,默认使用外网Endpoint,如果您希望通过与OSS同地域的其他阿里云产品访问OSS,请使用内网Endpoint。关于OSS支持的RegionEndpoint的对应关系,请参见OSS地域和访问域名

  • 要上传文件,您必须有oss:PutObject权限。具体操作,请参见RAM用户授权自定义的权限策略

示例代码

简单上传

说明
  • 在已开启版本控制的Bucket中,OSS会为新添加的Object自动生成唯一的VersionId,并在响应header中通过x-oss-version-id形式返回。

  • 在暂停了版本控制的Bucket中,新添加的ObjectVersionId为“null”,上传同名Object,后一次会覆盖前一次上传的文件内容。OSS保证同一个Object只会有一个版本的ID为“null”。

您可以使用以下代码进行简单上传。

import argparse
import requests
import alibabacloud_oss_v2 as oss

# 创建命令行参数解析器
parser = argparse.ArgumentParser(description="put object sample")
# 添加命令行参数 --region,表示存储空间所在的区域,必需参数
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
# 添加命令行参数 --bucket,表示存储空间的名称,必需参数
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
# 添加命令行参数 --endpoint,表示其他服务可用来访问OSS的域名,非必需参数
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
# 添加命令行参数 --key,表示对象的名称,必需参数
parser.add_argument('--key', help='The name of the object.', required=True)

def main():
    args = parser.parse_args()  # 解析命令行参数

    # 从环境变量中加载凭证信息,用于身份验证
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 加载SDK的默认配置,并设置凭证提供者
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    # 设置配置中的区域信息
    cfg.region = args.region
    # 如果提供了endpoint参数,则设置配置中的endpoint
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用配置好的信息创建OSS客户端
    client = oss.Client(cfg)

    # 上传本地文件
    local_file_path = '/yourLocalFilePath/yourFileName'
    with open(local_file_path, 'rb') as file:
        data = file.read()

    # 执行上传对象的请求,指定存储空间名称、对象名称和数据内容
    result = client.put_object(oss.PutObjectRequest(
        bucket=args.bucket,
        key=args.key,
        body=data,
    ))

    # 输出请求的结果状态码、请求ID、内容MD5、ETag、CRC64校验码和版本ID,用于检查请求是否成功
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' content md5: {result.content_md5},'
          f' etag: {result.etag},'
          f' hash crc64: {result.hash_crc64},'
          f' version id: {result.version_id},'
    )

if __name__ == "__main__":
    main()  # 脚本入口,当文件被直接运行时调用main函数

追加上传

在受版本控制的Bucket中,仅支持对于当前版本为Appendable类型的Object执行追加(AppendObject)操作,不支持对于历史版本为Appendable类型的Object执行AppendObject操作。

说明
  • 对当前版本为Appendable类型的Object执行AppendObject操作时,OSS不会为该Appendable类型的Object生成历史版本。

  • 对当前版本为Appendable类型的Object执行PutObjectDeleteObject操作时,OSS会将该Appendable类型的Object保留为历史版本,且该Object不允许继续追加。

  • 不支持对当前版本为非Appendable类型的Object(包括 Normal Object、Delete Marker等)执行AppendObject 操作。

您可以使用以下代码进行追加上传。

import argparse
import alibabacloud_oss_v2 as oss

# 创建一个命令行参数解析器,并描述脚本用途:示例展示如何向OSS存储空间中的对象追加数据
parser = argparse.ArgumentParser(description="append object sample")

# 添加命令行参数
# --region: 指定OSS存储空间所在的区域,必需参数
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
# --bucket: 指定要操作的存储空间名称,必需参数
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
# --endpoint: 可选参数,指定访问OSS服务的域名
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
# --key: 指定对象(文件)在OSS中的键名,必需参数
parser.add_argument('--key', help='The name of the object.', required=True)

def main():
    # 解析命令行输入的参数
    args = parser.parse_args()

    # 从环境变量中加载OSS所需的认证信息,确保安全性和灵活性
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK提供的默认配置创建配置对象
    cfg = oss.config.load_default()

    # 设置认证信息提供者为之前创建的对象
    cfg.credentials_provider = credentials_provider

    # 根据用户输入设置OSS客户端使用的区域
    cfg.region = args.region

    # 如果用户提供了自定义的endpoint,则更新配置
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用上述配置创建OSS客户端实例,准备与OSS服务进行交互
    client = oss.Client(cfg)

    # 定义要追加的数据
    data1 = b'hello'  # 第一次追加的数据
    data2 = b' world'  # 第二次追加的数据

    # 第一次追加数据
    result = client.append_object(oss.AppendObjectRequest(
        bucket=args.bucket,  # 指定目标存储空间
        key=args.key,  # 指定对象的键名
        position=0,  # 追加的起始位置,初始为0
        body=data1,  # 要追加的数据
    ))

    # 打印第一次追加的结果
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' version id: {result.version_id},'
          f' hash crc64: {result.hash_crc64},'
          f' next position: {result.next_position},'
          f' server side encryption: {result.server_side_encryption},'
          f' server side data encryption: {result.server_side_data_encryption},'
    )

    # 第二次追加数据
    result = client.append_object(oss.AppendObjectRequest(
        bucket=args.bucket,  # 指定目标存储空间
        key=args.key,  # 指定对象的键名
        position=result.next_position,  # 从上一次追加的下一个位置开始
        body=data2,  # 要追加的数据
    ))

    # 打印第二次追加的结果
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' version id: {result.version_id},'
          f' hash crc64: {result.hash_crc64},'
          f' next position: {result.next_position},'
          f' server side encryption: {result.server_side_encryption},'
          f' server side data encryption: {result.server_side_data_encryption},'
    )

# 当此脚本被直接运行时,调用main函数
if __name__ == "__main__":
    main()  # 脚本入口点,控制程序流程从这里开始

分片上传

说明

在受版本控制的Bucket中,调用CompleteMultipartUpload接口来完成整个文件的分片上传,OSS会为整个文件生成唯一的版本ID,并在响应header中以x-oss-version-id的形式返回。

您可以使用以下代码进行分片上传。

import os
import argparse
import alibabacloud_oss_v2 as oss

# 创建命令行参数解析器,并描述脚本用途:示例展示如何进行OSS存储空间中的分片上传
parser = argparse.ArgumentParser(description="multipart upload sample")

# 添加命令行参数 --region,表示存储空间所在的区域,必需参数
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)

# 添加命令行参数 --bucket,表示存储空间的名称,必需参数
parser.add_argument('--bucket', help='The name of the bucket.', required=True)

# 添加命令行参数 --endpoint,表示其他服务可用来访问OSS的域名,非必需参数
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')

# 添加命令行参数 --key,表示对象的名称,必需参数
parser.add_argument('--key', help='The name of the object.', required=True)

# 添加命令行参数 --file_path,表示要上传的文件路径,必需参数
parser.add_argument('--file_path', help='The path of Upload file.', required=True)

def main():
    # 解析命令行输入的参数
    args = parser.parse_args()

    # 从环境变量中加载OSS所需的认证信息,确保安全性和灵活性
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK提供的默认配置创建配置对象
    cfg = oss.config.load_default()

    # 设置认证信息提供者为之前创建的对象
    cfg.credentials_provider = credentials_provider

    # 根据用户输入设置OSS客户端使用的区域
    cfg.region = args.region

    # 如果用户提供了自定义的endpoint,则更新配置
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用上述配置创建OSS客户端实例,准备与OSS服务进行交互
    client = oss.Client(cfg)

    # 初始化分片上传请求,获取upload_id用于后续分片上传
    result = client.initiate_multipart_upload(oss.InitiateMultipartUploadRequest(
        bucket=args.bucket,
        key=args.key,
    ))

    # 定义每个分片的大小为5MB
    part_size = 5 * 1024 * 1024

    # 获取要上传文件的总大小
    data_size = os.path.getsize(args.file_path)

    # 初始化分片编号,从1开始
    part_number = 1

    # 存储每个分片上传的结果
    upload_parts = []

    # 打开文件以二进制模式读取
    with open(args.file_path, 'rb') as f:
        # 遍历文件,按照part_size分片上传
        for start in range(0, data_size, part_size):
            n = part_size
            if start + n > data_size:  # 处理最后一个分片可能小于part_size的情况
                n = data_size - start

            # 创建SectionReader来读取文件的特定部分
            reader = oss.io_utils.SectionReader(oss.io_utils.ReadAtReader(f), start, n)

            # 上传分片
            up_result = client.upload_part(oss.UploadPartRequest(
                bucket=args.bucket,
                key=args.key,
                upload_id=result.upload_id,
                part_number=part_number,
                body=reader
            ))

            # 打印每个分片上传的结果信息
            print(f'status code: {up_result.status_code},'
                  f' request id: {up_result.request_id},'
                  f' part number: {part_number},'
                  f' content md5: {up_result.content_md5},'
                  f' etag: {up_result.etag},'
                  f' hash crc64: {up_result.hash_crc64},'
                  )

            # 将分片上传结果保存到列表中
            upload_parts.append(oss.UploadPart(part_number=part_number, etag=up_result.etag))

            # 增加分片编号
            part_number += 1

    # 对上传的分片按照分片编号排序
    parts = sorted(upload_parts, key=lambda p: p.part_number)

    # 发送完成分片上传请求,合并所有分片为一个完整的对象
    result = client.complete_multipart_upload(oss.CompleteMultipartUploadRequest(
        bucket=args.bucket,
        key=args.key,
        upload_id=result.upload_id,
        complete_multipart_upload=oss.CompleteMultipartUpload(
            parts=parts
        )
    ))

    # 输出完成分片上传的结果信息
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' bucket: {result.bucket},'
          f' key: {result.key},'
          f' location: {result.location},'
          f' etag: {result.etag},'
          f' encoding type: {result.encoding_type},'
          f' hash crc64: {result.hash_crc64},'
          f' version id: {result.version_id},'
    )

# 当此脚本被直接运行时,调用main函数
if __name__ == "__main__":
    main()  # 脚本入口点,控制程序流程从这里开始
  • 本页导读 (1)
  • 注意事项
  • 示例代码
  • 简单上传
  • 追加上传
  • 分片上传