本文向您介绍DataFrame操作支持的执行方法。

延迟执行

DataFrame上的所有操作并不会立即执行,只有当用户显式调用execute方法,或者一些立即执行的方法时(内部调用的就是execute),才会真正去执行。

这些立即执行的方法包括:

方法 说明 返回值
persist 将执行结果保存到ODPS表 PyODPS DataFrame
execute 执行并返回全部结果 ResultFrame
head 查看开头N行数据,这个方法会执行所有结果,并取开头N行数据 ResultFrame
tail 查看结尾N行数据,这个方法会执行所有结果,并取结尾N行数据 ResultFrame
to_pandas 转化为Pandas DataFrame或者Series,wrap 参数为True的时候,返回PyODPS DataFrame对象 wrap为True返回PyODPS DataFrame,False(默认)返回pandas DataFrame
plot,hist,boxplot 画图有关
说明 在交互式环境下,PyODPS DataFrame会在打印或者repr的时候,调用 execute方法,这样省去了用户手动去调用execute。
iris[iris.sepallength < 5][:5]
   sepallength  sepalwidth  petallength  petalwidth         name
0          4.9         3.0          1.4         0.2  Iris-setosa
1          4.7         3.2          1.3         0.2  Iris-setosa
2          4.6         3.1          1.5         0.2  Iris-setosa
3          4.6         3.4          1.4         0.3  Iris-setosa
4          4.4         2.9          1.4         0.2  Iris-setosa

如果想关闭自动调用执行,则需要手动设置

 from odps import options
 options.interactive = False

 iris[iris.sepallength < 5][:5]
Collection: ref_0
  odps.Table
    name: odps_test_sqltask_finance.`pyodps_iris`
    schema:
      sepallength           : double
      sepalwidth            : double
      petallength           : double
      petalwidth            : double
      name                  : string

Collection: ref_1
  Filter[collection]
    collection: ref_0
    predicate:
      Less[sequence(boolean)]
        sepallength = Column[sequence(float64)] 'sepallength' from collection ref_0
        Scalar[int8]
          5

Slice[collection]
  collection: ref_1
  stop:
    Scalar[int8]
      5

此时打印或者repr对象,会显示整棵抽象语法树。

说明 ResultFrame是结果集合,不能参与后续计算。

ResultFrame可以迭代取出每条记录:

 result = iris.head(3)
 for r in result:
     print(list(r))
[5.0999999999999996, 3.5, 1.3999999999999999, 0.20000000000000001, u'Iris-setosa']
[4.9000000000000004, 3.0, 1.3999999999999999, 0.20000000000000001, u'Iris-setosa']
[4.7000000000000002, 3.2000000000000002, 1.3, 0.20000000000000001, u'Iris-setosa']

ResultFrame也支持在安装有pandas的前提下转换为pandas DataFrame或使用pandas后端的PyODPS DataFrame:

 pd_df = iris.head(3).to_pandas()  # 返回pandas DataFrame
 wrapped_df = iris.head(3).to_pandas(wrap=True)  # 返回使用Pandas后端的PyODPS DataFrame

保存执行结果为ODPS表

对Collection,我们可以调用persist方法,参数为表名。返回一个新的DataFrame对象

iris2 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris2')
 iris2.head(5)
   sepallength  sepalwidth  petallength  petalwidth             name
0          4.5         2.3          1.3         0.3      Iris-setosa
1          5.5         2.3          4.0         1.3  Iris-versicolor
2          4.9         2.4          3.3         1.0  Iris-versicolor
3          5.0         2.0          3.5         1.0  Iris-versicolor
4          6.0         2.2          4.0         1.0  Iris-versicolor

persist可以传入partitions参数,这样会创建一个表,它的分区是partitions所指定的字段。

 iris3 = iris[iris.sepalwidth < 2.5].persist('pyodps_iris3', partitions=['name'])
 iris3.data
odps.Table
  name: odps_test_sqltask_finance.`pyodps_iris3`
  schema:
    sepallength           : double
    sepalwidth            : double
    petallength           : double
    petalwidth            : double
  partitions:
    name                  : string

如果想写入已经存在的表的某个分区,persist可以传入partition参数,指明写入表的哪个分区(如ds=******)。 这时要注意,该DataFrame的每个字段都必须在该表存在,且类型相同。drop_partition和create_partition参数只有在此时有效, 分别表示是否要删除(如果分区存在)或创建(如果分区不存在)该分区。

 iris[iris.sepalwidth < 2.5].persist('pyodps_iris4', partition='ds=test', drop_partition=True, create_partition=True)

写入表时,还可以指定表的生命周期,如下列语句将表的生命周期指定为10天:

iris[iris.sepalwidth < 2.5].persist('pyodps_iris5', lifecycle=10)

