全部产品
云市场

PyODPS开发常见问题

更新时间:2019-07-31 11:43:47

为了能让用户更高效地开发PyODPS程序,本文总结了使用PyODPS过程中的最佳实践。

注意:公共云现已支持Python UDF,因此本文中提到的自定义函数功能,如apply 和map_reduce等,公共云用户均可使用。

Q:调用Dataframe的head方法报错IndexError:listindexoutofrange如何处理?

A:出现这种报错通常是由于List为空,没有元素或list[index]超出范围。在dynamic处理流程中,针对不确定的字段才会做dynamic处理。如果您有privot操作,需要persist一下再处理。

Q:如何避免嵌套循环执行慢的情况?

A:建议您通过Dict数据结构记录下循环的执行结果,最后在循环外统一导入到Dateframe对象中。如果您将Dateframe对象代码df=XXX放置在外层循环中,会导致每次循环计算都生成一个Dateframe对象,从而降低嵌套循环整体的执行速度。

Q:什么情况下可以拉取PyODPS数据到本地处理?

A:PyODPS提供了多种方便拉取数据到本地的操作,但是这种操作无法发挥MaxCompute的大规模并行能力。以下两种情况下可以拉取pyodps数据到本地:

  • 数据量很小的情况下才能进行本地数据处理。
  • 如果您需要对单行数据应用一个Python函数,或者做一行变多行的操作,这时使用PyODPS DataFrame就可以轻松完成,并且完全发挥了MaxCompute的并行计算能力。

    比如,现在有一份数据都是json串,现在要把json串按key-value对展开成一行。则可以写一个简单的函数。

  1. In [12]: df
  2. json
  3. 0 {"a": 1, "b": 2}
  4. 1 {"c": 4, "b": 3}
  5. In [14]: from odps.df import output
  6. In [16]: @output(['k', 'v'], ['string', 'int'])
  7. ...: def h(row):
  8. ...: import json
  9. ...: for k, v in json.loads(row.json).items():
  10. ...: yield k, v
  11. ...:
  12. In [21]: df.apply(h, axis=1)
  13. k v
  14. 0 a 1
  15. 1 b 2
  16. 2 c 4
  17. 3 b 3

上述操作,使用apply(axis=1)和map_reduce接口便可完成。

Q:如何使用pandas计算后端进行高效本地debug?

A:您可以利用以下两种方式来进行本地debug。这两种方式除了初始化不同,后续代码完全一致。

  • PyODPS DataFrame能够根据数据来源来决定如何执行。比如,通过pandas DataFrame创建的PyODPS DataFrame可以使用pandas执行本地计算。
  • 使用MaxCompute表创建的DataFrame可以在MaxCompute上执行。

示例代码:

  1. df = o.get_table('movielens_ratings').to_df()
  2. DEBUG = True
  3. if DEBUG:
  4. df = df[:100].to_pandas(wrap=True)

to_pandas是将数据下载,根据wrap参数来决定是否返回PyODPS DataFrame,如果是True,则返回PyODPS DataFrame;否则,返回pandas DataFrame。

当您把所有后续代码都编写完成,本地的测试速度就会非常快。当测试结束后,您可以把debug改为False,这样后续就能在ODPS上执行全量的计算。

使用本地调试还有个好处,就是能利用到IDE的如断点和单步调试自定义函数的功能。要知道,在ODPS上执行,是把函数序列化到远端去执行,所以本地是没法断点进入的。而使用本地进行调试时,则可以断点进入自定义函数,方便进行调试。

推荐您使用MaxCompute studio来执行本地PyODPS程序调试。

Q:如何利用Python语言特性来实现丰富的功能?

A:

  • 编写Python函数

举一个常见的例子,如计算两点之间的距离,有多种计算方法,比如欧氏距离、曼哈顿距离等,您可以定义一系列函数,在计算时根据具体情况调用相应的函数即可。

  1. def euclidean_distance(from_x, from_y, to_x, to_y):
  2. return ((from_x - to_x) ** 2 + (from_y - to_y) ** 2).sqrt()
  3. def manhattan_distance(center_x, center_y, x, y):
  4. return (from_x - to_x).abs() + (from_y - to_y).abs()

