使用阿里云OSS下载和上传文件

更新时间:
复制为 MD 格式

使用阿里云对象存储OSS能帮助自动化流程处理文件的上传和下载,本文重点介绍如何对接和使用阿里云OSS。

使用场景

适合使用阿里云OSS的场景汇总如下:

操作阿里云OSS

适合的场景

下载文件

场景1:向自动化流程传递需要操作的一批数据,如JSON文件、Excel、CSV

场景2:对数据安全有更高的要求。自动化流程中、流程的输入参数中都不希望包含任何和认证(如账密、AKSK)之类的信息。

场景3:使用OpenAPI接口触发机器人执行时,流程的输入参数内容长度超过接口支持的最大长度,需要有其他方式规避限制。

上传文件

场景4:自动化流程生成了一批数据,需要传递给业务系统。

阿里云OSS支持丰富的权限控制功能(详见 权限与访问控制概述),RPA使用时主要用到 RAM用户的AccessKey预签名URL(详见 使用预签名URL上传文件使用预签名URL下载或预览文件) 两种方式,使用差异如下:

系统集成方式

建议使用场景

RAM用户的AccessKey

  • 自动化流程中需要直接使用AccessKey调用OSS接口。

  • AccessKey相关信息可保存在资产变量中,无需明文保存在流程代码中。

客户端手动触发、定时任务、OpenAPI等方式都适合。

预签名URL

  • 需要您的上层系统通过AccessKey等方式调用OSS接口生成预签名URL,通过OpenAPI、MCP方式调用时以入参形式传给自动化流程,自动化流程内使用该URL完成下载或上传动作。

  • 预签名URL有时效性限制,有效时间的设置需要考虑任务等待执行时间和任务执行时间。

适合通过OpenAPI、MCP方式调用触发,不适合手动触发和定时任务。

安全性最高。

准备阿里云OSS

本章节主要介绍使用阿里云OSS相关的最基本内容,更多功能、安全最佳实践请参考阿里云OSS帮助文档。

  1. 创建bucket,例如命名为mybucket,参考 控制台快速入门

  2. 创建RAM用户,并通过RAM的自定义权限策略,为其增加bucket的操作权限,自定义权限策略如下所示。详见 RAM Policy常见示例

    {
        "Version": "1",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "oss:*",
                "Resource": [
                    "acs:oss:*:*:mybucket",
                    "acs:oss:*:*:mybucket/*"
                ]
            }
        ]
    }

  3. 更多精细化的权限控制可参考 教程示例:使用RAM Policy控制OSS的访问权限

下载

使用预签名URL方式

各个系统之间的时序图如下所示,本章节主要介绍下图中“1. 生成预签名URL”和“6. 使用预签名URL下载文件”两个过程。

image

  • “1. 生成预签名URL”发生在您的上层系统中,可参考OSS提供的 示例代码

  • “6. 使用预签名URL下载文件”:自动化流程在处理预签名URL时,按照HTTP请求处理即可,示例如下:

    • 编码开发模式

      from rpa.core import *
      from rpa.utils import *
      import rpa4 as rpa 
      import requests
      
      def start():
          # 参数面板中增加oss_presign_ur,用来接收传入的预签名URL
          file_url = rpa.project.params["oss_presign_url"]
          save_path = "c:/test/1.txt"
          try:
              response = requests.get(file_url, stream=True)
              if response.status_code == 200:
                  with open(save_path, 'wb') as f:
                      for chunk in response.iter_content(4096):
                          f.write(chunk)
                  print("Download completed!")
              else:
                  print(f"No file to download. Server replied HTTP code: {response.status_code}")
          except Exception as e:
              print("Error during download:", e)
              
    • 可视化开发模式

      image

