Python文件上传管理器

本文针对文件的传输场景,介绍如何使用Python SDK V2新增的上传管理器Uploader模块进行文件上传。

注意事项

  • 本文示例代码以华东1(杭州)的地域IDcn-hangzhou为例,默认使用外网Endpoint,如果您希望通过与OSS同地域的其他阿里云产品访问OSS,请使用内网Endpoint。关于OSS支持的RegionEndpoint的对应关系,请参见OSS地域和访问域名

  • 要进行上传,您必须有oss:PutObject权限。具体操作,请参见RAM用户授予自定义的权限策略

方法定义

上传管理器功能简介

Python SDK V2新增上传管理器Uploader提供了通用的上传方法,隐藏了底层方法的实现细节,提供便捷的文件上传能力。

  • 上传管理器Uploader底层利用分片上传方法,把文件或者流分成多个较小的分片并发上传,提升上传的性能。

  • 上传管理器Uploader同时提供了断点续传的能力,即在上传过程中,记录已完成的分片状态,如果出现网络中断、程序异常退出等问题导致文件上传失败,甚至重试多次仍无法完成上传,再次上传时,可以通过断点记录文件恢复上传。

上传管理器Uploader的常用方法如下:

# 用于上传本地文件
upload_file(request: PutObjectRequest, filepath: str, **kwargs: Any) → UploadResult

# 用于上传文件流
upload_from(request: PutObjectRequest, reader: IO[bytes], **kwargs: Any) → UploadResult

请求参数列表

参数名

类型

说明

request

PutObjectRequest

上传对象的请求参数,和PutObject 方法的请求参数一致,具体请参见PutObjectRequest

reader

IO[bytes]

需要上传的数据流

filepath

str

本地文件路径

返回参数列表

类型

说明

UploadResult

上传对象的返回参数,具体请参见UploadResult

当您使用client.uploader初始化上传管理器实例时,您可以指定多个配置选项来自定义上传行为,例如指定分片大小如下所示。

uploader = client.uploader(part_size=10  * 1024 * 1024)

常用的配置选项说明列举如下:

参数名

类型

说明

part_size

int

指定分片大小,默认值为 6MiB

parallel_num

int

指定上传任务的并发数,默认值为 3。针对的是单次调用的并发限制,而不是全局的并发限制

leave_parts_on_error

bool

当上传失败时,是否保留已上传的分片,默认不保留

enable_checkpoint

bool

是否开启断点上传功能,默认不开启

说明

enable_checkpoint参数目前仅对upload_file方法有效,upload_from方法暂不支持

checkpoint_dir

str

指定记录文件的保存路径,例如 /local/dir/, 当enable_checkpoint 为 true时有效

关于文件上传管理器方法的完整定义,请参见Uploader

示例代码

您可以通过以下代码使用上传管理器上传本地文件到存储空间。

import argparse
import alibabacloud_oss_v2 as oss

# 创建一个命令行参数解析器,并描述脚本用途:上传文件示例
parser = argparse.ArgumentParser(description="upload file sample")

# 添加命令行参数 --region,表示存储空间所在的区域,必需参数
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
# 添加命令行参数 --bucket,表示要上传文件到的存储空间名称,必需参数
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
# 添加命令行参数 --endpoint,表示其他服务可用来访问OSS的域名,非必需参数
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
# 添加命令行参数 --key,表示对象(文件)在OSS中的键名,必需参数
parser.add_argument('--key', help='The name of the object.', required=True)
# 添加命令行参数 --file_path,表示本地待上传文件的路径,必需参数,例如“/Users/yourLocalPath/yourFileName”
parser.add_argument('--file_path', help='The path of Upload file.', required=True)

