在本地环境使用PyODPS

更新时间:2025-04-25 06:49:23

PyODPS是阿里云开发的Python SDK,方便开发者通过代码使用MaxCompute进行大数据处理和分析。本文将介绍如何在本地环境上使用PyODPS进行表操作、数据加载和运行SQL查询。

前提条件

  • 本地已安装PyODPS,并设置环境变量

  • 已准备数据集pyodps_iris。您可参考使用示例下载数据集、创建pyodps_iris表并写入数据,文档中的操作示例即以此表作为示例数据,为您演示基本操作。

主要流程

  1. 打开Python编辑器,并创建一个Python文件。

    说明

    若本地未安装Python编辑器,可直接创建一个后缀名为.py的文件。

  2. 开发PyODPS任务代码。

    创建完成后,您可参考下文内容进行简单示例的操作学习,了解PyODPS的主要能力维度。

    更多PyODPS的使用指导请参见基本操作概述DataFrame。您也可以参考示例文档:使用PyODPS节点进行结巴中文分词,进行一个端到端的简单操作。

  3. 本地运行python文件。

初始化ODPS入口

您需要手动定义ODPS入口,代码示例如下。

import os
from odps import ODPS

o = ODPS(
    # (推荐)确保已设置环境变量。
    # 确保ALIBABA_CLOUD_ACCESS_KEY_ID环境变量设置为用户 Access Key ID。
    access_id=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    # 确保ALIBABA_CLOUD_ACCESS_KEY_SECRET环境变量设置为用户Access Key Secret。
    secret_access_key=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    
    # (不推荐)若未设置环境变量,请使用如下格式替代。获取地址:https://ram.console.aliyun.com/profile/access-keys
    # access_id='your-aliyun-access-key-id',
    # secret_access_key= 'your-aliyun-access-key-secret',
    
    project='your-default-project',
    endpoint='your-end-point',
)

参数

填写说明

参数

填写说明

your-default-project

填写您的MaxCompute项目名称,登录MaxCompute控制台获取项目列表。

your-end-point

填写您所在地域和所在网络环境下的Endpoint

例如杭州地域的公网Endpoint,请填写https://service.cn-hangzhou.maxcompute.aliyun.com/api

重要

请确保选择正确的网络类型,否则将无法正常连接。

完成上述配置后,您就可以在本地环境中使用PyODPS,例如对于ODPS对象的基本操作listgetexistcreatedelete等。更多PyODPS的使用指导请参见基本操作概述DataFrame

执行SQL

您可以在本地PyODPS节点中执行SQL命令,并读取SQL运行结果。

PyODPS节点中执行SQL命令

您可以在PyODPS节点中使用传统模式或查询加速(MCQA)执行SQL命令,当前主要支持运行DDL、DML类型的SQL命令。与传统模式相比,加速查询模式(MCQA)会将作业的运行结果写入临时缓存中。当您后续执行相同的查询作业时,MaxCompute会优先返回缓存中的结果,加快执行速度,相关计费规则请参见计算费用(按量付费),您可根据需要选择相应的模式执行。

说明

可以执行的SQL语句并非都可以通过入口对象的execute_sql()run_sql()等方法执行。在调用非DDL或非DML语句时,请使用其他方法。例如,调用创建表语句时,请使用create_table方法;调用API命令时,请使用run_xflowexecute_xflow方法。

使用传统模式执行SQL命令
使用加速查询模式(MCQA)执行SQL命令

您可以使用execute_sql()/run_sql()来执行SQL命令,示例如下:

使用create_table方法创建一个新的表。

o.create_table('my_t', 'num bigint, id string', if_not_exists=True)

使用execute_sql方法执行SQL查询。

result = o.execute_sql('SELECT * FROM pyodps_iris LIMIT 3')
with result.open_reader() as reader:
    for record in reader:
        print(record)

MCQAMaxCompute提供的查询加速功能,支持使用独立资源池对中小规模数据进行加速。PyODPSV0.11.4.1版本开始支持以下列方式通过MCQA执行SQL。

execute_sql_interactive(fallback='all'):通过MCQA执行SQL并返回MCQA Instance。如果MCQA无法执行相应的SQL,会自动回退到传统模式。此时,函数返回的Instance为回退后的Instance。

o.execute_sql_interactive('SELECT * FROM dual', fallback='all')

