本文向您介绍DataFrame API提供Sequence操作。

列运算

from odps.df import DataFrame
iris = DataFrame(o.get_table('pyodps_iris'))
lens = DataFrame(o.get_table('pyodps_ml_100k_lens'))

为一个Sequence加上一个常量或执行sin函数时,这些操作将作用于Sequence中的每个元素。

NULL相关(isnull,notnull,fillna)

DataFrame API提供了几个和NULL相关的内置函数,例如isnull用于判断某字段是否为NULL,notnull用于判断某字段是否为非NULL,fillna用于将NULL填充为您指定的值。

>>> iris.sepallength.isnull().head(5)
   sepallength
0        False
1        False
2        False
3        False
4        False

逻辑判断(ifelse,switch)

ifelse作用于BOOLEAN类型的字段,当条件成立时,返回第0个参数,否则返回第1个参数。

>>> (iris.sepallength > 5).ifelse('gt5', 'lte5').rename('cmp5').head(5)
   cmp5
0   gt5
1  lte5
2  lte5
3  lte5
4  lte5

switch用于多条件判断的情况。

>>> iris.sepallength.switch(4.9, 'eq4.9', 5.0, 'eq5.0', default='noeq').rename('equalness').head(5)
   equalness
0       noeq
1      eq4.9
2       noeq
3       noeq
4      eq5.0
>>> from odps.df import switch
>>> switch(iris.sepallength == 4.9, 'eq4.9', iris.sepallength == 5.0, 'eq5.0', default='noeq').rename('equalness').head(5)
   equalness
0       noeq
1      eq4.9
2       noeq
3       noeq
4      eq5.0

PyODPS 0.7.8以上版本支持根据条件修改数据集某一列的一部分值,写法如下。

>>> iris[iris.sepallength > 5, 'cmp5'] = 'gt5'
>>> iris[iris.sepallength <= 5, 'cmp5'] = 'lte5'
>>> iris.head(5)
   cmp5
0   gt5
1  lte5
2  lte5
3  lte5
4  lte5

数学运算

对于数字类型的字段,支持加法(+)、减法(-)、乘法(*)和除法(/)等操作,也支持log、sin等数学计算。

>>> (iris.sepallength * 10).log().head(5)
   sepallength
0     3.931826
1     3.891820
2     3.850148
3     3.828641
4     3.912023
>>> fields = [iris.sepallength,
>>>           (iris.sepallength / 2).rename('sepallength除以2'),
>>>           (iris.sepallength ** 2).rename('sepallength的平方')]
>>> iris[fields].head(5)
   sepallength  sepallength除以2  sepallength的平方
0          5.1              2.55             26.01
1          4.9              2.45             24.01
2          4.7              2.35             22.09
3          4.6              2.30             21.16
4          5.0              2.50             25.00
算术运算支持如下操作。
算术操作 说明
abs 绝对值。
sqrt 平方根。
sin 无。
sinh 无。
cos 无。
cosh 无。
tan 无。
tanh 无。
arccos 无。
arccosh 无。
arcsin 无。
arcsinh 无。
arctan 无。
arctanh 无。
exp 指数函数。
expm1 指数减1。
log 传入参数表示底是几。
log2 无。
log10 无。
log1p log(1+x)。
radians 给定角度计算弧度。
degrees 给定弧度计算角度。
ceil 不小于输入值的最小整数。
floor 向下取整,返回比输入值小的整数值。
trunc 将输入值截取到指定小数点位置。
Sequence也支持自身与其它Sequence或Scalar的比较。
>>> (iris.sepallength < 5).head(5)
   sepallength
0        False
1         True
2         True
3         True
4        False
虽然DataFrame API不支持连续操作,例如3 <= iris.sepallength <= 5,但是between函数可以用于判断iris.sepallength是否在某个区间。
>>> (iris.sepallength.between(3, 5)).head(5)
   sepallength
0        False
1         True
2         True
3         True
4         True
默认情况下,between包含两边的区间,如果计算开区间,则需要设置inclusive=False
>>> (iris.sepallength.between(3, 5, inclusive=False)).head(5)
   sepallength
