PyODPS常见问题

本文为您介绍使用PyODPS时的常见问题。

问题类别

常见问题

安装PyODPS

导入模块

使用PyODPS

安装PyODPS时,提示Warning: XXX not installed,如何解决?

产生此问题的原因为组件缺失,请根据报错信息中提示的XXX信息,明确缺失的组件名称,使用pip命令安装此组件。

安装PyODPS时,提示Project Not Found,如何解决?

产生此问题的原因为:

  • Endpoint配置错误,需要修改为目标Project的Endpoint。更多Endpoint信息,请参见Endpoint

  • MaxCompute入口对象参数位置填写错误。请检查此项确保其填写正确。更多MaxCompute入口对象参数信息,请参见从DataWorks迁移到本地环境

安装PyODPS时,报错Syntax Error,如何解决?

由于Python版本过低导致报错。不支持Python2.5及以下版本,建议使用PyODPS支持的主流版本,例如Python2.7.6+、Python3.3+以及Python2.6。

在Mac上安装PyODPS时,报错Permission Denied,如何解决?

您可以使用sudo pip install pyodps命令安装PyODPS。

在Mac上安装PyODPS时,报错Operation Not Permitted,如何解决?

此报错是由系统完整性保护导致。您需要重启设备,并在重启中按+R键,此后在终端中运行如下命令可以解决此问题。

csrutil disable
reboot       

更多信息,请参见Operation Not Permitted when on root - El Capitan (rootless disabled)

执行from odps import ODPS时,报错No Module Named ODPS,如何解决?

此报错说明无法加载ODPS Package。无法加载的原因有如下几种:

  • 原因一:安装了多个Python版本。

    解决措施:Search Path(通常是当前目录)中包含odps.pyinit.py文件且名为odps的文件夹。解决方法如下:

    • 如果是文件夹重名,请修改文件夹名称。

    • 如果是曾经安装过一个名为odps的Python包,请使用sudo pip uninstall odps进行删除。

  • 原因二:同时安装了Python2和Python3版本。

    解决措施:确保设备只安装了Python2或Python3版本。

  • 原因三:当前使用的Python下并未安装PyODPS。

    解决措施:安装PyODPS,安装方法请参见安装PyODPS

执行from odps import ODPS时,报错Cannot Import Name ODPS,如何解决?

请检查当前工作路径下是否存在名为odps.py的文件。若存在,请改名后再执行导入操作。

执行from odps import ODPS时,报错Cannot Import Module odps,如何解决?

此报错通常是由于PyODPS遇到了依赖问题。您可以单击申请链接添加PyODPS技术支持钉钉群,联系钉群管理员定位解决。

在IPython或Jupyter下使用PyODPS时,报错ImportError,如何解决?

在代码头部增加from odps import errors

如果增加from odps import errors也报错则是缺少Ipython组件,执行sudo pip install -U jupyter解决此问题。

o.gettable('table_name').size中size字段的含义是什么?

SIZE字段表示表的物理存储大小。

如何设置Tunnel Endpoint?

您可以通过options.tunnel.endpoint设置,请参见aliyun-odps-python-sdk

PyODPS如何使用包含CPython的第三方包?

建议您打包成WHEEL格式后使用,请参见如何制作可以在MaxCompute上使用的crcmod

PyODPS中的DataFrame最多可以处理多少数据,对表的大小有限制吗?

PyODPS对表的大小没有限制。本地Pandas创建DataFrame的大小受限于本地内存的大小。

在DataFrame中如何使用max_pt?

使用odps.df.func模块来调用MaxCompute内建函数。

from odps.df import func
df = o.get_table('your_table').to_df()
df[df.ds == func.max_pt('your_project.your_table')]  # ds是分区字段。     

使用PyODPS向表写入数据的两种方式open_writer()和write_table()有什么区别?

每次调用write_table(),MaxCompute都会在服务端生成一个文件。这一操作需要较大的时间开销,同时过多的文件会降低后续的查询效率,还可能造成服务端内存不足。因此,建议在使用write_table()方法时,一次性写入多组数据或者传入一个Generator对象。使用write_table()方法示例请参见写入表数据

open_writer()默认写入到Block中。

为什么DataWorks PyODPS节点上查出的数据量要少于本地运行的结果?

DataWorks上默认未开启Instance Tunnel,即instance.open_reader默认使用Result接口,最多可以获取一万条记录。

