追加上传是指在已上传的追加类型文件(Appendable Object)末尾直接追加内容。本文介绍如何使用OSS Python SDK进行追加上传。
注意事项
本文示例代码以华东1(杭州)的地域ID
cn-hangzhou为例,默认使用外网Endpoint,如果您希望通过与OSS同地域的其他阿里云产品访问OSS,请使用内网Endpoint。关于OSS支持的Region与Endpoint的对应关系,请参见OSS地域和访问域名。当文件不存在时,调用追加上传方法会创建一个追加类型文件。
当文件已存在时:
如果文件为追加类型文件,且设置的追加位置和文件当前长度相等,则直接在该文件末尾追加内容。
如果文件为追加类型文件,但是设置的追加位置和文件当前长度不相等,则抛出PositionNotEqualToLength异常。
如果文件为非追加类型文件,例如通过简单上传的文件类型为Normal的文件,则抛出ObjectNotAppendable异常。
权限说明
阿里云账号默认拥有全部权限。阿里云账号下的RAM用户或RAM角色默认没有任何权限,需要阿里云账号或账号管理员通过RAM Policy或Bucket Policy授予操作权限。
API  | Action  | 说明  | 
AppendObject  | 
  | 以追加写的方式上传文件(Object)。  | 
  | 以追加写的方式上传文件(Object)时,如果通过x-oss-tagging指定Object的标签,则需要此操作的权限。  | 
方法定义
针对文件追加上传的场景,Python SDK V2新增了AppendFile方法以模仿文件的读写行为,用于操作存储空间里的对象,以下列举了AppendFile与AppendObject方法的具体说明:
方法名  | 说明  | 
AppendFile  | 与AppendObject方法能力一致 优化了重传时失败后容错处理  | 
AppendObject  | 追加上传, 最终文件最大支持5GiB 支持CRC64数据校验(默认启用) 支持进度条  | 
高级版追加上传API:AppendFile
调用AppendFile方法以追加写的方式上传数据。如果对象不存在,则创建追加类型的对象。如果对象存在,并且不为追加类型的对象,则返回错误。
AppendFile方法定义如下。
append_file(bucket: str, key: str, request_payer: str | None = None, create_parameter: AppendObjectRequest | None = None, **kwargs) → AppendOnlyFile请求参数列表
参数名  | 类型  | 说明  | 
bucket  | str  | 设置存储空间名  | 
key  | str  | 设置对象名  | 
RequestPayer  | str  | 启用了请求者付费模式时,需要设置为'requester'  | 
CreateParameter  | AppendObjectRequest  | 用于首次上传时,设置对象的元信息,包括ContentType,Metadata,权限,存储类型等,具体请参见AppendObjectRequest  | 
返回值列表
类型  | 说明  | 
AppendOnlyFile  | 追加文件的实例,具体请参见AppendOnlyFile  | 
其中,AppendOnlyFile类包含的方法说明如下:
方法名  | 说明  | 
Close()  | 关闭文件句柄,释放资源  | 
write(b)  | 将字节数据写入到文件中,返回写入的字节数  | 
write_from(b: str | bytes | Iterable[bytes] | IO[str] | IO[bytes])  | 将任意数据写入到文件中,返回写入的字节数  | 
关于AppendFile方法的完整定义,请参见append_file。
基础版追加上传API:AppendObject
append_object(request: AppendObjectRequest, **kwargs) → AppendObjectResult请求参数列表
参数名  | 类型  | 说明  | 
request  | AppendObjectRequest  | 设置请求参数,具体请参见AppendObjectRequest  | 
返回值列表
类型  | 说明  | 
AppendObjectResult  | 返回值,具体请参见AppendObjectResult  | 
关于AppendObject方法的完整定义,请参见append_object。
示例代码
(推荐)使用AppendFile追加上传
import argparse
import alibabacloud_oss_v2 as oss
# 创建命令行参数解析器,并描述脚本用途:示例展示如何向OSS对象追加数据
parser = argparse.ArgumentParser(description="append file sample")
# 添加命令行参数 --region,表示存储空间所在的区域,必需参数
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
# 添加命令行参数 --bucket,表示要操作的存储空间名称,必需参数
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
# 添加命令行参数 --endpoint,表示其他服务可用来访问OSS的域名,非必需参数
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
# 添加命令行参数 --key,表示对象(文件)在OSS中的键名,必需参数
parser.add_argument('--key', help='The name of the object.', required=True)
def main():
    # 解析命令行提供的参数,获取用户输入的值
    args = parser.parse_args()
    # 从环境变量中加载访问OSS所需的认证信息,用于身份验证
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
    # 使用SDK的默认配置创建配置对象,并设置认证提供者
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    cfg.region = args.region
    # 如果提供了自定义endpoint,则更新配置对象中的endpoint属性
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint
    # 使用上述配置初始化OSS客户端,准备与OSS交互
    client = oss.Client(cfg)
    # 定义要追加的数据
    data1 = b'hello'
    data2 = b' world. '
    # 第一次追加数据
    with client.append_file(bucket=args.bucket, key=args.key) as f:
        append_f = f
        f.write(data1)
    # 打印第一次追加后的文件状态
    print(f'closed: {append_f.closed},'
          f' name: {append_f.name}'
    )
    # 第二次追加数据
    with client.append_file(bucket=args.bucket, key=args.key) as f:
        append_f = f
        f.write(data2)
    # 打印第二次追加后的文件状态
    print(f'closed: {append_f.closed},'
          f' name: {append_f.name}'
    )
    # 获取追加后的对象内容
    result = client.get_object(oss.GetObjectRequest(
        bucket=args.bucket,
        key=args.key,
    ))
    # 打印获取对象的结果
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' content: {result.body.content.decode("utf-8")}'
    )