0        False
1         True
2         True
3         True
4        False

STRING相关操作

DataFrame API提供了一系列针对STRING类型的Sequence或者Scalar的操作。
>>> fields = [
>>>     iris.name.upper().rename('upper_name'),
>>>     iris.name.extract('Iris(.*)', group=1)
>>> ]
>>> iris[fields].head(5)
    upper_name     name
0  IRIS-SETOSA  -setosa
1  IRIS-SETOSA  -setosa
2  IRIS-SETOSA  -setosa
3  IRIS-SETOSA  -setosa
4  IRIS-SETOSA  -setosa
STRING的相关操作如下。
STRING操作 说明
capitalize 无。
contains 包含某个字符串,如果regex参数为True,则是包含某个正则表达式,默认为True。
count 指定字符串出现的次数。
endswith 以某个字符串结尾。
startswith 以某个字符串开头。
extract 抽取出某个正则表达式,如果group不指定,则返回满足整个pattern的子串;否则,返回第几个group。
find 返回第一次出现的子串位置,若不存在则返回-1。
rfind 从右查找返回子串第一次出现的位置,不存在则返回-1。
replace 将某个pattern的子串全部替换成另一个子串, n参数若指定,则替换n次。
get 返回某个位置上的字符串。
len 返回字符串的长度。
ljust 若未达到指定的width的长度,则在右侧填充fillchar指定的字符串(默认空格)。
rjust 若未达到指定的width的长度,则在左侧填充fillchar指定的字符串(默认空格)。
lower 变为全部小写。
upper 变为全部大写。
lstrip 在左侧删除空格(包括空行符)。
rstrip 在右侧删除空格(包括空行符)。
strip 在左右两侧删除空格(包括空行符)。
split 将字符串按分隔符拆分为若干个字符串(返回 list<string>类型)。
pad 在指定的位置(left,right或者both)用指定填充字符(用fillchar指定,默认空格)来对齐。
repeat 重复指定n次。
slice 切片操作。
swapcase 对调大小写。
title str.title
zfill 长度没达到指定width,则左侧填充0。
isalnum str.isalnum
isalpha str.isalpha
isdigit 是否都是数字,同str.isdigit
isspace 是否都是空格,同str.isspace
islower 是否都是小写,同str.islower
isupper 是否都是大写,同str.isupper
istitle str.istitle
isnumeric str.isnumeric
isdecimal str.isdecimal
todict 将字符串按分隔符拆分为一个Dict,传入的两个参数分别为项目分隔符和Key-Value分隔符(返回dict<string, string>类型)
strptime

按格式化读取成时间,时间格式和Python标准库相同,详细参考 Python 时间格式化

时间相关操作

对于DATETIME类型Sequence或者Scalar,可以调用时间相关的内置函数。

>>> df = lens[[lens.unix_timestamp.astype('datetime').rename('dt')]]
>>> df[df.dt,
>>>    df.dt.year.rename('year'),
>>>    df.dt.month.rename('month'),
>>>    df.dt.day.rename('day'),
>>>    df.dt.hour.rename('hour')].head(5)
                    dt  year  month  day  hour
0  1998-04-08 11:02:00  1998      4    8    11
1  1998-04-08 10:57:55  1998      4    8    10
2  1998-04-08 10:45:26  1998      4    8    10
3  1998-04-08 10:25:52  1998      4    8    10
4  1998-04-08 10:44:19  1998      4    8    10
与时间相关的属性如下。
时间相关属性 说明
year 无。
month 无。
day 无。
hour 无。
minute 无。
second 无。
weekofyear 返回日期位于那一年的第几周。周一作为一周的第一天。
weekday 返回日期当前周的第几天。
dayofweek 同weekday。
strftime

格式化时间,时间格式和Python标准库相同,详请请参见 Python 时间格式化

PyODPS也支持时间的加减操作,例如可以通过以下方法得到前3天的日期。两个日期列相减得到相差的毫秒数。

>>> df
                           a                          b