def main():
    # 解析命令行提供的参数,获取用户输入的值
    args = parser.parse_args()

    # 从环境变量中加载访问OSS所需的认证信息,用于身份验证
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK的默认配置创建配置对象,并设置认证提供者
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    
    # 设置配置对象的区域属性,根据用户提供的命令行参数
    cfg.region = args.region

    # 如果提供了自定义endpoint,则更新配置对象中的endpoint属性
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用上述配置初始化OSS客户端,准备与OSS交互
    client = oss.Client(cfg)

    # 创建一个用于上传文件的对象
    uploader = client.uploader()

    # 调用方法执行文件上传操作
    result = uploader.upload_file(
        oss.PutObjectRequest(
            bucket=args.bucket,  # 指定目标存储空间
            key=args.key,        # 指定文件在OSS中的名称
        ),
        filepath=args.file_path  # 指定本地文件的位置
    )

    # 打印上传结果的相关信息,包括状态码、请求ID、内容MD5等
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' content md5: {result.headers.get("Content-MD5")},'
          f' etag: {result.etag},'
          f' hash crc64: {result.hash_crc64},'
          f' version id: {result.version_id},'
          f' server time: {result.headers.get("x-oss-server-time")},'
          )

# 当此脚本被直接执行时,调用main函数开始处理逻辑
if __name__ == "__main__":
    main()  # 脚本入口点,控制程序流程从这里开始

常见使用场景

使用上传管理器启动断点续传功能

您可以使用以下代码设置上传管理器Uploader的配置参数,启动断点续传功能。

import argparse
import alibabacloud_oss_v2 as oss

# 创建一个命令行参数解析器,并描述脚本用途:上传文件示例
parser = argparse.ArgumentParser(description="upload file sample")

# 添加命令行参数 --region,表示存储空间所在的区域,必需参数
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
# 添加命令行参数 --bucket,表示要上传文件到的存储空间名称,必需参数
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
# 添加命令行参数 --endpoint,表示其他服务可用来访问OSS的域名,非必需参数
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
# 添加命令行参数 --key,表示对象(文件)在OSS中的键名,必需参数
parser.add_argument('--key', help='The name of the object.', required=True)
# 添加命令行参数 --file_path,表示本地待上传文件的路径,必需参数,例如“/Users/yourLocalPath/yourFileName”
parser.add_argument('--file_path', help='The path of Upload file.', required=True)

def main():
    # 解析命令行提供的参数,获取用户输入的值
    args = parser.parse_args()

    # 从环境变量中加载访问OSS所需的认证信息,用于身份验证
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK的默认配置创建配置对象,并设置认证提供者
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    
    # 设置配置对象的区域属性,根据用户提供的命令行参数
    cfg.region = args.region

    # 如果提供了自定义endpoint,则更新配置对象中的endpoint属性
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用上述配置初始化OSS客户端,准备与OSS交互
    client = oss.Client(cfg)

    # 创建一个用于上传文件的对象,并开启断点续传功能,指定断点记录文件的保存路径
    uploader = client.uploader(enable_checkpoint=True, checkpoint_dir="/Users/yourLocalPath/checkpoint/")

    # 调用方法执行文件上传操作
    result = uploader.upload_file(
        oss.PutObjectRequest(
            bucket=args.bucket,  # 指定目标存储空间
            key=args.key,        # 指定文件在OSS中的名称
        ),
        filepath=args.file_path  # 指定本地文件的位置
    )

    # 打印上传结果的相关信息,包括状态码、请求ID、内容MD5等
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' content md5: {result.headers.get("Content-MD5")},'
          f' etag: {result.etag},'
          f' hash crc64: {result.hash_crc64},'
          f' version id: {result.version_id},'
          f' server time: {result.headers.get("x-oss-server-time")},'
          )

# 当此脚本被直接执行时,调用main函数开始处理逻辑
if __name__ == "__main__":
    main()  # 脚本入口点,控制程序流程从这里开始

使用上传管理器上传本地文件流

您可以通过以下代码使用上传管理器上传本地文件流。

import argparse
import alibabacloud_oss_v2 as oss

# 创建一个命令行参数解析器,并描述脚本用途:从文件上传示例
parser = argparse.ArgumentParser(description="upload from sample")