调用如下:

  1. In [42]: df
  2. from_x from_y to_x to_y
  3. 0 0.393094 0.427736 0.463035 0.105007
  4. 1 0.629571 0.364047 0.972390 0.081533
  5. 2 0.460626 0.530383 0.443177 0.706774
  6. 3 0.647776 0.192169 0.244621 0.447979
  7. 4 0.846044 0.153819 0.873813 0.257627
  8. 5 0.702269 0.363977 0.440960 0.639756
  9. 6 0.596976 0.978124 0.669283 0.936233
  10. 7 0.376831 0.461660 0.707208 0.216863
  11. 8 0.632239 0.519418 0.881574 0.972641
  12. 9 0.071466 0.294414 0.012949 0.368514
  13. In [43]: euclidean_distance(df.from_x, df.from_y, df.to_x, df.to_y).rename('distance')
  14. distance
  15. 0 0.330221
  16. 1 0.444229
  17. 2 0.177253
  18. 3 0.477465
  19. 4 0.107458
  20. 5 0.379916
  21. 6 0.083565
  22. 7 0.411187
  23. 8 0.517280
  24. 9 0.094420
  25. In [44]: manhattan_distance(df.from_x, df.from_y, df.to_x, df.to_y).rename('distance')
  26. distance
  27. 0 0.392670
  28. 1 0.625334
  29. 2 0.193841
  30. 3 0.658966
  31. 4 0.131577
  32. 5 0.537088
  33. 6 0.114198
  34. 7 0.575175
  35. 8 0.702558
  36. 9 0.132617
  • 利用Python语言的条件和循环语句

一个常见的需求是,用户大概有30张表,需要合成一张表,这时候如果写SQL,则需要写union all 30张表,如果表的数量更多,则更加让人头疼。如果使用PyODPS,只需要一句话就可以完成。

  1. table_names = ['table1', ..., 'tableN']
  2. dfs = [o.get_table(tn).to_df() for tn in table_names]
  3. reduce(lambda x, y: x.union(y), dfs)

这里的reduce这句等价于:

  1. df = dfs[0]
  2. for other_df in dfs[1:]:
  3. df = df.union(other_df)

稍微扩展下,经常有一些case如,用户要计算的表保存在数据库,需要根据配置来对表的字段进行处理,然后对所有表进行union或者join操作。这个时候如果用SQL实现可能是相当复杂的,但是用DataFrame处理则会非常简单,而实际上就有用户使用PyODPS解决了这样的问题。

Q:为什么尽量使用内建算子,而不是自定义函数?

A:上文提到的欧氏距离的计算,实际上,计算的过程都是使用的DataFrame的内建算子,如指数和sqrt等操作。如果您对一行数据应用自定义函数,则会发现,速度会慢很多。

  1. In [54]: euclidean_distance(df.from_x, df.from_y, df.to_x, df.to_y).rename('distance').mean()
  2. |==========================================| 1 / 1 (100.00%) 7s
  3. 0.5216082314224464
  4. In [55]: @output(['distance'], ['float'])
  5. ...: def euclidean_distance2(row):
  6. ...: import math
  7. ...: return math.sqrt((row.from_x - row.to_x) ** 2 + (row.from_y - row.to_y) ** 2)
  8. ...:
  9. In [56]: df.apply(euclidean_distance2, axis=1, reduce=True).mean()
  10. |==========================================| 1 / 1 (100.00%) 27s
  11. 0.5216082314224464

可以看到,当对一行应用了自定义函数后,执行时间从7秒延长到了27秒,这个数据只是一百万行数据计算的结果,如果有更大的数据集,更复杂的操作,时间的差距可能会更大。

Q:输入ODPS表记录数大概是六百万,PyODPS MR将ODPS表转成DataFrame输出的df count只有一万。怎样解决PyODPS MR输出数据1万的限制?

代码如下:

  1. def get_table(pt):
  2. df = DataFrame(odps.get_table(table_name).get_partition('pt = pt'))
  3. return df
  4. def map1(row):
  5. yield row.col1,row.col2,row.col3
  6. def reducer1(keys):
  7. def p(row, done):
  8. yield keys[0],row[1]
  9. return p
  10. df = get_table(pt)
  11. df = df.map_reduce(map1,reducer,group=col1,map1_output_names=[col1,col2,col3],
  12. map1_output_types=['string','string','string'],reducer_output_names=[col1,col2],
  13. reducer_output_types=['string','string'])

输出结果如下图所示:cfcount

A:PyODPS MR将ODPS上的表转成DataFrame排序输出时,由于ODPS要求排序必须指定个数,所以在ODPS后端执行时,会通过options.df.odps.sort.limit 指定排序个数,这个值默认是10000,如果要排序尽量多的数据,可以把这个值设置为较大的值。不过注意,此时可能会导致OOM。

关于ODPS sort详细的设置操作请参见PyODPS排序

总结

利用PyODPS,用户能挖掘更多灵活和高效地操作MaxCompute数据的方式。最佳实践可以不光是我们提供的一些建议,如果您有更多好玩又有用的实践,也可以多多分享出来。