0 2016-12-06 16:43:12.460001 2016-12-06 17:43:12.460018
1 2016-12-06 16:43:12.460012 2016-12-06 17:43:12.460021
2 2016-12-06 16:43:12.460015 2016-12-06 17:43:12.460022
>>> from odps.df import day
>>> df.a - day(3)
                           a
0 2016-12-03 16:43:12.460001
1 2016-12-03 16:43:12.460012
2 2016-12-03 16:43:12.460015
>>> (df.b - df.a).dtype
int64
>>> (df.b - df.a).rename('a')
         a
0  3600000
1  3600000
2  3600000
支持的时间类型如下表所示。
属性 说明
year 无。
month 无。
day 无。
hour 无。
minute 无。
second 无。
millisecond 无。

集合类型相关操作

PyODPS支持的集合类型有List和Dict。这两个类型都可以使用下标获取集合中的某个项目。len方法用于获得集合的大小。

同时,两种集合均有explode方法,用于展开集合中的内容。对于List,explode默认返回一列,当传入参数pos时, 将返回两列,其中一列为值在数组中的编号(类似Python的enumerate函数)。对于Dict,explode会返回两列, 分别表示keys及values。explode中也可以传入列名,作为最后生成的列。

示例如下。

>>> df
   id         a                            b
0   1  [a1, b1]  {'a2': 0, 'b2': 1, 'c2': 2}
1   2      [c1]           {'d2': 3, 'e2': 4}
>>> df[df.id, df.a[0], df.b['b2']]
   id   a    b
0   1  a1    1
1   2  c1  NaN
>>> df[df.id, df.a.len(), df.b.len()]
   id  a  b
0   1  2  3
1   2  1  2
>>> df.a.explode()
    a
0  a1
1  b1
2  c1
>>> df.a.explode(pos=True)
   a_pos   a
0      0  a1
1      1  b1
2      0  c1
>>> # 指定列名。
>>> df.a.explode(['pos', 'value'], pos=True)
   pos value
0    0    a1
1    1    b1
2    0    c1
>>> df.b.explode()
  b_key  b_value
0    a2        0
1    b2        1
2    c2        2
3    d2        3
4    e2        4
>>> # 指定列名。
>>> df.b.explode(['key', 'value'])
  key  value
0  a2      0
1  b2      1
2  c2      2
3  d2      3
4  e2      4

explode也可以和并列多行输出结合,以将原有列和explode的结果相结合,示例如下。

>>> df[df.id, df.a.explode()]
   id   a
0   1  a1
1   1  b1
2   2  c1
>>> df[df.id, df.a.explode(), df.b.explode()]
   id   a b_key  b_value
0   1  a1    a2        0
1   1  a1    b2        1
2   1  a1    c2        2
3   1  b1    a2        0
4   1  b1    b2        1
5   1  b1    c2        2
6   2  c1    d2        3
7   2  c1    e2        4
除了下标、lenexplode两个共有方法以外,List还支持下列方法。
List 操作 说明
contains(v) 列表是否包含某个元素。
sort 返回排序后的列表(返回值为List)。
Dict还支持下列方法。
dict操作 说明
keys 获取Dict keys(返回值为List)。
values 获取Dict values(返回值为List)。

其它元素操作(isin,notin,cut)

isin用于判断Sequence里的元素是否在某个集合元素里;notin反之。
>>> iris.sepallength.isin([4.9, 5.1]).rename('sepallength').head(5)
   sepallength
0         True
1         True
2        False
3        False
4        False

cut提供离散化的操作,可以将Sequence的数据拆成几个区段。

>>> iris.sepallength.cut(range(6), labels=['0-1', '1-2', '2-3', '3-4', '4-5']).rename('sepallength_cut').head(5)
   sepallength_cut
0             None
1              4-5
2              4-5
3              4-5
4              4-5

include_underinclude_over可以分别包括向下和向上的区间。

>>> labels = ['0-1', '1-2', '2-3', '3-4', '4-5', '5-']
>>> iris.sepallength.cut(range(6), labels=labels, include_over=True).rename('sepallength_cut').head(5)
   sepallength_cut
0               5-
1              4-5
2              4-5
3              4-5
4              4-5

使用自定义函数

