PyODPS支持对MaxCompute SQL的基本操作,本文为您介绍如何在PyODPS中使用SQL。

背景信息

PyODPS提供对MaxCompute SQL的基本操作方法,方法如下所示。

方法名称

方法说明

execute_sql()/run_sql()

执行SQL语句

open_reader()

读取SQL执行结果

说明

在MaxCompute客户端中可以执行的SQL语句并非都可以通过入口对象的execute_sql()run_sql()方法执行。在调用非DDL或非DML语句时,请使用其他方法。例如,调用GRANT或REVOKE语句时,请使用run_security_query方法;调用API命令时,请使用run_xflowexecute_xflow方法。

在Python UDF编写过程中,如果某个UDF引用的资源是动态变化的,您可以在execute_sql()中设置alias给旧的资源一个别名作为新的资源,无需重新删除或创建新的UDF。详情请参见设置alias

执行SQL语句

PyODPS对MaxCompute SQL操作的具体说明如下。

  • 参数说明

    • statement:需要执行的SQL语句。

    • hints:设置运行时参数,参数类型是DICT。

  • 返回值说明

    执行execute_sql()run_sql()后的返回值是任务实例。详情请参见任务实例

  • 使用示例

    • 示例1

      执行SQL语句。

      o.execute_sql('select * from table_name')  #同步的方式执行,会阻塞直到SQL语句执行完成。
      instance = o.run_sql('select * from table_name')  #异步的方式执行。
      print(instance.get_logview_address())  # 获取Logview地址。
      instance.wait_for_success()  # 阻塞直到完成。
    • 示例2

      执行SQL语句时,运行参数。

      o.execute_sql('select * from pyodps_iris', hints={'odps.stage.mapper.split.size': 16})

      您也可以通过如下示例,设置sql.settings,对运行的参数进行全局配置,则在每次运行语句时都会执行对应参数,支持设置的全局参数请参见Flag参数列表

      from odps import options
      options.sql.settings = {'odps.stage.mapper.split.size': 16}
      o.execute_sql('select * from pyodps_iris')  # 会根据全局配置添加hints。

读取SQL执行结果

您可以通过open_reader操作读取SQL执行结果。有以下两种情况:

  • 读取表数据,返回结构化数据,通过for语句遍历即可。

    with o.execute_sql('select * from table_name').open_reader() as reader:
        for record in reader:   # 处理每一个record。
            print(record)
  • 执行desc等命令,返回非结构化数据,需要通过reader.raw获取执行结果。

    with o.execute_sql('desc table_name').open_reader() as reader:
        print(reader.raw)

在调用open_reader()时,PyODPS会默认调用旧的Result接口,可能会出现获取数据超时或获取数据受限等问题。您可以按照如下方法指定PyODPS调用Instance Tunnel。

  • 在脚本中设置options.tunnel.use_instance_tunnel =True

  • 按照如下示例,设置open_reader(tunnel=True)。从PyODPS v0.7.7.1开始,您可以通过open_reader()方法读取全量数据。

    with o.execute_sql('select * from table_name').open_reader(tunnel=True) as reader:
        for record in reader:
            print(record)
说明
  • 如果您使用了较低版本的MaxCompute服务,或者调用Instance Tunnel出现了问题,PyODPS会给出警告并自动降级到旧的Result接口,您可根据警告信息判断导致降级的原因。

  • 如果您使用的MaxCompute只能支持旧Result接口,并且需要读取所有的数据,您可将SQL结果写入另一张表后用读表接口读取(可能受到Project安全设置的限制)。

  • 更多Instance Tunnel说明,请参见Instance tunnel

PyODPS默认不限制从Instance读取的数据规模,但Project Owner可能在MaxCompute Project上增加保护设置,以限制对Instance结果的读取,此时只能使用受限读取模式读取数据,在此模式下可读取的行数受到Project配置限制,通常为10000行。如果PyODPS检测到读取Instance数据被限制,且options.tunnel.limit_instance_tunnel未设置,会自动启用受限读取模式。

  • 如果您的Project被保护,想要手动启用受限读取模式,可以为open_reader()方法增加limit=True参数,例如open_reader(limit=True)。或者设置options.tunnel.limit_instance_tunnel = True

  • 在部分环境中(例如DataWorks),options.tunnel.limit_instance_tunnel可能默认被置为True,此时,如果想要读取所有数据,需要为open_reader()方法增加tunnel=Truelimit=False参数,例如open_reader(tunnel=True, limit=False)

  • 重要

    如果Project本身被保护,tunnel=Truelimit=False选项不能解除保护,此时应联系Project Owner开放相应的读权限。

设置alias

如果某个UDF引用的资源是动态变化的,您可以通过设置alias给旧的资源一个别名作为新的资源,无需重新删除或创建新的UDF。

from odps.models import Schema
myfunc = '''\
from odps.udf import annotate
from odps.distcache import get_cache_file

@annotate('bigint->bigint')
class Example(object):
    def __init__(self):
        self.n = int(get_cache_file('test_alias_res1').read())

    def evaluate(self, arg):
        return arg + self.n
'''
res1 = o.create_resource('test_alias_res1', 'file', file_obj='1')
o.create_resource('test_alias.py', 'py', file_obj=myfunc)
o.create_function('test_alias_func',
                  class_type='test_alias.Example',
                  resources=['test_alias.py', 'test_alias_res1'])

table = o.create_table(
    'test_table',
    schema=Schema.from_lists(['size'], ['bigint']),
    if_not_exists=True
)

data = [[1, ], ]
# 写入一行数据,只包含一个值1。
o.write_table(table, 0, [table.new_record(it) for it in data])

with o.execute_sql(
    'select test_alias_func(size) from test_table').open_reader() as reader:
    print(reader[0][0])
res2 = o.create_resource('test_alias_res2', 'file', file_obj='2')
# 把内容为1的资源别名设置成内容为2的资源。您不需要修改UDF或资源。
with o.execute_sql(
    'select test_alias_func(size) from test_table',
    aliases={'test_alias_res1': 'test_alias_res2'}).open_reader() as reader:
    print(reader[0][0])

在少数情形下,提交SQL时需要同时提交biz_id,否则执行会报错。此时,您可以通过设置全局options里的biz_id解决此类报错。

from odps import options
options.biz_id = 'my_biz_id'
o.execute_sql('select * from pyodps_iris')