如果不希望回退,可以指定参数fallback=False。也可以指定为回退策略(或回退策略的组合,使用逗号分隔的字符串)。可用的策略名如下,默认策略为unsupported,upgrading,noresource,timeout。为使回退一直生效,建议设置为all,即generic,unsupported,upgrading,noresource,timeout的组合。

回退策略

描述

generic

指定时,表示发生未知错误时回退到传统模式。

noresource

指定时,表示发生资源不足问题时回退到传统模式。

upgrading

指定时,表示升级期间回退到传统模式。

timeout

指定时,表示执行超时时回退到传统模式。

unsupported

指定时,表示遇到MCQA不支持的场景时回退到传统模式。

例如,下述代码表示通过MCQA执行SQL,如果遇到不支持MCQA的场景或资源不足问题,回退到传统模式。

 o.execute_sql_interactive('SELECT * FROM dual', fallback="noresource,unsupported")

PyODPS节点中读取SQL运行结果

您可以使用open_reader()来读取SQL命令运行结果。具体操作请参见读取SQL执行结果

更多PyODPS节点的SQL相关操作详情请参见SQL

DataFrame

PyODPS提供了DataFrame API,支持您使用DataFrame进行数据处理,更多DataFrame的操作示例请参见DataFrame

  • 执行

    DataFrame的执行需要显式调用立即执行的方法(如executepersist等)。示例代码如下。

    # 调用立即执行的方法,处理每条Record,打印出表pyodps_iris中iris.sepalwidth小于3的所有数据。
    from odps.df import DataFrame
    iris = DataFrame(o.get_table('pyodps_iris'))
    for record in iris[iris.sepalwidth < 3].execute():  
        print(record)
    
  • 打印详细信息

    默认情况下,本地环境的PyODPS节点运行过程不会打印Logview等详细过程。您可以手动设置options.verbose选项,开启打印Logview等详细过程。

    from odps import options
    options.verbose = True
    

设置运行参数hints

运行任务时如果需要设置运行时参数,可以通过设置hints参数来实现,参数类型是dict。

o.execute_sql('SELECT * FROM pyodps_iris', hints={'odps.sql.mapper.split.size': 16})

您也可以对全局设置sql.setting,设置后后续每次运行时都会添加相关的运行时参数。

from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
# 会根据全局配置添加hints
o.execute_sql('SELECT * FROM pyodps_iris')

完整示例

  1. 本地创建test-pyodps-local.py文件。

  2. 写入示例代码。

    import os
    from odps import ODPS
    
    o = ODPS(
        # (推荐)确保已设置环境变量。
        # 确保ALIBABA_CLOUD_ACCESS_KEY_ID环境变量设置为用户 Access Key ID。
        access_id=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        # 确保ALIBABA_CLOUD_ACCESS_KEY_SECRET环境变量设置为用户Access Key Secret。
        secret_access_key=os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        
        project='your-default-project',
        endpoint='your-end-point',
    )
    
    # 以直接指定字段名以及字段类型的方式创建非分区表my_new_table。
    table = o.create_table('my_new_table', 'num bigint, id string', if_not_exists=True)
    
    # 向非分区表my_new_table中插入数据。
    records = [[111, 'aaa'],
               [222, 'bbb'],
               [333, 'ccc'],
               [444, '中文']]
    o.write_table(table, records)
    
    # 读取非分区表my_new_table中的数据。
    for record in o.read_table(table):
        print(record[0], record[1])
    
    # 以运行SQL的方式读取表中的数据。
    result = o.execute_sql('SELECT * FROM pyodps_iris LIMIT 3;', hints={'odps.sql.allow.fullscan': 'true'})
    
    # 删除表以清除资源。
    table.drop()
    
    print('使用open_reader方式读取pyodps_iris表数据:')
    
    # 读取SQL执行结果。
    with result.open_reader() as reader:
        for record in reader:
            print(record[0], record[1])
    
  3. 运行python代码。

    python test-pyodps-local.py

    运行结果:

    111 aaa
    222 bbb
    333 ccc
    444 中文
    使用open_reader方式读取pyodps_iris表数据:
    4.9 3.0
    4.7 3.2
    4.6 3.1
  • 本页导读 (1)
  • 前提条件
  • 主要流程
  • 初始化ODPS入口
  • 执行SQL
  • 在PyODPS节点中执行SQL命令
  • 在PyODPS节点中读取SQL运行结果
  • DataFrame
  • 设置运行参数hints
  • 完整示例
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等