开放存储SDK示例-Python SDK

MaxCompute支持第三方引擎(如Spark on EMR、StarRocks、Presto、PAI和Hologres)通过SDK调用Storage API直接访问MaxCompute数据,本文为您介绍使用Python SDK访问MaxCompute的代码示例。

MaxCompute提供了开放存储相关接口,详情请参见aliyun-odps-python-sdk

前提条件

本文示例代码是基于PyODPS,若您是在本地环境执行以下代码,请确保已安装PyODPS。具体操作,请参见安装PyODPS

同时PyODPS还支持在DataWorks、PAI Notebooks中使用,其中:

  • DataWorks的PyODPS节点已安装好了PyODPS,您可以直接在DataWorks的PyODPS节点上开发PyODPS任务并周期性运行,操作指导请参见通过DataWorks使用PyODPS

  • PAI的Python环境也可安装运行PyODPS,其中PAI的内置镜像均已安装好了PyODPS可直接使用,如PAI-Designer的自定义Python组件,在PAI Notebooks中使用PyODPS的方式与通用的使用方式基本一致,可参考基本操作概述DataFrame概述

说明

PyODPS是MaxCompute的Python版本的SDK,关于PyODPS详情,请参见PyODPS概述

使用示例

使用Python SDK访问MaxCompute的代码示例,详情请参见Python SDK Examples

  1. 配置连接MaxCompute服务的环境

    import os
    from odps import ODPS
    from odps.apis.storage_api import *
    # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
    # 不建议直接使用 Access Key ID / Access Key Secret 字符串
    # endpoint为MaxCompute服务的连接地址,当前仅支持使用阿里云VPC网络
    o = ODPS(
    		os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    		os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    		project='your-default-project',
    		endpoint='your-end-point'
    )
    #MaxCompute表名称
    table = "<table to access>"
    # 访问MaxCompute使用的Quota名称
    quota_name = "<quota name>"
    # 连接并访问阿里云MaxCompute服务并创建基于Arrow格式的Storage API对象
    def get_arrow_client():
        odps_table = o.get_table(table)
        client = StorageApiArrowClient(odps=o, table=odps_table, quota_name=quota_name)
    
        return client
    
    
    说明

    获取独享数据传输服务资源组(包年包月)和开放存储(按量计费)两种资源的Quota名称的方式分别如下:

    • 独享数据传输服务资源组:登录MaxCompute控制台,左上角切换地域后,在左侧导航栏选择工作区>配额(Quota)管理,查看可使用的Quota列表。具体操作,请参见计算资源-Quota管理

    • 开放存储:登录MaxCompute控制台,在左侧导航栏选择租户管理>租户属性,开启开放存储,开放存储资源名称默认为pay-as-you-go

  2. 读表操作。

    1. 创建数据读取会话,读取MaxCompute数据。

      import logging
      import sys
      from odps.apis.storage_api import *
      from util import *
      
      logger = logging.getLogger(__name__)
      # 定义函数create_read_session,mode参数用于指定扫描数据时所使用的分片策略,若mode为size,按数据大小进行分片,若为row,按行数进行分片
      def create_read_session(mode):
          client = get_arrow_client()
          req = TableBatchScanRequest(required_partitions=['pt=test_write_1'])
      
          if mode == "size":
              req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.SIZE)
          elif mode == "row":
              req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.ROW_OFFSET)
      
          resp = client.create_read_session(req)
      
          if resp.status != Status.OK:
              logger.info("Create read session failed")
              return
      
          logger.info("Read session id: " + resp.session_id)
      
      if __name__ == '__main__':
          logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)
          if len(sys.argv) != 2:
              raise ValueError("Please provide split mode: size|row")
      
          mode = sys.argv[1]
          if mode != "row" and mode != "size":
              raise ValueError("Please provide split mode: size|row")
      
          create_read_session(mode)
      
      
    2. 创建监控和检查数据读取状态的会话。

      import logging
      import sys
      import time
      from odps.apis.storage_api import *
      from util import *
      
      logger = logging.getLogger(__name__)
      # 确保在执行数据读取操作前,确认read session它已经成功创建并处于准备就绪的状态
      def check_session_status(session_id):
          client = get_arrow_client()
          req = SessionRequest(session_id=session_id)
          resp = client.get_read_session(req)
      
          if resp.status != Status.OK:
              logger.info("Get read session failed")
              return
      
          # session创建过程可能时间较长,需要等待session status为NORMAL才可以读数据
          if resp.session_status == SessionStatus.NORMAL:
              logger.info("Read session id: " + resp.session_id)
          else:
              logger.info("Session status is not expected")
      
      if __name__ == '__main__':
          logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)
          if len(sys.argv) != 2:
              raise ValueError("Please provide session id")
      
          session_id = sys.argv[1]
          check_session_status(session_id)
      
      
    3. 读取MaxCompute数据。

      # 通过指定的session_id从MaxCompute中读取数据行,并统计总共读取的数据行数
      import logging
      import sys
      from odps.apis.storage_api import *
      from util import *
      
      logger = logging.getLogger(__name__)
      
      def read_rows(session_id):
          client = get_arrow_client()
          req = SessionRequest(session_id=session_id)
          resp = client.get_read_session(req)
      
          if resp.status != Status.OK and resp.status != Status.WAIT:
              logger.info("Get read session failed")
              return
      
          if resp.split_count == -1:
              req.row_index = 0
              req.row_count = resp.record_count
          else:
              req.split_index = 0
      
          req = ReadRowsRequest(session_id=session_id)
          reader = client.read_rows_arrow(req)
          total_line = 0
          while True:
              record_batch = reader.read()
              if record_batch is None:
                  break
              total_line += record_batch.num_rows
      
          if reader.get_status() != Status.OK:
              logger.info("Read rows failed")
              return
      
          logger.info("Total line is:" + str(total_line))
      
      if __name__ == '__main__':
          logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO)
          if len(sys.argv) != 2:
              raise ValueError("Please provide session id")
      
          session_id = sys.argv[1]
          read_rows(session_id)
      
      

相关文档

关于MaxCompute开放存储详情,请参见开放存储概述