开启Instance Tunnel后,您可以通过reader.count获取记录数。如果您需要迭代获取全部数据,则需要通过设置options.tunnel.limit_instance_tunnel = False关闭Limit限制。

DataFrame如何获得Count实际数字?

  1. 安装PyODPS后,在Python环境下执行如下命令创建MaxCompute表来初始化DataFrame。

    iris = DataFrame(o.get_table('pyodps_iris'))        
  2. 在DataFrame上执行Count获取DataFrame的总行数。

    iris.count()      
  3. 由于DataFrame上的操作并不会立即执行,只有当用户显式调用Execute方法或者立即执行的方法时,才会真正执行。此时为了防止Count方法延迟执行,可输入如下命令。

    df.count().execute()    

获取DataFrame实际数量的相关方法请参见聚合操作。详细的PyODPS方法延迟操作,请参见执行

使用PyODPS时,报错sourceIP is not in the white list,如何解决?

PyODPS访问的MaxCompute项目存在白名单保护,请联系项目所有者将设备添加至IP白名单。更多IP白名单信息,请参见管理IP白名单

使用from odps import options options.sql.settings设置MaxCompute运行环境不成功,如何解决?

  • 问题现象

    使用PyODPS运行SQL,在申请MaxCompute实例前,通过如下代码设置MaxCompute运行环境。

    from odps import options
    options.sql.settings = {'odps.sql.mapper.split.size': 32}     

    运行任务后只启动了6个Mapper,设置未生效。 在客户端执行set odps.stage.mapper.split.size=32,一分钟运行完毕。

  • 产生原因

    客户端和PyODPS里设置的参数不一致。客户端的参数是odps.stage.mapper.split.size,而PyODPS里的参数是 odps.sql.mapper.split.size

  • 解决措施

    修改参数为odps.stage.mapper.split.size

调用DataFrame的head方法时,报错IndexError:listindexoutofrange,是什么原因?

由于list[index]没有元素或list[index]超出范围。

上传Pandas DataFrame至MaxCompute时,报错ODPSError,如何解决?

  • 问题现象

    上传Pandas DataFrame至MaxCompute时,返回报错如下。

    ODPSError: ODPS entrance should be provided.
  • 产生原因

    报错原因为没有找到全局的MaxCompute对象入口。

  • 解决措施

    • 使用Room机制%enter时,会配置全局入口。

    • 对MaxCompute对象入口调用to_global方法。

    • 使用参数DataFrame(pd_df).persist('your_table', odps=odps)

通过DataFrame写表时,报错lifecycle is not specified in mandatory mode,如何解决?

  • 问题现象

    通过DataFrame写表时,返回报错如下。

    table lifecycle is not specified in mandatory mode
  • 产生原因

    未给表设置生命周期。

  • 解决措施

    Project要求对每张表设置生命周期,因此需要在每次执行时设置如下信息即可。

    from odps import options
    options.lifecycle = 7  # 此处设置lifecycle的值。lifecycle取值为整数,单位为天。      

使用PyODPS写数据时,提示Perhaps the datastream from server is crushed,如何解决?

该报错是由脏数据导致,请您检查数据列数是否和目标表一致。

使用PyODPS读数据时,报错Project is protected,如何解决?

Project上的安全策略禁止读取表中的数据,如果想使用全部数据,可以使用以下方法:

  • 联系Project Owner增加例外规则。

  • 使用DataWorks或其他脱敏工具先对数据进行脱敏,导出到非保护Project,再进行读取。

如果只想查看部分数据,可使用如下方法:

  • 改用o.execute_sql('select * from <table_name>').open_reader()

  • 改用DataFrame,o.get_table('<table_name>').to_df()

PyODPS脚本任务不定时出现连接失败,报错ConnectionError: timed out try catch exception,如何解决?

产生此报错的可能原因如下:

  • 建立连接超时。PyODPS默认的超时时间是5s,解决方法如下:

    • 您可以在代码头部加上如下代码,增加超时时间隔。

      workaround from odps import options 
      options.connect_timeout=30                        
    • 捕获异常,进行重试。

  • 由于沙箱限制,会造成部分机器禁止网络访问。建议您使用独享调度资源组执行任务,解决此问题。

