异步处理(Python SDK V2)

异步处理(x-oss-async-process)是指程序执行一个任务时,不需要等待该任务完成就能继续执行其他任务。本文介绍如何使用Python SDK V2进行异步处理的场景,例如文档转换、视频转码、视频拼接等。

注意事项

  • 本文示例代码以华东1(杭州)的地域IDcn-hangzhou为例,默认使用外网Endpoint,如果您希望通过与OSS同地域的其他阿里云产品访问OSS,请使用内网Endpoint。关于OSS支持的RegionEndpoint的对应关系,请参见OSS地域和访问域名

方法定义

async_process_object(request: AsyncProcessObjectRequest, **kwargs) → AsyncProcessObjectResult

请求参数列表

参数名

类型

说明

request

AsyncProcessObjectRequest

设置请求参数,具体请参见AsyncProcessObjectRequest

返回值列表

类型

说明

AsyncProcessObjectResult

返回值,具体请参见AsyncProcessObjectResult

关于异步处理方法的完整定义,请参见async_process_object

示例代码

以下代码展示了如何使用Python SDK V2进行文档格式转换,将其转换为需要的输出类型。

import base64
import argparse
import alibabacloud_oss_v2 as oss

# 创建命令行参数解析器,并描述脚本用途:异步处理对象(如图像处理)
parser = argparse.ArgumentParser(description="async process object sample")

# 定义命令行参数,包括必需的区域、存储空间名称、endpoint、对象键名以及目标存储空间和图像名称
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
parser.add_argument('--key', help='The name of the object.', required=True)
parser.add_argument('--target_key', help='Specify the name of the processed object.', required=True)
parser.add_argument('--target_bucket', help='Specify the name of the bucket used to store processed object.', required=True)

def main():
    # 解析命令行参数,获取用户输入的值
    args = parser.parse_args()

    # 从环境变量中加载访问凭证信息,用于身份验证
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK默认配置创建配置对象,并设置认证提供者
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider

    # 设置配置对象的区域属性,根据用户提供的命令行参数
    cfg.region = args.region

    # 如果提供了自定义endpoint,则更新配置对象中的endpoint属性
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用上述配置初始化OSS客户端,准备与OSS交互
    client = oss.Client(cfg)

    # 定义将源Docx文档转换为PNG图片的处理规则
    style = "doc/convert,target_png,source_docx"

    # 对目标存储空间名和文件名进行Base64编码,以确保它们可以在URL中安全传输
    target_bucket_base64 = base64.b64encode(args.target_bucket.encode()).decode()
    target_key_base64 = base64.b64encode(args.target_key.encode()).decode()

    # 构建处理指令,包含样式和保存位置
    process = f"{style}|sys/saveas,o_{target_key_base64},b_{target_bucket_base64}"

    # 发送异步请求以处理指定对象并按照给定的样式保存到目标存储空间
    result = client.async_process_object(oss.AsyncProcessObjectRequest(
        bucket=args.bucket,  # 存储空间名
        key=args.key,  # 对象键名
        process=process,  # 处理指令
    ))

    # 打印操作结果的状态码和其他相关信息,以便确认请求状态和处理结果
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' event id: {result.event_id},'
          f' task id: {result.task_id},'
          f' process request id: {result.process_request_id},'
          )

# 当此脚本被直接执行时,调用main函数开始处理逻辑
if __name__ == "__main__":
    main()  # 脚本入口点,控制程序流程从这里开始

常见使用场景

视频转码

您可以使用视频转码功能,修改视频的编码格式、降低分辨率和码率以缩小视频文件体积、转换视频封装格式。

import base64
import argparse
import alibabacloud_oss_v2 as oss

# 创建命令行参数解析器,并描述脚本用途:异步处理对象
parser = argparse.ArgumentParser(description="async process object sample")