# 当此脚本被直接执行时,调用main函数开始处理逻辑
if __name__ == "__main__":
    main()  # 脚本入口点,控制程序流程从这里开始使用AppendObject追加上传
import argparse
import alibabacloud_oss_v2 as oss
# 创建一个命令行参数解析器
parser = argparse.ArgumentParser(description="append object sample")
# 添加命令行参数
# --region: 指定OSS存储空间所在的区域
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
# --bucket: 指定要操作的存储空间名称
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
# --endpoint: 可选参数,指定访问OSS服务的域名
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
# --key: 指定对象(文件)在OSS中的键名
parser.add_argument('--key', help='The name of the object.', required=True)
def main():
    # 解析命令行输入的参数
    args = parser.parse_args()
    # 从环境变量中加载OSS所需的认证信息
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
    # 使用SDK提供的默认配置创建配置对象
    cfg = oss.config.load_default()
    # 设置认证信息提供者为之前创建的对象
    cfg.credentials_provider = credentials_provider
    # 根据用户输入设置OSS客户端使用的区域
    cfg.region = args.region
    # 如果用户提供了自定义的endpoint,则更新配置
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint
    # 使用上述配置创建OSS客户端实例
    client = oss.Client(cfg)
    # 定义要追加的数据
    data1 = b'hello'
    data2 = b' world'
    # 第一次追加数据
    result = client.append_object(oss.AppendObjectRequest(
        bucket=args.bucket,  # 指定目标存储空间
        key=args.key,  # 指定对象的键名
        position=0,  # 追加的起始位置,初始为0
        body=data1,  # 要追加的数据
    ))
    # 打印第一次追加的结果
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' version id: {result.version_id},'
          f' hash crc64: {result.hash_crc64},'
          f' next position: {result.next_position},' 
    )
    # 第二次追加数据
    result = client.append_object(oss.AppendObjectRequest(
        bucket=args.bucket,  # 指定目标存储空间
        key=args.key,  # 指定对象的键名
        position=result.next_position,  # 从上一次追加的下一个位置开始
        body=data2,  # 要追加的数据
    ))
    # 打印第二次追加的结果
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' version id: {result.version_id},'
          f' hash crc64: {result.hash_crc64},'
          f' next position: {result.next_position},'
    )
# 当此脚本被直接运行时,调用main函数
if __name__ == "__main__":
    main()常见使用场景
追加上传显示进度条
import argparse
import alibabacloud_oss_v2 as oss
# 创建一个命令行参数解析器
parser = argparse.ArgumentParser(description="append object sample")
# 添加命令行参数
# --region: 指定OSS存储空间所在的区域
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
# --bucket: 指定要操作的存储空间名称
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
# --endpoint: 可选参数,指定访问OSS服务的域名
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
# --key: 指定对象(文件)在OSS中的键名
parser.add_argument('--key', help='The name of the object.', required=True)
def main():
    # 解析命令行输入的参数
    args = parser.parse_args()
    # 从环境变量中加载OSS所需的认证信息
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
    # 使用SDK提供的默认配置创建配置对象
    cfg = oss.config.load_default()
    # 设置认证信息提供者为之前创建的对象
    cfg.credentials_provider = credentials_provider
    # 根据用户输入设置OSS客户端使用的区域
    cfg.region = args.region
    # 如果用户提供了自定义的endpoint,则更新配置
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint
    # 使用上述配置创建OSS客户端实例
    client = oss.Client(cfg)
    # 定义一个字典变量 progress_state 用于保存上传进度状态,初始值为 0
    progress_state = {'saved': 0}
    def _progress_fn(n, written, total):
        # 使用字典存储累计写入的字节数,避免使用 global 变量
        progress_state['saved'] += n
        # 计算当前上传百分比,将已写入字节数与总字节数进行除法运算后取整
        rate = int(100 * (float(written) / float(total)))
        # 打印当前上传进度,\r 表示回到行首,实现命令行中实时刷新效果
        # end='' 表示不换行,使下一次打印覆盖当前行
        print(f'\r上传进度:{rate}% ', end='')
    # 定义要追加的数据
    data1 = b'hello'
    data2 = b' world'
    # 第一次追加数据
    result = client.append_object(oss.AppendObjectRequest(
        bucket=args.bucket,  # 指定目标存储空间
        key=args.key,  # 指定对象的键名
        position=0,  # 追加的起始位置,初始为0
        body=data1,  # 要追加的数据
        progress_fn=_progress_fn,  # 设置进度回调函数
    ))
    # 打印第一次追加的结果
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' version id: {result.version_id},'
          f' hash crc64: {result.hash_crc64},'
          f' next position: {result.next_position},'
    )
    # 第二次追加数据
    result = client.append_object(oss.AppendObjectRequest(
        bucket=args.bucket,  # 指定目标存储空间
        key=args.key,  # 指定对象的键名
        position=result.next_position,  # 从上一次追加的下一个位置开始
        body=data2,  # 要追加的数据
        progress_fn=_progress_fn,  # 设置进度回调函数
    ))
    # 打印第二次追加的结果
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' version id: {result.version_id},'
          f' hash crc64: {result.hash_crc64},'
          f' next position: {result.next_position},'
    )
# 当此脚本被直接运行时,调用main函数
if __name__ == "__main__":
    main()相关文档
关于追加上传的完整示例代码,请参见append_file.py和append_object.py。