使用PyODPS运行get_sql_task_cost函数时,报错is not defined,如何解决?

  • 问题现象

    使用PyODPS运行get_sql_task_cost函数时,返回报错如下。

    NameError: name 'get_task_cost' is not defined.
  • 解决措施

    函数名称有误。

  • 解决措施

    使用execute_sql_cost替代get_sql_task_cost。

使用PyODPS打印日志时,中文自动转换为编码显示,如何显示成原始中文?

您可以使用类似print ("我叫 %s" % ('abc'))的输入方式解决。目前仅Python 2涉及此类问题。

设置options.tunnel.use_instance_tunnel = False,为什么字段在MaxCompute中定义为DATETIME类型,使用SELECT语句得到的数据为STRING类型?

在调用Open_Reader时,PyODPS会默认调用旧的Result接口。此时从服务端得到的数据是CSV格式的,所以DATETIME都是STRING类型。

打开Instance Tunnel,即设置options.tunnel.use_instance_tunnel = True,PyODPS会默认调用Instance Tunnel,即可解决此问题。

如何利用Python语言特性来实现丰富的功能?

  • 编写Python函数。

    计算两点之间的距离有多种计算方法,例如欧氏距离、曼哈顿距离等,您可以定义一系列函数,在计算时根据具体情况调用相应的函数即可。

    def euclidean_distance(from_x, from_y, to_x, to_y):
        return ((from_x - to_x) ** 2 + (from_y - to_y) ** 2).sqrt()
    
    def manhattan_distance(center_x, center_y, x, y):
       return (from_x - to_x).abs() + (from_y - to_y).abs()                      

    调用如下。

    In [42]: df
         from_x    from_y      to_x      to_y
    0  0.393094  0.427736  0.463035  0.105007
    1  0.629571  0.364047  0.972390  0.081533
    2  0.460626  0.530383  0.443177  0.706774
    3  0.647776  0.192169  0.244621  0.447979
    4  0.846044  0.153819  0.873813  0.257627
    5  0.702269  0.363977  0.440960  0.639756
    6  0.596976  0.978124  0.669283  0.936233
    7  0.376831  0.461660  0.707208  0.216863
    8  0.632239  0.519418  0.881574  0.972641
    9  0.071466  0.294414  0.012949  0.368514
    
    In [43]: euclidean_distance(df.from_x, df.from_y, df.to_x, df.to_y).rename('distance')
       distance
    0  0.330221
    1  0.444229
    2  0.177253
    3  0.477465
    4  0.107458
    5  0.379916
    6  0.083565
    7  0.411187
    8  0.517280
    9  0.094420
    
    In [44]: manhattan_distance(df.from_x, df.from_y, df.to_x, df.to_y).rename('distance')
       distance
    0  0.392670
    1  0.625334
    2  0.193841
    3  0.658966
    4  0.131577
    5  0.537088
    6  0.114198
    7  0.575175
    8  0.702558
    9  0.132617                       
  • 利用Python语言的条件和循环语句。

    如果用户要计算的表保存在数据库,需要根据配置来对表的字段进行处理,然后对所有表进行UNION或者JOIN操作。这时如果用SQL实现是相当复杂的,但是用DataFrame处理则会非常简单。

    例如,您有30张表需要合成一张表,此时如果使用SQL,则需要对30张表执行UNION ALL操作。如果使用PyODPS,如下代码就可以完成。

    table_names = ['table1', ..., 'tableN']
    dfs = [o.get_table(tn).to_df() for tn in table_names]
    reduce(lambda x, y: x.union(y), dfs) 
    
    ## reduce语句等价于如下代码。
    df = dfs[0]
    for other_df in dfs[1:]:
        df = df.union(other_df)       

如何使用Pandas计算后端进行本地Debug?

您可以通过如下两种方式来进行本地Debug。这两种方式除了初始化方法不同,后续代码完全一致:

  • 通过Pandas DataFrame创建的PyODPS DataFrame可以使用Pandas执行本地计算。

  • 使用MaxCompute表创建的DataFrame可以在MaxCompute上执行。

示例代码如下。

df = o.get_table('movielens_ratings').to_df()
DEBUG = True
if DEBUG:
    df = df[:100].to_pandas(wrap=True)       

当所有后续代码都编写完成,本地的测试速度非常快。当测试结束后,您可以把Debug值改为False,这样后续就能在MaxCompute上执行全量的计算。

推荐您使用MaxCompute Studio来执行本地PyODPS程序调试。

如何避免嵌套循环执行慢的情况?

