MaxCompute支持第三方引擎(如Spark on EMR、StarRocks、Presto、PAI和Hologres)通过SDK调用Storage API直接访问MaxCompute数据,本文为您介绍使用Python SDK访问MaxCompute的代码示例。
MaxCompute提供了开放存储相关接口,详情请参见aliyun-odps-python-sdk。
前提条件
本文示例代码是基于PyODPS,若您是在本地环境执行以下代码,请确保已安装PyODPS。具体操作,请参见安装PyODPS。
同时PyODPS还支持在DataWorks、PAI Notebooks中使用,其中:
DataWorks的PyODPS节点已安装好了PyODPS,您可以直接在DataWorks的PyODPS节点上开发PyODPS任务并周期性运行,操作指导请参见通过DataWorks使用PyODPS。
PAI的Python环境也可安装运行PyODPS,其中PAI的内置镜像均已安装好了PyODPS可直接使用,如PAI-Designer的自定义Python组件,在PAI Notebooks中使用PyODPS的方式与通用的使用方式基本一致,可参考基本操作概述、DataFrame概述。
PyODPS是MaxCompute的Python版本的SDK,关于PyODPS详情,请参见PyODPS概述。
使用示例
使用Python SDK访问MaxCompute的代码示例,详情请参见Python SDK Examples。
配置连接MaxCompute服务的环境。
import os from odps import ODPS from odps.apis.storage_api import * # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID, # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret, # 不建议直接使用 Access Key ID / Access Key Secret 字符串 # endpoint为MaxCompute服务的连接地址,当前仅支持使用阿里云VPC网络 o = ODPS( os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='your-default-project', endpoint='your-end-point' ) #MaxCompute表名称 table = "<table to access>" # 访问MaxCompute使用的Quota名称 quota_name = "<quota name>" # 连接并访问阿里云MaxCompute服务并创建基于Arrow格式的Storage API对象 def get_arrow_client(): odps_table = o.get_table(table) client = StorageApiArrowClient(odps=o, table=odps_table, quota_name=quota_name) return client
说明获取独享数据传输服务资源组(包年包月)和开放存储(按量计费)两种资源的Quota名称的方式分别如下:
独享数据传输服务资源组:登录MaxCompute控制台,左上角切换地域后,在左侧导航栏选择工作区>配额(Quota)管理,查看可使用的Quota列表。具体操作,请参见计算资源-Quota管理。
开放存储:登录MaxCompute控制台,在左侧导航栏选择租户管理>租户属性,开启开放存储,开放存储资源名称默认为
pay-as-you-go
。
读表操作。
创建数据读取会话,读取MaxCompute数据。
import logging import sys from odps.apis.storage_api import * from util import * logger = logging.getLogger(__name__) # 定义函数create_read_session,mode参数用于指定扫描数据时所使用的分片策略,若mode为size,按数据大小进行分片,若为row,按行数进行分片 def create_read_session(mode): client = get_arrow_client() req = TableBatchScanRequest(required_partitions=['pt=test_write_1']) if mode == "size": req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.SIZE) elif mode == "row": req.split_options = SplitOptions.get_default_options(SplitOptions.SplitMode.ROW_OFFSET) resp = client.create_read_session(req) if resp.status != Status.OK: logger.info("Create read session failed") return logger.info("Read session id: " + resp.session_id) if __name__ == '__main__': logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO) if len(sys.argv) != 2: raise ValueError("Please provide split mode: size|row") mode = sys.argv[1] if mode != "row" and mode != "size": raise ValueError("Please provide split mode: size|row") create_read_session(mode)
创建监控和检查数据读取状态的会话。
import logging import sys import time from odps.apis.storage_api import * from util import * logger = logging.getLogger(__name__) # 确保在执行数据读取操作前,确认read session它已经成功创建并处于准备就绪的状态 def check_session_status(session_id): client = get_arrow_client() req = SessionRequest(session_id=session_id) resp = client.get_read_session(req) if resp.status != Status.OK: logger.info("Get read session failed") return # session创建过程可能时间较长,需要等待session status为NORMAL才可以读数据 if resp.session_status == SessionStatus.NORMAL: logger.info("Read session id: " + resp.session_id) else: logger.info("Session status is not expected") if __name__ == '__main__': logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO) if len(sys.argv) != 2: raise ValueError("Please provide session id") session_id = sys.argv[1] check_session_status(session_id)
读取MaxCompute数据。
# 通过指定的session_id从MaxCompute中读取数据行,并统计总共读取的数据行数 import logging import sys from odps.apis.storage_api import * from util import * logger = logging.getLogger(__name__) def read_rows(session_id): client = get_arrow_client() req = SessionRequest(session_id=session_id) resp = client.get_read_session(req) if resp.status != Status.OK and resp.status != Status.WAIT: logger.info("Get read session failed") return if resp.split_count == -1: req.row_index = 0 req.row_count = resp.record_count else: req.split_index = 0 req = ReadRowsRequest(session_id=session_id) reader = client.read_rows_arrow(req) total_line = 0 while True: record_batch = reader.read() if record_batch is None: break total_line += record_batch.num_rows if reader.get_status() != Status.OK: logger.info("Read rows failed") return logger.info("Total line is:" + str(total_line)) if __name__ == '__main__': logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s', level=logging.INFO) if len(sys.argv) != 2: raise ValueError("Please provide session id") session_id = sys.argv[1] read_rows(session_id)
相关文档
关于MaxCompute开放存储详情,请参见开放存储概述。