# 添加命令行参数 --region,表示存储空间所在的区域,必需参数
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
# 添加命令行参数 --bucket,表示要上传文件到的存储空间名称,必需参数
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
# 添加命令行参数 --endpoint,表示其他服务可用来访问OSS的域名,非必需参数
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
# 添加命令行参数 --key,表示对象(文件)在OSS中的键名,必需参数
parser.add_argument('--key', help='The name of the object.', required=True)
# 添加命令行参数 --file_path,表示本地待上传文件的路径,必需参数,例如“/Users/yourLocalPath/yourFileName”
parser.add_argument('--file_path', help='The path of Upload file.', required=True)

def main():
    # 解析命令行提供的参数,获取用户输入的值
    args = parser.parse_args()

    # 从环境变量中加载访问OSS所需的认证信息,用于身份验证
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK的默认配置创建配置对象,并设置认证提供者
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    
    # 设置配置对象的区域属性,根据用户提供的命令行参数
    cfg.region = args.region

    # 如果提供了自定义endpoint,则更新配置对象中的endpoint属性
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用上述配置初始化OSS客户端,准备与OSS交互
    client = oss.Client(cfg)

    # 创建一个用于上传文件的对象
    uploader = client.uploader()

    # 打开本地文件以二进制模式读取
    with open(file=args.file_path, mode='rb') as f:
        # 调用方法执行文件上传操作
        result = uploader.upload_from(
            oss.PutObjectRequest(
                bucket=args.bucket,  # 指定目标存储空间
                key=args.key,        # 指定文件在OSS中的名称
            ),
            reader=f  # 传入文件读取器
        )

        # 打印上传结果的相关信息,包括状态码、请求ID、内容MD5等
        print(f'status code: {result.status_code},'
              f' request id: {result.request_id},'
              f' content md5: {result.headers.get("Content-MD5")},'
              f' etag: {result.etag},'
              f' hash crc64: {result.hash_crc64},'
              f' version id: {result.version_id},'
              f' server time: {result.headers.get("x-oss-server-time")},'
              )

# 当此脚本被直接执行时,调用main函数开始处理逻辑
if __name__ == "__main__":
    main()  # 脚本入口点,控制程序流程从这里开始

使用上传管理器设置分片大小和并发数

您可以使用以下代码设置上传管理器Uploader的配置参数,设置分片大小和并发数。

import argparse
import alibabacloud_oss_v2 as oss

# 创建一个命令行参数解析器,并描述脚本用途:上传文件示例
parser = argparse.ArgumentParser(description="upload file sample")

# 添加命令行参数 --region,表示存储空间所在的区域,必需参数
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
# 添加命令行参数 --bucket,表示要上传文件到的存储空间名称,必需参数
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
# 添加命令行参数 --endpoint,表示其他服务可用来访问OSS的域名,非必需参数
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
# 添加命令行参数 --key,表示对象(文件)在OSS中的键名,必需参数
parser.add_argument('--key', help='The name of the object.', required=True)
# 添加命令行参数 --file_path,表示本地待上传文件的路径,必需参数,例如“/Users/yourLocalPath/yourFileName”
parser.add_argument('--file_path', help='The path of Upload file.', required=True)