# 定义命令行参数,包括必需的区域、存储空间名称、endpoint、对象键名以及目标存储空间和目标文件名称
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
parser.add_argument('--key', help='The name of the object.', required=True)
parser.add_argument('--target_key', help='Specify the name of the processed object.', required=True)
parser.add_argument('--target_bucket', help='Specify the name of the bucket used to store processed object.', required=True)

def main():
    # 解析命令行参数,获取用户输入的值
    args = parser.parse_args()

    # 从环境变量中加载访问凭证信息,用于身份验证
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK默认配置创建配置对象,并设置认证提供者
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider

    # 设置配置对象的区域属性,根据用户提供的命令行参数
    cfg.region = args.region

    # 如果提供了自定义endpoint,则更新配置对象中的endpoint属性
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用上述配置初始化OSS客户端,准备与OSS交互
    client = oss.Client(cfg)

    # 定义处理风格,这里是一个示例视频转换配置,包括格式、视频编解码器、分辨率、比特率、帧率、音频编解码器、音频比特率等参数
    style = "video/convert,f_avi,vcodec_h265,s_1920x1080,vb_2000000,fps_30,acodec_aac,ab_100000,sn_1"

    # 对目标存储空间名和文件名进行Base64编码,以确保它们可以在URL中安全传输
    target_bucket_base64 = base64.b64encode(args.target_bucket.encode()).decode()
    target_key_base64 = base64.b64encode(args.target_key.encode()).decode()

    # 构建处理指令,包含样式和保存位置
    process = f"{style}|sys/saveas,o_{target_key_base64},b_{target_bucket_base64}"

    # 发送异步请求以处理指定对象并按照给定的样式保存到目标存储空间
    result = client.async_process_object(oss.AsyncProcessObjectRequest(
        bucket=args.bucket,  # 存储空间名
        key=args.key,  # 对象键名
        process=process,  # 处理指令
    ))

    # 打印操作结果的状态码和其他相关信息,以便确认请求状态和处理结果
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' event id: {result.event_id},'
          f' task id: {result.task_id},'
          f' process request id: {result.process_request_id},'
          )

# 当此脚本被直接执行时,调用main函数开始处理逻辑
if __name__ == "__main__":
    main()  # 脚本入口点,控制程序流程从这里开始

视频转动图

您可以通过视频转动图功能,将视频转换为GIF、WebP等格式的动图。

import base64
import argparse
import alibabacloud_oss_v2 as oss

# 创建命令行参数解析器,并描述脚本用途:异步处理对象
parser = argparse.ArgumentParser(description="async process object sample")

# 定义命令行参数,包括必需的区域、存储空间名称、endpoint、对象键名以及目标存储空间和图像名称
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
parser.add_argument('--key', help='The name of the object.', required=True)
parser.add_argument('--target_key', help='Specify the name of the processed object.', required=True)
parser.add_argument('--target_bucket', help='Specify the name of the bucket used to store processed object.', required=True)

def main():
    # 解析命令行参数,获取用户输入的值
    args = parser.parse_args()

    # 从环境变量中加载访问凭证信息,用于身份验证
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK默认配置创建配置对象,并设置认证提供者
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider

    # 设置配置对象的区域属性,根据用户提供的命令行参数
    cfg.region = args.region

    # 如果提供了自定义endpoint,则更新配置对象中的endpoint属性
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用上述配置初始化OSS客户端,准备与OSS交互
    client = oss.Client(cfg)

    # 定义视频转GIF动图的参数,包括GIF宽度、高度、间隔帧数等
    style = "video/animation,f_gif,w_100,h_100,inter_1000"

    # 对目标存储空间名和文件名进行Base64编码,以确保它们可以在URL中安全传输
    target_bucket_base64 = base64.b64encode(args.target_bucket.encode()).decode()
    target_key_base64 = base64.b64encode(args.target_key.encode()).decode()

    # 构建处理指令,包含样式和保存位置
    process = f"{style}|sys/saveas,o_{target_key_base64},b_{target_bucket_base64}"

    # 发送异步请求以处理指定对象并按照给定的样式保存到目标存储空间
    result = client.async_process_object(oss.AsyncProcessObjectRequest(
        bucket=args.bucket,  # 存储空间名
        key=args.key,  # 对象键名
        process=process,  # 处理指令
    ))

    # 打印操作结果的状态码和其他相关信息,以便确认请求状态和处理结果
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' event id: {result.event_id},'
          f' task id: {result.task_id},'
          f' process request id: {result.process_request_id},'
          )

