Python SDK示例:SQL

本文为您介绍Python SDK中执行SQL命令相关的典型场景操作示例。

注意事项

PyODPS支持MaxCompute SQL查询,并可以读取执行的结果,使用时有以下注意事项。

  • 入口对象的execute_sql('statement')run_sql('statement')方法可以执行SQL语句,返回值是运行实例,详情请参见任务实例

  • 目前暂不支持使用Arrow格式读取Instance结果。

  • 并非所有可以执行的MaxCompute命令都是PyODPS可以接受的SQL语句。在调用非DDL、DML语句时,请使用其他方法,例如:

    • GRANT、REVOKE等语句请使用run_security_query方法。

    • PAI命令请使用run_xflow或execute_xflow方法。

  • 调用SQL引擎执行SQL,会按照SQL作业进行计费,计费详情请参见计费项与计费方式概述

执行SQL

import os
from odps import ODPS
# 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
# 不建议直接使用 Access Key ID / Access Key Secret 字符串
o = ODPS(
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
    os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
    project='your-default-project',
    endpoint='your-end-point',
)
o.execute_sql('select * from table_name')  #  同步的方式执行,会阻塞直到SQL执行完成。
instance = o.run_sql('select * from table_name')  # 以异步的方式提交
print(instance.get_logview_address())  # 获取logview地址
instance.wait_for_success()  # 阻塞直到完成

设置运行参数

在运行时如果需要设置参数,您可以通过设置hints参数来实现,参数的类型是dict。

o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})

您可以对于全局配置设置sql.settings,后续每次运行时则都会添加相关的运行时参数。

from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from pyodps_iris')  # 会根据全局配置添加hints

读取SQL执行结果

运行SQL的Instance能够直接执行open_reader操作读取SQL执行结果。读取时会出现以下两种情况:

  • SQL返回了结构化的数据。

    with o.execute_sql('select * from table_name').open_reader() as reader:
        for record in reader:
            print(record)  # 处理每一个record
  • SQL可能执行了desc命令,这时可以通过reader.raw取到原始的SQL执行结果。

    with o.execute_sql('desc table_name').open_reader() as reader:
        print(reader.raw)

设置使用哪种结果接口

如果您设置了options.tunnel.use_instance_tunnel == True,在后续调用open_reader时,PyODPS会默认调用Instance Tunnel, 否则会调用旧的Result接口。如果您使用了版本较低的 MaxCompute服务,或者调用Instance Tunnel出现了问题,PyODPS会给出告警并自动降级到旧的Result接口,您可根据告警信息判断导致降级的原因。如果Instance Tunnel的返回结果不合预期, 您可以将该选项设为False,在调用open_reader时,也可以使用tunnel参数来指定使用何种结果接口。

  • 使用Instance Tunnel

    with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
        for record in reader:
            print(record)  # 处理每一个record
  • 使用 Results 接口

    with o.execute_sql('select * from dual').open_reader(tunnel=False) as reader:
        for record in reader:
            print(record)  # 处理每一个record

设置读取数据规模

如果您想要限制下载数据的规模,可以为open_reader增加limit选项, 或者设置 options.tunnel.limit_instance_tunnel = True 。如果未设置 options.tunnel.limit_instance_tunnel,MaxCompute会自动打开数据量限制,此时,可下载的数据条数受到Project配置的Tunnel下载数据规模数限制, 通常该限制为10000条。

设置读取结果为pandas DataFrame

# 直接使用 reader 的 to_pandas 方法
with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
    # pd_df 类型为 pandas DataFrame
    pd_df = reader.to_pandas()

设置读取速度(进程数)

说明

多进程加速仅在 PyODPS 0.11.3 及以上版本中支持。

您可以通过n_process指定使用进程数。

import multiprocessing
n_process = multiprocessing.cpu_count()
with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
    # n_process 指定成机器核数
    pd_df = reader.to_pandas(n_process=n_process)

设置alias

在运行SQL时,如果某个UDF引用的资源是动态变化的,您可以alias旧的资源名到新的资源,这样就可以避免重新删除并重新创建UDF。

from odps.models import Schema

myfunc = '''\
from odps.udf import annotate
from odps.distcache import get_cache_file

@annotate('bigint->bigint')
class Example(object):
    def __init__(self):
        self.n = int(get_cache_file('test_alias_res1').read())

    def evaluate(self, arg):
        return arg + self.n
'''
res1 = o.create_resource('test_alias_res1', 'file', file_obj='1')
o.create_resource('test_alias.py', 'py', file_obj=myfunc)
o.create_function('test_alias_func',
                  class_type='test_alias.Example',
                  resources=['test_alias.py', 'test_alias_res1'])

table = o.create_table(
    'test_table',
    schema=Schema.from_lists(['size'], ['bigint']),
    if_not_exists=True
)

data = [[1, ], ]
# 写入一行数据,只包含一个值1
o.write_table(table, 0, [table.new_record(it) for it in data])

with o.execute_sql(
    'select test_alias_func(size) from test_table').open_reader() as reader:
    print(reader[0][0])
res2 = o.create_resource('test_alias_res2', 'file', file_obj='2')
# 把内容为1的资源alias成内容为2的资源,不需要修改UDF或资源
with o.execute_sql(
    'select test_alias_func(size) from test_table',
    aliases={'test_alias_res1': 'test_alias_res2'}).open_reader() as reader:
    print(reader[0][0])

在交互式环境执行 SQL

在ipython和jupyter里支持使用SQL插件的方式运行SQL,且支持参数化查询, 详情参考交互体验增强文档

设置biz_id

在少数情形下,在提交SQL时,可能需要同时提交biz_id,否则执行会报错。此时,您可以在全局options里设置biz_id。

from odps import options

options.biz_id = 'my_biz_id'
o.execute_sql('select * from pyodps_iris')