文档

在本地环境上使用PyODPS

更新时间:

PyODPS是阿里云开发的Python SDK,用于在本地环境上使用MaxCompute服务。本文将介绍如何在本地环境上使用PyODPS进行表操作、数据加载和运行SQL查询。

前提条件

  1. 本地已安装python环境,且已安装PyODPS包,参考:安装PyODPS

  2. 准备数据集pyodps_iris。您可参考使用示例下载数据集、创建pyodps_iris表并写入数据,文档中的操作示例即以此表作为示例数据,为您演示基本操作。

主要流程

  1. 打开Python编辑器,并创建一个Python文件。

  2. 开发PyODPS任务代码。

    创建完成后,您可参考下文内容进行简单示例的操作学习,了解PyODPS的主要能力维度。

    更多PyODPS的使用指导请参见基本操作概述DataFrame概述。您也可以参考示例文档:使用PyODPS节点进行结巴中文分词,进行一个端到端的简单操作。

  3. 本地运行python文件。

创建ODPS入口

您需要手动定义ODPS入口,代码示例如下。

import os
from odps import ODPS
# 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
o = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='your-default-project',
    endpoint='your-end-point',
)

其中:

  • ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET:需将该环境变量设置为您的阿里云账号的AccessKey ID和 AccessKey Secret。

    说明

    不建议直接使用AccessKey ID和 AccessKey Secret字符串。

  • your-default-projectyour-end-point:需替换为待访问的MaxCompute项目名称与对应地域的Endpoint信息,各地域的Endpoint请参见Endpoint

执行SQL

您可以在本地PyODPS节点中执行SQL命令,并读取SQL运行结果。

在PyODPS节点中执行SQL命令

您可以在PyODPS节点中使用传统模式或加速查询模式(MCQA)执行SQL命令,当前主要支持运行DDL、DML类型的SQL命令。与传统模式相比,加速查询模式(MCQA)会将作业的运行结果写入临时缓存中。当您后续执行相同的查询作业时,MaxCompute会优先返回缓存中的结果,加快执行速度,相关计费规则请参见计算费用(按量付费)您可根据需要选择相应的模式执行。

说明

可以执行的SQL语句并非都可以通过入口对象的execute_sql()run_sql()等方法执行。在调用非DDL或非DML语句时,请使用其他方法。例如,调用创建表语句时,请使用create_table方法;调用API命令时,请使用run_xflowexecute_xflow方法。

  • 使用传统模式执行SQL命令。

    您可以使用execute_sql()/run_sql()来执行SQL命令,示例如下:

    使用create_table方法创建一个新的表。

    o.create_table('my_t', 'num bigint, id string', if_not_exists=True)

    使用execute_sql方法执行SQL查询。

    result = o.execute_sql('SELECT * FROM pyodps_iris LIMIT 3')
    with result.open_reader() as reader:
        for record in reader:
            print(record)
    
  • 使用加速查询模式(MCQA)执行SQL命令。

    MCQA是MaxCompute提供的查询加速功能,支持使用独立资源池对中小规模数据进行加速。PyODPS从V0.11.4.1版本开始支持以下列方式通过MCQA执行SQL。

    execute_sql_interactive(fallback='all'):通过MCQA执行SQL并返回MCQA Instance。如果MCQA无法执行相应的SQL,会自动回退到传统模式。此时,函数返回的Instance为回退后的Instance。

    o.execute_sql_interactive('SELECT * FROM dual', fallback='all')

    如果不希望回退,可以指定参数fallback=False。也可以指定为回退策略(或回退策略的组合,使用逗号分隔的字符串)。可用的策略名如下,默认策略为unsupported,upgrading,noresource,timeout。为使回退一直生效,建议设置为all,即generic,unsupported,upgrading,noresource,timeout的组合。

    回退策略

    描述

    generic

    指定时,表示发生未知错误时回退到传统模式。

    noresource

    指定时,表示发生资源不足问题时回退到传统模式。

    upgrading

    指定时,表示升级期间回退到传统模式。

    timeout

    指定时,表示执行超时时回退到传统模式。

    unsupported

    指定时,表示遇到MCQA不支持的场景时回退到传统模式。

    例如,下述代码表示通过MCQA执行SQL,如果遇到不支持MCQA的场景或资源不足问题,回退到传统模式。

     o.execute_sql_interactive('SELECT * FROM dual', fallback="noresource,unsupported")

在PyODPS节点中读取SQL运行结果

您可以使用open_reader()来读取SQL命令运行结果。具体操作请参见读取SQL执行结果

更多PyODPS节点的SQL相关操作详情请参见SQL

DataFrame

  • 执行

    DataFrame的执行需要显式调用立即执行的方法(如executepersist等)。示例代码如下。

    # 调用立即执行的方法,处理每条Record,打印出表pyodps_iris中iris.sepalwidth小于3的所有数据。
    from odps.df import DataFrame
    iris = DataFrame(o.get_table('pyodps_iris'))
    for record in iris[iris.sepalwidth < 3].execute():  
        print(record)
    
  • 打印详细信息

    默认情况下,本地环境的PyODPS节点运行过程不会打印Logview等详细过程。您可以手动设置options.verbose选项,开启打印Logview等详细过程。

    from odps import options
    options.verbose = True
    

更多DataFrame的操作示例请参见DataFrame概述

设置运行参数hints

运行任务时如果需要设置运行时参数,可以通过设置hints参数来实现,参数类型是dict。

o.execute_sql('SELECT * FROM pyodps_iris', hints={'odps.sql.mapper.split.size': 16})

您也可以对全局设置sql.setting,设置后后续每次运行时都会添加相关的运行时参数。

from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
# 会根据全局配置添加hints
o.execute_sql('SELECT * FROM pyodps_iris')

完整示例

  1. 本地创建test-pyodps-local.py文件。

  2. 写入示例代码。

    import os
    from odps import ODPS
    
    # 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
    # ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
    o = ODPS(
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
        os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
        project='your-default-project',
        endpoint='your-end-point',
    )
    
    # 以直接指定字段名以及字段类型的方式创建非分区表my_new_table。
    table = o.create_table('my_new_table', 'num bigint, id string', if_not_exists=True)
    
    # 向非分区表my_new_table中插入数据。
    records = [[111, 'aaa'],
               [222, 'bbb'],
               [333, 'ccc'],
               [444, '中文']]
    o.write_table(table, records)
    
    # 读取非分区表my_new_table中的数据。
    for record in o.read_table(table):
        print(record[0], record[1])
    
    # 以运行SQL的方式读取表中的数据。
    result = o.execute_sql('SELECT * FROM pyodps_iris LIMIT 3;', hints={'odps.sql.allow.fullscan': 'true'})
    
    # 删除表以清除资源。
    table.drop()
    
    print('使用open_reader方式读取pyodps_iris表数据:')
    
    # 读取SQL执行结果。
    with result.open_reader() as reader:
        for record in reader:
            print(record[0], record[1])
    
  3. 运行python代码。

    python test-pyodps-local.py

    运行结果:

    111 aaa
    222 bbb
    333 ccc
    444 中文
    使用open_reader方式读取pyodps_iris表数据:
    4.9 3.0
    4.7 3.2
    4.6 3.1