# 当此脚本被直接执行时,调用main函数开始处理逻辑
if __name__ == "__main__":
    main()  # 脚本入口点,控制程序流程从这里开始

视频截帧

您可以通过视频截帧功能,按一定规则提取视频帧并转换为需要的图片格式。

import base64
import argparse
import alibabacloud_oss_v2 as oss

# 创建命令行参数解析器,并描述脚本用途:异步处理对象
parser = argparse.ArgumentParser(description="async process object sample")

# 定义命令行参数,包括必需的区域、存储空间名称、endpoint、对象键名以及目标存储空间和图像名称
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
parser.add_argument('--key', help='The name of the object.', required=True)
parser.add_argument('--target_key', help='Specify the name of the processed object.', required=True)
parser.add_argument('--target_bucket', help='Specify the name of the bucket used to store processed object.', required=True)

def main():
    # 解析命令行参数,获取用户输入的值
    args = parser.parse_args()

    # 从环境变量中加载访问凭证信息,用于身份验证
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK默认配置创建配置对象,并设置认证提供者
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider

    # 设置配置对象的区域属性,根据用户提供的命令行参数
    cfg.region = args.region

    # 如果提供了自定义endpoint,则更新配置对象中的endpoint属性
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用上述配置初始化OSS客户端,准备与OSS交互
    client = oss.Client(cfg)

    # 构建视频截帧参数
    style = "video/snapshots,f_jpg,w_100,h_100,scaletype_crop,inter_10000"

    # 对目标存储空间名和文件名进行Base64编码,以确保它们可以在URL中安全传输
    target_bucket_base64 = base64.b64encode(args.target_bucket.encode()).decode()
    target_key_base64 = base64.b64encode(args.target_key.encode()).decode()

    # 构建处理指令,包含样式和保存位置
    process = f"{style}|sys/saveas,o_{target_key_base64},b_{target_bucket_base64}"

    # 发送异步请求以处理指定对象并按照给定的样式保存到目标存储空间
    result = client.async_process_object(oss.AsyncProcessObjectRequest(
        bucket=args.bucket,  # 存储空间名
        key=args.key,  # 对象键名
        process=process,  # 处理指令
    ))

    # 打印操作结果的状态码和其他相关信息,以便确认请求状态和处理结果
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' event id: {result.event_id},'
          f' task id: {result.task_id},'
          f' process request id: {result.process_request_id},'
          )

# 当此脚本被直接执行时,调用main函数开始处理逻辑
if __name__ == "__main__":
    main()  # 脚本入口点,控制程序流程从这里开始

音频转码

您可以通过音频转码功能,将音频转换为需要的格式。

import base64
import argparse
import alibabacloud_oss_v2 as oss

# 创建命令行参数解析器,并描述脚本用途:异步处理对象
parser = argparse.ArgumentParser(description="async process object sample")

# 定义命令行参数,包括必需的区域、存储空间名称、endpoint、对象键名以及目标存储空间和图像名称
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
parser.add_argument('--key', help='The name of the object.', required=True)
parser.add_argument('--target_key', help='Specify the name of the processed object.', required=True)
parser.add_argument('--target_bucket', help='Specify the name of the bucket used to store processed object.', required=True)