DataFrame函数支持对Sequence使用map,它会对它的每个元素调用自定义函数。
>>> iris.sepallength.map(lambda x: x + 1).head(5)
   sepallength
0          6.1
1          5.9
2          5.7
3          5.6
4          6.0
说明 目前,受限于Python UDF,自定义函数无法支持将List/Dict类型作为输入或输出。
如果map前后,Sequence的类型发生了变化,则需要显式指定map后的类型。
>>> iris.sepallength.map(lambda x: 't'+str(x), 'string').head(5)
   sepallength
0         t5.1
1         t4.9
2         t4.7
3         t4.6
4         t5.0
如果在函数中包含闭包,则函数外闭包变量值的变化会引起函数内该变量值的变化。
>>> dfs = []
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(lambda x: x + i))
结果为dfs中每个SequenceExpr均为df.sepal_length+9。为解决此问题,可以将函数作为另一函数的返回值,或者使用partial。两个示例如下。
>>> dfs = []
>>> def get_mapper(i):
>>>     return lambda x: x + i
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(get_mapper(i)))
>>> import functools
>>> dfs = []
>>> for i in range(10):
>>>     dfs.append(df.sepal_length.map(functools.partial(lambda v, x: x + v, i)))

map也支持使用现有的UDF函数,传入的参数是str类型(函数名)或者Function对象,详情请参见函数

map传入Python函数的实现使用了MaxCompute Python UDF。因此,如果您所在的Project不支持Python UDF,则map函数无法使用。除此以外,所有Python UDF的限制在此都适用。

目前,第三方库(包含C)只能使用numpy,第三方库使用详情请参见使用第三方Python库

除了调用自定义函数,DataFrame还提供了很多内置函数,这些函数中部分使用了map函数来实现。因此,如果您所在Project未开通Python UDF,则无法使用这些函数(注意:阿里云公共服务暂不提供对Python UDF的支持)。