def main():
    # 解析命令行提供的参数,获取用户输入的值
    args = parser.parse_args()

    # 从环境变量中加载访问OSS所需的认证信息,用于身份验证
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK的默认配置创建配置对象,并设置认证提供者
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    
    # 设置配置对象的区域属性,根据用户提供的命令行参数
    cfg.region = args.region

    # 如果提供了自定义endpoint,则更新配置对象中的endpoint属性
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用上述配置初始化OSS客户端,准备与OSS交互
    client = oss.Client(cfg)

    # 创建一个用于上传文件的对象,并设置分片大小和并发数
    uploader = client.uploader(
        part_size=100 * 1024,  # 设置分片大小为100KB
        parallel_num=5,        # 设置并发数为5
        leave_parts_on_error=True  # 在发生错误时保留已上传的部分
    )

    # 调用方法执行文件上传操作
    result = uploader.upload_file(
        oss.PutObjectRequest(
            bucket=args.bucket,  # 指定目标存储空间
            key=args.key,        # 指定文件在OSS中的名称
        ),
        filepath=args.file_path  # 指定本地文件的位置
    )

    # 打印上传结果的相关信息,包括状态码、请求ID、内容MD5等
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' content md5: {result.headers.get("Content-MD5")},'
          f' etag: {result.etag},'
          f' hash crc64: {result.hash_crc64},'
          f' version id: {result.version_id},'
          f' server time: {result.headers.get("x-oss-server-time")},'
          )

# 当此脚本被直接执行时,调用main函数开始处理逻辑
if __name__ == "__main__":
    main()  # 脚本入口点,控制程序流程从这里开始

使用上传管理器设置上传回调

如果您希望在文件上传后通知应用服务器,可参考以下代码示例。

import argparse
import base64
import alibabacloud_oss_v2 as oss

# 创建一个命令行参数解析器,并描述脚本用途:上传文件示例
parser = argparse.ArgumentParser(description="upload file sample")

# 添加命令行参数 --region,表示存储空间所在的区域,必需参数
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
# 添加命令行参数 --bucket,表示要上传文件到的存储空间名称,必需参数
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
# 添加命令行参数 --endpoint,表示其他服务可用来访问OSS的域名,非必需参数
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
# 添加命令行参数 --key,表示对象(文件)在OSS中的键名,必需参数
parser.add_argument('--key', help='The name of the object.', required=True)
# 添加命令行参数 --file_path,表示本地待上传文件的路径,必需参数
parser.add_argument('--file_path', help='The path of Upload file.', required=True)

def main():
    # 解析命令行提供的参数,获取用户输入的值
    args = parser.parse_args()

    # 从环境变量中加载访问OSS所需的认证信息,用于身份验证
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK的默认配置创建配置对象,并设置认证提供者
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider

    # 设置配置对象的区域属性,根据用户提供的命令行参数
    cfg.region = args.region

    # 如果提供了自定义endpoint,则更新配置对象中的endpoint属性
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用上述配置初始化OSS客户端,准备与OSS交互
    client = oss.Client(cfg)

    # 创建一个用于上传文件的uploader对象
    uploader = client.uploader()

    # 定义回调地址
    call_back_url = "http://www.example.com/callback"
    # 构造回调参数(callback):指定回调地址和回调请求体,使用 Base64 编码
    callback=base64.b64encode(str('{\"callbackUrl\":\"' + call_back_url + '\",\"callbackBody\":\"bucket=${bucket}&object=${object}&my_var_1=${x:var1}&my_var_2=${x:var2}\"}').encode()).decode()
    # 构造自定义变量(callback-var),使用 Base64 编码
    callback_var=base64.b64encode('{\"x:var1\":\"value1\",\"x:var2\":\"value2\"}'.encode()).decode()

    # 调用方法执行文件上传操作
    result = uploader.upload_file(
        oss.PutObjectRequest(
            bucket=args.bucket,  # 指定目标存储空间
            key=args.key,        # 指定文件在OSS中的名称
            callback=callback,
            callback_var=callback_var,
        ),
        filepath=args.file_path,  # 指定本地文件的位置
    )

    # 打印上传结果的相关信息,包括状态码、请求ID、内容MD5等
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' content md5: {result.headers.get("Content-MD5")},'
          f' etag: {result.etag},'
          f' hash crc64: {result.hash_crc64},'
          f' version id: {result.version_id},'
          f' server time: {result.headers.get("x-oss-server-time")},'
          )

# 当此脚本被直接执行时,调用main函数开始处理逻辑
if __name__ == "__main__":
    main()  # 脚本入口点,控制程序流程从这里开始

相关文档