def main():
    # 解析命令行参数,获取用户输入的值
    args = parser.parse_args()

    # 从环境变量中加载访问凭证信息,用于身份验证
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK默认配置创建配置对象,并设置认证提供者
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider

    # 设置配置对象的区域属性,根据用户提供的命令行参数
    cfg.region = args.region

    # 如果提供了自定义endpoint,则更新配置对象中的endpoint属性
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用上述配置初始化OSS客户端,准备与OSS交互
    client = oss.Client(cfg)

    # 构建音频处理样式字符串以及音频转码处理参数
    style = "audio/convert,ss_10000,t_60000,f_aac,ab_96000"

    # 对目标存储空间名和文件名进行Base64编码,以确保它们可以在URL中安全传输
    target_bucket_base64 = base64.b64encode(args.target_bucket.encode()).decode()
    target_key_base64 = base64.b64encode(args.target_key.encode()).decode()

    # 构建处理指令,包含样式和保存位置
    process = f"{style}|sys/saveas,o_{target_key_base64},b_{target_bucket_base64}"

    # 发送异步请求以处理指定对象并按照给定的样式保存到目标存储空间
    result = client.async_process_object(oss.AsyncProcessObjectRequest(
        bucket=args.bucket,  # 存储空间名
        key=args.key,  # 对象键名
        process=process,  # 处理指令
    ))

    # 打印操作结果的状态码和其他相关信息,以便确认请求状态和处理结果
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' event id: {result.event_id},'
          f' task id: {result.task_id},'
          f' process request id: {result.process_request_id},'
          )

# 当此脚本被直接执行时,调用main函数开始处理逻辑
if __name__ == "__main__":
    main()  # 脚本入口点,控制程序流程从这里开始

解析图片盲水印

以下代码展示了如何解析图片中的盲水印。

import base64
import argparse
import alibabacloud_oss_v2 as oss

# 创建命令行参数解析器,并描述脚本用途:异步处理对象(如图像处理)
parser = argparse.ArgumentParser(description="async process object sample")

# 定义命令行参数,包括必需的区域、存储空间名称、endpoint、对象键名以及目标存储空间和图像名称
parser.add_argument('--region', help='The region in which the bucket is located.', required=True)
parser.add_argument('--bucket', help='The name of the bucket.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS')
parser.add_argument('--key', help='The name of the object.', required=True)
parser.add_argument('--target_key', help='Specify the name of the processed object.', required=True)
parser.add_argument('--target_bucket', help='Specify the name of the bucket used to store processed object.', required=True)

def main():
    # 解析命令行参数,获取用户输入的值
    args = parser.parse_args()

    # 从环境变量中加载访问凭证信息,用于身份验证
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()

    # 使用SDK默认配置创建配置对象,并设置认证提供者
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider

    # 设置配置对象的区域属性,根据用户提供的命令行参数
    cfg.region = args.region

    # 如果提供了自定义endpoint,则更新配置对象中的endpoint属性
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    # 使用上述配置初始化OSS客户端,准备与OSS交互
    client = oss.Client(cfg)

    # 提取指定图片中的水印内容
    style = "image/deblindwatermark,s_low,t_text"

    # 对目标存储空间名和文件名进行Base64编码,以确保它们可以在URL中安全传输
    target_bucket_base64 = base64.b64encode(args.target_bucket.encode()).decode()
    target_key_base64 = base64.b64encode(args.target_key.encode()).decode()

    # 构建处理指令,包含样式和保存位置
    process = f"{style}|sys/saveas,o_{target_key_base64},b_{target_bucket_base64}"

    # 发送异步请求以处理指定对象并按照给定的样式保存到目标存储空间
    result = client.async_process_object(oss.AsyncProcessObjectRequest(
        bucket=args.bucket,  # 存储空间名
        key=args.key,  # 对象键名
        process=process,  # 处理指令
    ))

    # 打印操作结果的状态码和其他相关信息,以便确认请求状态和处理结果
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' event id: {result.event_id},'
          f' task id: {result.task_id},'
          f' process request id: {result.process_request_id},'
          )

# 当此脚本被直接执行时,调用main函数开始处理逻辑
if __name__ == "__main__":
    main()  # 脚本入口点,控制程序流程从这里开始

相关文档