如果数据源中没有ODPS对象,例如数据源仅为Pandas,在persist时需要手动指定ODPS入口对象, 或者将需要的入口对象标明为全局对象,如:

 # 假设入口对象为 o
 # 指定入口对象
 df.persist('table_name', odps=o)
 # 或者可将入口对象标记为全局
 o.to_global()
 df.persist('table_name')

保存执行结果为Pandas DataFrame

我们可以使用to_pandas方法,如果wrap参数为True,将返回PyODPS DataFrame对象。

 type(iris[iris.sepalwidth < 2.5].to_pandas())
pandas.core.frame.DataFrame
type(iris[iris.sepalwidth < 2.5].to_pandas(wrap=True))
odps.df.core.DataFrame

中的数据读出,PyODPS可以执行open_reader方法,通过reader.to_pandas()转成Pandas DataFrame。

立即运行设置运行参数

对于立即执行的方法,比如executepersistto_pandas等,可以设置运行时参数(仅对ODPS SQL后端有效)。

一种方法是设置全局参数。详细参考SQL设置运行参数

也可以在这些立即执行的方法上,使用hints参数。这样,这些参数只会作用于当前的计算过程。

iris[iris.sepallength < 5].to_pandas(hints={'odps.sql.mapper.split.size': 16})

运行时显示详细信息

有时,用户需要查看运行时instance的logview时,需要修改全局配置:

from odps import options
 options.verbose = True

 iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
Sql compiled:
SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name`
FROM odps_test_sqltask_finance.`pyodps_iris` t1
WHERE t1.`sepallength` < 5
LIMIT 5
logview:
http://logview

   sepalwidth  petallength  petalwidth         name
0         3.0          1.4         0.2  Iris-setosa
1         3.2          1.3         0.2  Iris-setosa
2         3.1          1.5         0.2  Iris-setosa
3         3.4          1.4         0.3  Iris-setosa
4         2.9          1.4         0.2  Iris-setosa

用户可以指定自己的日志记录函数,比如像这样:

 my_logs = []
 def my_logger(x):
     my_logs.append(x)

 options.verbose_log = my_logger

 iris[iris.sepallength < 5].exclude('sepallength')[:5].execute()
   sepalwidth  petallength  petalwidth         name
0         3.0          1.4         0.2  Iris-setosa
1         3.2          1.3         0.2  Iris-setosa
2         3.1          1.5         0.2  Iris-setosa
3         3.4          1.4         0.3  Iris-setosa
4         2.9          1.4         0.2  Iris-setosa

 print(my_logs)
['Sql compiled:', 'SELECT t1.`sepalwidth`, t1.`petallength`, t1.`petalwidth`, t1.`name` \nFROM odps_test_sqltask_finance.`pyodps_iris` t1 \nWHERE t1.`sepallength` < 5 \nLIMIT 5', 'logview:', u'http://logview']

缓存中间Collection计算结果

DataFrame的计算过程中,一些Collection被多处使用,或者用户需要查看中间过程的执行结果, 这时用户可以使用 cache标记某个collection需要被优先计算。

说明 值得注意的是, cache延迟执行,调用cache不会触发立即计算。
 cached = iris[iris.sepalwidth < 3.5].cache()
 df = cached['sepallength', 'name'].head(3)
 df
   sepallength         name
0          4.9  Iris-setosa
1          4.7  Iris-setosa
2          4.6  Iris-setosa
 cached.head(3)  # 由于cached已经被计算,所以能立刻取到计算结果
   sepallength         name
0          4.9  Iris-setosa
1          4.7  Iris-setosa
2          4.6  Iris-setosa

异步和并行执行

DataFrame支持异步操作,对于立即执行的方法,包括 executepersistheadtailto_pandas(其他方法不支持), 传入async参数,即可以将一个操作异步执行,timeout参数指定超时时间, 异步返回的是 Future对象。

 from odps.df import Delay
 delay = Delay()  # 创建Delay对象

 df = iris[iris.sepal_width < 5].cache()  # 有一个共同的依赖
 future1 = df.sepal_width.sum().execute(delay=delay)  # 立即返回future对象,此时并没有执行
 future2 = df.sepal_width.mean().execute(delay=delay)
 future3 = df.sepal_length.max().execute(delay=delay)
 delay.execute(n_parallel=3)  # 并发度是3,此时才真正执行。
|==========================================|   1 /  1  (100.00%)        21s
 future1.result()
 458.10000000000014
 future2.result()
3.0540000000000007

可以看到上面的例子里,共同依赖的对象会先执行,然后再以并发度为3分别执行future1到future3。 当n_parallel为1时,执行时间会达到37s。

delay.execute也接受async操作来指定是否异步执行,当异步的时候,也可以指定 timeout参数来指定超时时间。