本文为您介绍Python SDK中执行SQL命令相关的典型场景操作示例。
注意事项
PyODPS支持MaxCompute SQL查询,并可以读取执行的结果,使用时有以下注意事项。
入口对象的
execute_sql('statement')
和run_sql('statement')
方法可以执行SQL语句,返回值是运行实例,详情请参见任务实例。目前暂不支持使用Arrow格式读取Instance结果。
并非所有可以执行的MaxCompute命令都是PyODPS可以接受的SQL语句。在调用非DDL、DML语句时,请使用其他方法,例如:
GRANT、REVOKE等语句请使用run_security_query方法。
PAI命令请使用run_xflow或execute_xflow方法。
调用SQL引擎执行SQL,会按照SQL作业进行计费,计费详情请参见计费项与计费方式概述。
执行SQL
import os
from odps import ODPS
# 确保 ALIBABA_CLOUD_ACCESS_KEY_ID 环境变量设置为用户 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 环境变量设置为用户 Access Key Secret,
# 不建议直接使用 Access Key ID / Access Key Secret 字符串
o = ODPS(
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
project='your-default-project',
endpoint='your-end-point',
)
o.execute_sql('select * from table_name') # 同步的方式执行,会阻塞直到SQL执行完成。
instance = o.run_sql('select * from table_name') # 以异步的方式提交
print(instance.get_logview_address()) # 获取logview地址
instance.wait_for_success() # 阻塞直到完成
设置运行参数
在运行时如果需要设置参数,您可以通过设置hints参数来实现,参数的类型是dict。
o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})
您可以对于全局配置设置sql.settings,后续每次运行时则都会添加相关的运行时参数。
from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from pyodps_iris') # 会根据全局配置添加hints
读取SQL执行结果
运行SQL的Instance能够直接执行open_reader
操作读取SQL执行结果。读取时会出现以下两种情况:
SQL返回了结构化的数据。
with o.execute_sql('select * from table_name').open_reader() as reader: for record in reader: print(record) # 处理每一个record
SQL可能执行了
desc
命令,这时可以通过reader.raw
取到原始的SQL执行结果。with o.execute_sql('desc table_name').open_reader() as reader: print(reader.raw)
设置使用哪种结果接口
如果您设置了options.tunnel.use_instance_tunnel == True
,在后续调用open_reader
时,PyODPS会默认调用Instance Tunnel, 否则会调用旧的Result接口。如果您使用了版本较低的 MaxCompute服务,或者调用Instance Tunnel出现了问题,PyODPS会给出告警并自动降级到旧的Result接口,您可根据告警信息判断导致降级的原因。如果Instance Tunnel的返回结果不合预期, 您可以将该选项设为False,在调用open_reader
时,也可以使用tunnel
参数来指定使用何种结果接口。
使用Instance Tunnel
with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader: for record in reader: print(record) # 处理每一个record
使用 Results 接口
with o.execute_sql('select * from dual').open_reader(tunnel=False) as reader: for record in reader: print(record) # 处理每一个record
设置读取数据规模
如果您想要限制下载数据的规模,可以为open_reader
增加limit选项, 或者设置 options.tunnel.limit_instance_tunnel = True
。如果未设置 options.tunnel.limit_instance_tunnel
,MaxCompute会自动打开数据量限制,此时,可下载的数据条数受到Project配置的Tunnel下载数据规模数限制, 通常该限制为10000条。
设置读取结果为pandas DataFrame
# 直接使用 reader 的 to_pandas 方法
with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
# pd_df 类型为 pandas DataFrame
pd_df = reader.to_pandas()
设置读取速度(进程数)
多进程加速仅在 PyODPS 0.11.3 及以上版本中支持。
您可以通过n_process
指定使用进程数。
import multiprocessing
n_process = multiprocessing.cpu_count()
with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
# n_process 指定成机器核数
pd_df = reader.to_pandas(n_process=n_process)
设置alias
在运行SQL时,如果某个UDF引用的资源是动态变化的,您可以alias
旧的资源名到新的资源,这样就可以避免重新删除并重新创建UDF。
from odps.models import Schema
myfunc = '''\
from odps.udf import annotate
from odps.distcache import get_cache_file
@annotate('bigint->bigint')
class Example(object):
def __init__(self):
self.n = int(get_cache_file('test_alias_res1').read())
def evaluate(self, arg):
return arg + self.n
'''
res1 = o.create_resource('test_alias_res1', 'file', file_obj='1')
o.create_resource('test_alias.py', 'py', file_obj=myfunc)
o.create_function('test_alias_func',
class_type='test_alias.Example',
resources=['test_alias.py', 'test_alias_res1'])
table = o.create_table(
'test_table',
schema=Schema.from_lists(['size'], ['bigint']),
if_not_exists=True
)
data = [[1, ], ]
# 写入一行数据,只包含一个值1
o.write_table(table, 0, [table.new_record(it) for it in data])
with o.execute_sql(
'select test_alias_func(size) from test_table').open_reader() as reader:
print(reader[0][0])
res2 = o.create_resource('test_alias_res2', 'file', file_obj='2')
# 把内容为1的资源alias成内容为2的资源,不需要修改UDF或资源
with o.execute_sql(
'select test_alias_func(size) from test_table',
aliases={'test_alias_res1': 'test_alias_res2'}).open_reader() as reader:
print(reader[0][0])
在交互式环境执行 SQL
在ipython和jupyter里支持使用SQL插件的方式运行SQL,且支持参数化查询, 详情参考交互体验增强文档。
设置biz_id
在少数情形下,在提交SQL时,可能需要同时提交biz_id,否则执行会报错。此时,您可以在全局options里设置biz_id。
from odps import options
options.biz_id = 'my_biz_id'
o.execute_sql('select * from pyodps_iris')