说明 由于字节码定义的差异,Python 3下使用新语言特性(例如yield from)时,代码在使用Python 2.7的MaxCompute Worker上执行时会发生错误。因此,建议您在Python 3下使用MapReduce API编写生产作业前,先确认相关代码是否能正常执行。
  • 引用资源
    自定义函数也能读取MaxCompute上的资源(表资源或文件资源),或者引用一个collection作为资源。此时,自定义函数需要写成函数闭包或callable的类。两个示例如下。
    >>> file_resource = o.create_resource('pyodps_iris_file', 'file', file_obj='Iris-setosa')
    >>>
    >>> iris_names_collection = iris.distinct('name')[:2]
    >>> iris_names_collection
           sepallength
    0      Iris-setosa
    1  Iris-versicolor
    >>> def myfunc(resources):  # resources按调用顺序传入
    >>>     names = set()
    >>>     fileobj = resources[0] # 文件资源是一个file-like的object
    >>>     for l in fileobj:
    >>>         names.add(l)
    >>>     collection = resources[1]
    >>>     for r in collection:
    >>>         names.add(r.name)  # 这里可以通过字段名或者偏移来取
    >>>     def h(x):
    >>>         if x in names:
    >>>             return True
    >>>         else:
    >>>             return False
    >>>     return h
    >>>
    >>> df = iris.distinct('name')
    >>> df = df[df.name,
    >>>         df.name.map(myfunc, resources=[file_resource, iris_names_collection], rtype='boolean').rename('isin')]
    >>>
    >>> df
                  name   isin
    0      Iris-setosa   True
    1  Iris-versicolor   True
    2   Iris-virginica  False
    说明 分区表资源在读取时不包含分区字段。
  • 使用第三方Python库

    您可以把第三方Python包作为资源上传到MaxCompute,支持的格式有whleggzip以及tar.gz。在全局或者在立即执行的方法时,指定需要使用的包文件,即可以在自定义函数中使用第三方库。第三方库的依赖库,也必须指定,否则依然会导致导入错误。

    下面以python-dateutil包进行举例:
    1. 使用pip download命令,下载包以及其依赖到某个路径。下载后会出现两个包:six-1.10.0-py2.py3-none-any.whlpython_dateutil-2.5.3-py2.py3-none-any.whl注意:这里需要下载支持Linux环境的包)。
      $ pip download python-dateutil -d /to/path/
    2. 分别把两个文件上传到MaxCompute资源。
      >>> # 这里要确保资源名的后缀是正确的文件类型。
      >>> odps.create_resource('six.whl', 'file', file_obj=open('six-1.10.0-py2.py3-none-any.whl', 'rb'))
      >>> odps.create_resource('python_dateutil.whl', 'file', file_obj=open('python_dateutil-2.5.3-py2.py3-none-any.whl', 'rb'))
    3. 现在有个DataFrame,只有一个STRING类型字段。
      >>> df
                     datestr
      0  2016-08-26 14:03:29
      1  2015-08-26 14:03:29
    4. 全局配置使用到的三方库如下。
      >>> from odps import options
      >>>
      >>> def get_year(t):
      >>>     from dateutil.parser import parse
      >>>     return parse(t).strftime('%Y')
      >>>
      >>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
      >>> df.datestr.map(get_year)
         datestr
      0     2016
      1     2015
      或者通过运行方法的libraries参数指定使用到的第三方库。
      >>> def get_year(t):
      >>>     from dateutil.parser import parse
      >>>     return parse(t).strftime('%Y')
      >>>
      >>> df.datestr.map(get_year).execute(libraries=['six.whl', 'python_dateutil.whl'])
         datestr
      0     2016
      1     2015
    PyODPS默认支持执行仅包含Python且不含文件操作的第三方库。在较新版本的MaxCompute服务下,PyODPS也支持执行带有二进制代码或带有文件操作的Python库。这些库的后缀必须是cp27-cp27m-manylinux1_x86_64,以archive方式上传,whl后缀的包需要重命名为zip。同时,作业需要开启odps.isolation.session.enable选项,或者在Project级别开启isolation。以下示例为您展示如何上传并使用scipy中的特殊函数。
    >>> # 对于含有二进制代码的包,必须使用archive方式上传资源,whl后缀需要改为zip。
    >>> odps.create_resource('scipy.zip', 'archive', file_obj=open('scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.whl', 'rb'))
    >>>
    >>> # 如果Project开启了isolation,下面的选项不是必需的。
    >>> options.sql.settings = { 'odps.isolation.session.enable': True }
    >>>
    >>> def psi(value):
    >>>     # 建议在函数内部import第三方库,以防止不同操作系统下二进制包结构差异造成执行错误。
    >>>     from scipy.special import psi
    >>>     return float(psi(value))
    >>>
    >>> df.float_col.map(psi).execute(libraries=['scipy.zip'])
    对于只提供源码的二进制包,可以在Linux Shell中打包成Wheel再上传,Mac和Windows中生成的Wheel包无法在MaxCompute中使用。
    python setup.py bdist_wheel
  • 使用计数器
    from odps.udf import get_execution_context
    def h(x):
        ctx = get_execution_context()
        counters = ctx.get_counters()
        counters.get_counter('df', 'add_one').increment(1)
        return x + 1
    df.field.map(h)
    Logview的JSONSummary中即可找到计数器值。

调用MaxCompute内建或者已定义函数

如果您需要调用MaxCompute上的内建或者已定义函数来生成列,您可以使用func接口,该接口默认函数返回值为STRING,可以用rtype参数指定返回值。
>>> from odps.df import func
>>>
>>> iris[iris.name, func.rand(rtype='float').rename('rand')][:4]
>>> iris[iris.name, func.rand(10, rtype='float').rename('rand')][:4]
>>> # 调用ODPS上定义的UDF,列名无法确定时需要手动指定。
>>> iris[iris.name, func.your_udf(iris.sepalwidth, iris.sepallength, rtype='float').rename('new_col')]
>>> # 从其它Project调用UDF,也可通过name参数指定列名。
>>> iris[iris.name, func.your_udf(iris.sepalwidth, iris.sepallength, rtype='float', project='udf_project', name='new_col')]
说明 Pandas后端不支持执行带有func的表达式。