建议您通过Dict数据结构记录下循环的执行结果,最后在循环外统一导入到Dataframe对象中。如果您将Dataframe对象代码df=XXX放置在外层循环中,会导致每次循环计算都生成一个Dataframe对象,从而降低嵌套循环整体的执行速度。

如何避免将数据下载到本地?

请参见PyODPS节点实现避免将数据下载到本地

什么情况下可以下载PyODPS数据到本地处理?

在如下两种情况下,可以下载PyODPS数据到本地:

  • 数据量很小的情况下进行本地数据处理。

  • 如果需要对单行数据应用一个Python函数,或者执行一行变多行的操作,这时使用PyODPS DataFrame就可以轻松完成,并且可以完全发挥MaxCompute的并行计算能力。

    例如有一份JSON串数据,需要把JSON串按Key-Value对展开成一行,代码如下所示。

    In [12]: df
                   json
    0  {"a": 1, "b": 2}
    1  {"c": 4, "b": 3}
    
    In [14]: from odps.df import output
    
    In [16]: @output(['k', 'v'], ['string', 'int'])
        ...: def h(row):
        ...:     import json
        ...:     for k, v in json.loads(row.json).items():
        ...:         yield k, v
        ...:   
    
    In [21]: df.apply(h, axis=1)
       k  v
    0  a  1
    1  b  2
    2  c  4
    3  b  3                          

通过open_reader最多只能取到1万条记录,如何获取多于1万条的记录?

使用create table as select ...将SQL的结果保存成表,再使用table.open_reader读取。

为什么尽量使用内建算子,而不是自定义函数?

计算过程中使用自定义函数比使用内建算子速度慢很多,因此建议使用内建算子。

对于百万行的数据,当一行应用了自定义函数后,执行时间从7秒延长到了27秒。如果有更大的数据集、更复杂的操作,时间的差距可能会更大。

为什么通过DataFrame().schema.partitions获得分区表的分区值为空?

这是因为DataFrame不区分分区字段和普通字段,所以获取分区表的分区字段作为普通字段处理。您可以通过如下方式过滤掉分区字段。

df = o.get_table().to_df()
df[df.ds == '']       

建议您参照来设置分区或读取分区信息。

如何使用PyODPS DataFrame执行笛卡尔积?

请参见PyODPS DataFrame处理笛卡尔积的方式

如何使用PyODPS实现结巴中文分词?

请参见PyODPS节点实现结巴中文分词

如何使用PyODPS下载全量数据?

PyODPS默认不限制从Instance读取的数据规模。但是对于受保护的MaxCompute项目,通过Tunnel下载数据将受限。此时,如果未设置options.tunnel.limit_instance_tunnel,则数据量限制会被自动打开,可下载的数据条数受到MaxCompute配置限制,通常该限制为10000条。如果您需要迭代获取全部数据,则需要关闭limit限制。您可以通过下列语句在全局范围内打开Instance Tunnel并关闭limit限制。

options.tunnel.use_instance_tunnel = True
options.tunnel.limit_instance_tunnel = False  # 关闭limit限制,读取全部数据。

with instance.open_reader() as reader:
    # 通过Instance Tunnel可读取全部数据。

使用PyODPS统计表中某个字段的空值率时,是用execute_sql还是DataFrame?

DataFrame聚合性能更高一些,推荐使用DataFrame来执行聚合操作。

PyODPS数据类型如何设置?

如果您使用PyODPS,可以通过下列方法打开新数据类型开关:

  • 如果通过execute_sql方式打开新数据类型,可以执行o.execute_sql('set odps.sql.type.system.odps2=true;query_sql', hints={"odps.sql.submit.mode" : "script"})

  • 如果通过Dataframe打开新数据类型,例如persist、execute、to_pandas等方法,可通过hints参数设置。图示设置方法仅针对单个作业生效。

    from odps.df import DataFrame
    users - DataFrame(o.get_table('odps2_test'))
    users.persist('copy_test',hints={'odps.sql.type.system.odps2':'true'})

    如果通过Dataframe调用,且需要全局生效,需要使用Option参数options.sql.use_odps2_extension = True

使用PyODPS时遇到ValueError,如何解决?

您可以通过以下两种方式进行解决:

  • 升级SDK版本至V0.8.4或以上版本。

  • SQL中添加如下语句:

    fromodps.typesimportDecimal
    Decimal._max_precision=38