使用AccessKey方式

  • 使用编码开发模式下的流程示例如下:

    from rpa.core import *
    from rpa.utils import *
    import rpa4 as rpa 
    # v4.10.0以上版本的RPA已经内置了阿里云OSSSDK,无需再手动添加
    import alibabacloud_oss_v2 as oss
    import os
    
    def get_oss_client():
        #access_key_idaccess_key_secret的值放在资产变量中,运行时自动从资产变量中读取
        access_key_id = rpa.console.asset.get_value('oss_access_key_id')
        access_key_secret = rpa.console.asset.get_value('oss_access_key_secret')
        
        # 使用oss sdk
        credentials_provider = oss.credentials.StaticCredentialsProvider(
            access_key_id=access_key_id,
            access_key_secret=access_key_secret
        )
    
        # 使用SDK的默认配置创建配置对象,并设置认证提供者
        cfg = oss.config.load_default()
        cfg.credentials_provider = credentials_provider
    
        # 设置配置对象的区域属性
        cfg.region = "cn-beijing"
        cfg.endpoint = "https://oss-cn-beijing.aliyuncs.com"
    
        # 使用上述配置初始化OSS客户端,准备与OSS交互
        client = oss.Client(cfg)
        return client
    
    def download(client, bucket, key, save_path):
    
        # 执行获取对象的请求,指定存储空间名称和对象名称
        result = client.get_object(oss.GetObjectRequest(
            bucket=bucket,  # 指定存储空间名称
            key=key,  # 指定对象键名
        ))
    
        # ========== 方式1:完整读取 ==========
        with result.body as body_stream:
            data = body_stream.read()
            with open(save_path, 'wb') as f:
                f.write(data)
    
        # # ========== 方式2:分块读取 ==========
        # with result.body as body_stream:
        #     total_size = 0
        #     with open(save_path, 'wb') as f:
        #         # 使用256KB块大小(可根据需要调整block_size参数)
        #         for chunk in body_stream.iter_bytes(block_size=256 * 1024):
        #             f.write(chunk)
        #             total_size += len(chunk)
        #             print(f"已接收数据块:{len(chunk)} bytes | 累计:{total_size} bytes")
    
    
    
    def start():
        oss_client = get_oss_client()
        save_path = "C:/test/2.txt"
        download(oss_client, "{您的bucket name}", "{该bucket下的key}", save_path)
        pass
  • 流程代码中直接使用OSSPython SDK v2版本,代码示例可参考 简单下载(Python SDK V2)

  • v4.10.0版本以上的RPA已经内置OSSPython SDK,无需再进行导入。

  • 调用OSS所需的access_key_idaccess_key_secret,使用了资产变量功能,因此在代码中无需记录明文内容,运行时自动从服务端获取。

    image

上传

使用预签名URL方式

各个系统之间的时序图如下所示,本章节主要介绍下图中“1. 生成预签名URL”和“6. 使用预签名URL上传文件”两个过程。

image
  • “1. 生成预签名URL”发生在您的上层系统中,可参考OSS提供的 示例代码

  • “6. 使用预签名URL上传文件”:自动化流程在处理预签名URL时,按照HTTP请求处理即可,示例如下:

    • 编码开发模式

      from rpa.core import *
      from rpa.utils import *
      import rpa4 as rpa
      import requests
      
      def start():
          # 参数面板中增加oss_presign_ur,用来接收传入的预签名URL
          signed_url = rpa.project.params["oss_presign_url"]
          file_path = "c:/test/1.txt"
          try:
              # 打开文件
              with open(file_path, 'rb') as file:
                  # 发送PUT请求上传文件
                  response = requests.put(signed_url, data=file)
      
              print(f"返回上传状态码:{response.status_code}")
              if response.status_code == 200:
                  print("使用网络库上传成功")
              print(response.text)
      
          except Exception as e:
              print(f"发生错误:{e}")
          pass
    • 可视化开发模式

      • 流程如下图所示。增加了两个变量 v_signed_url 和 v_file_path,分别用于保存流程的入参(预签名URL)和需要上传的本地文件地址

        image

      • 使用“调用自定义脚本”方式,将 v_file_path 对应的本地文件上传到OSS中。自定义脚本内容如下:

        import requests
        with open(v_file_path, 'rb') as file:
          response = requests.put(v_signed_url, data=file)

使用AccessKey方式

  • 使用编码开发模式下的流程示例如下:

    from rpa.core import *
    from rpa.utils import *
    import rpa4 as rpa 
    # v4.10.0以上版本的RPA已经内置了阿里云OSSSDK,无需再手动添加
    import alibabacloud_oss_v2 as oss
    import os
    
    def get_oss_client():
        #access_key_idaccess_key_secret的值放在资产变量中,运行时自动从资产变量中读取
        access_key_id = rpa.console.asset.get_value("oss_access_key_id")
        access_key_secret = rpa.console.asset.get_value("oss_access_key_secret")
        
        credentials_provider = oss.credentials.StaticCredentialsProvider(
            access_key_id=access_key_id,
            access_key_secret=access_key_secret
        )
    
        # 使用SDK的默认配置创建配置对象,并设置认证提供者
        cfg = oss.config.load_default()
        cfg.credentials_provider = credentials_provider
    
        # 设置配置对象的区域属性
        cfg.region = "cn-beijing"
        cfg.endpoint = "https://oss-cn-beijing.aliyuncs.com"
    
        # 使用上述配置初始化OSS客户端,准备与OSS交互
        client = oss.Client(cfg)
        return client
    
    def upload(client, bucket, key, file_path):
        with open(file_path, 'rb') as f:
            result = client.put_object(
                oss.PutObjectRequest(
                    bucket=bucket,  # 存储空间名称
                    key=key,        # 对象名称
                    body=f.read()   # 读取文件内容
                )
            )
    
    def start():
        oss_client = get_oss_client()
        file_path = "C:/test/1.txt"
        upload(oss_client, "{您的bucket name}", "{该bucket下的key}", file_path)
        pass
  • 流程代码中直接使用OSSPython SDK v2版本,代码示例可参考 简单上传(Python SDK V2)

  • v4.10.0版本以上的RPA已经内置OSSPython SDK,无需再进行导入。

  • 调用OSS所需的access_key_idaccess_key_secret,使用了资产变量功能,因此在代码中无需记录明文内容,运行时自动从服务端获取。

    image