本文为您介绍MapReduce API。

PyODPS DataFrame支持MapReduce API,您可以分别编写mapreduce函数(map_reduce可以只有mapper或者reducer过程)。

wordcount的示例如下。
>>> #encoding=utf-8
>>> from odps import ODPS
>>> from odps import options
>>> options.verbose = True
>>> o = ODPS('your-access-id', 'your-secret-access-key',project='DMP_UC_dev', endpoint='http://service-corp.odps.aliyun-inc.com/api')
>>> from odps.df import DataFrame

>>> def mapper(row):
>>>     for word in row[0].split():
>>>         yield word.lower(), 1
>>>
>>> def reducer(keys):
>>>     # 这里使用list而不是cnt=0,否则h内的cnt会被认为是局部变量,其中的赋值无法输出。
>>>     cnt = [0]
>>>     def h(row, done):  # done表示这个key已经迭代结束。
>>>         cnt[0] += row[1]
>>>         if done:
>>>             yield keys[0], cnt[0]
>>>     return h
>>> # zx_word_count表只有一列,为STRING类型。
>>> word_count = DataFrame(o.get_table('zx_word_count'))
>>> table = word_count.map_reduce(mapper, reducer, group=['word', ],
                        mapper_output_names=['word', 'cnt'],
                        mapper_output_types=['string', 'int'],
                        reducer_output_names=['word', 'cnt'],
                        reducer_output_types=['string', 'int'])
         word  cnt
0         are    1
1         day    1
2      doing?    1
3   everybody    1
4       first    1
5       hello    2
6         how    1
7          is    1
8          so    1
9         the    1
10       this    1
11      world    1
12        you    1

group参数用于指定reduce按哪些字段做分组,如果不指定,会按全部字段做分组。reducer需要接收聚合的keys进行初始化,并能继续处理按这些keys聚合的每行数据。done表示与这些keys相关的所有行是否都迭代完成。

为了方便,此处写成了函数闭包的方式,您也可以写成Callable的类。
class reducer(object):
    def __init__(self, keys):
        self.cnt = 0

    def __call__(self, row, done):  # done表示这个key已经迭代结束。
        self.cnt += row.cnt
        if done:
            yield row.word, self.cnt
使用output进行注释会让代码更简单。
>>> from odps.df import output
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(row):
>>>     for word in row[0].split():
>>>         yield word.lower(), 1
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(keys):
>>>     # 此处使用list而不是cnt=0,否则h内的cnt会被认为是局部变量,其中的赋值无法输出。
>>>     cnt = [0]
>>>     def h(row, done):  # done表示这个key已经迭代结束。
>>>         cnt[0] += row.cnt
>>>         if done:
>>>             yield keys.word, cnt[0]
>>>     return h
>>>
>>> word_count = DataFrame(o.get_table('zx_word_count'))
>>> table = word_count.map_reduce(mapper, reducer, group='word')
         word  cnt
0         are    1
1         day    1
2      doing?    1
3   everybody    1
4       first    1
5       hello    2
6         how    1
7          is    1
8          so    1
9         the    1
10       this    1
11      world    1
12        you    1

在迭代的时候,可以使用sort参数实现按指定列排序,通过ascending参数指定升序降序。ascending参数可以是一个BOOL值,表示所有的sort字段是相同升序或降序; 也可以是一个列表,长度必须和sort字段长度相同。

指定COMBINER

combiner表示在map_reduce API里表示在mapper端,就先对数据进行聚合操作,它的用法和reducer是完全一致的,但不能引用资源。 并且,combiner的输出的字段名和字段类型必须和mapper完全一致。

上面的例子,您可以使用reducer作为combinermapper端对数据做初步的聚合,减少Shuffle出去的数据量。
>>> words_df.map_reduce(mapper, reducer, combiner=reducer, group='word')

引用资源

在MapReduce API里,您可以分别指定mapperreducer所要引用的资源。

以下示例为对mapper里的单词做停词过滤,在reducer里对白名单的单词数量加5。
>>> white_list_file = o.create_resource('pyodps_white_list_words', 'file', file_obj='Python\nWorld')
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(resources):
>>>     stop_words = set(r[0].strip() for r in resources[0])
>>>     def h(row):
>>>         for word in row[0].split():
>>>             if word not in stop_words:
>>>                 yield word, 1
>>>     return h
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(resources):
>>>     d = dict()
>>>     d['white_list'] = set(word.strip() for word in resources[0])
>>>     d['cnt'] = 0
>>>     def inner(keys):
>>>         d['cnt'] = 0
>>>         def h(row, done):
>>>             d['cnt'] += row.cnt
>>>             if done:
>>>                 if row.word in d['white_list']:
>>>                     d['cnt'] += 5
>>>                 yield keys.word, d['cnt']
>>>         return h
>>>     return inner
>>>
>>> words_df.map_reduce(mapper, reducer, group='word',
>>>                     mapper_resources=[stop_words], reducer_resources=[white_list_file])
     word  cnt
0   hello    2
1    life    1
2  python    7
3   world    6
4   short    1
5     use    1

使用第三方Python库

使用方法类似在MAP中使用第三方Python库。
  • 在全局指定使用的库。
    >>> from odps import options
    >>> options.df.libraries = ['six.whl', 'python_dateutil.whl']
  • 在立即执行的方法中,局部指定使用的库。
    >>> df.map_reduce(mapper=my_mapper, reducer=my_reducer, group='key').execute(libraries=['six.whl', 'python_dateutil.whl'])
    说明 由于字节码定义的差异,Python 3下使用新语言特性(例如yield from)时,代码在使用 Python 2.7的ODPS Worker上执行时会发生错误。因此,建议您在Python 3下使用MapReduce API编写生产作业前,先确认相关代码是否能正常执行。

重排数据

当数据在集群上分布不均匀时,您可以调用reshuffle接口对数据重排。
>>> df1 = df.reshuffle()
默认会按随机数做哈希来分布,但您也可以按指定列做分布,并且可以指定重排后的排序顺序。
>>> df1.reshuffle('name', sort='id', ascending=False)

布隆过滤器

PyODPS DataFrame提供了bloom_filter接口进行布隆过滤器的计算。

给定某个Collection和它的某个列计算的sequence1,对另外一个sequence2进行布隆过滤时,sequence1不存在于sequence2中的数据一定会被过滤掉, 但可能无法完全过滤。所以这种过滤方式是一种近似的过滤方法。这样的好处是对Collection进行快速过滤一些无用数据。这在一边数据量远大过另一边数据量(大部分数据并不会参与join运算)时的大规模join场景很有用。例如,在join用户的浏览数据和交易数据时,用户的浏览数据量远大于交易数据量,可以利用交易数据先对浏览数据进行布隆过滤, 然后再join可以很好地提升性能。
>>> df1 = DataFrame(pd.DataFrame({'a': ['name1', 'name2', 'name3', 'name1'], 'b': [1, 2, 3, 4]}))
>>> df1
       a  b
0  name1  1
1  name2  2
2  name3  3
3  name1  4
>>> df2 = DataFrame(pd.DataFrame({'a': ['name1']}))
>>> df2
       a
0  name1
>>> df1.bloom_filter('a', df2.a) # 这里第0个参数可以是个计算表达式如: df1.a + '1'。
       a  b
0  name1  1
1  name1  4
示例说明:
  • 由于数据量很小,df1中的aname2name3的行都被正确过滤掉了,当数据量很大的时候,可能无法过滤全部数据。
  • 之前提join场景中,少量数据不能被过滤的情况并不会影响数据的正确性,但可以较大地提升join的性能。
  • 您可以传入capacityerror_rate来设置数据的量以及错误率,默认值是30000.01
说明 要注意,调大capacity或者减小error_rate会增加内存的使用,所以应当根据实际情况选择一个合理的值。

Collection对象操作请参见DataFrame执行

透视表(PIVOT_TABLE)

PyODPS DataFrame提供了透视表的功能。示例的表数据如下。
>>> df
     A    B      C  D  E
0  foo  one  small  1  3
1  foo  one  large  2  4
2  foo  one  large  2  5
3  foo  two  small  3  6
4  foo  two  small  3  4
5  bar  one  large  4  5
6  bar  one  small  5  3
7  bar  two  small  6  2
8  bar  two  large  7  1
  • 使用透视表功能时,rows参数为必选参数,表示按一个或者多个字段做取平均值的操作。
    >>> df['A', 'D', 'E'].pivot_table(rows='A')
         A  D_mean  E_mean
    0  bar     5.5    2.75
    1  foo     2.2    4.40
  • rows可以提供多个字段,表示按多个字段做聚合。
    >>> df.pivot_table(rows=['A', 'B', 'C'])
         A    B      C  D_mean  E_mean
    0  bar  one  large     4.0     5.0
    1  bar  one  small     5.0     3.0
    2  bar  two  large     7.0     1.0
    3  bar  two  small     6.0     2.0
    4  foo  one  large     2.0     4.5
    5  foo  one  small     1.0     3.0
    6  foo  two  small     3.0     5.0
  • 指定values显示指定要计算的列。
    >>> df.pivot_table(rows=['A', 'B'], values='D')
         A    B    D_mean
    0  bar  one  4.500000
    1  bar  two  6.500000
    2  foo  one  1.666667
    3  foo  two  3.000000
  • 计算值列时,默认会计算平均值。通过aggfunc指定一个或者多个聚合函数。
    >>> df.pivot_table(rows=['A', 'B'], values=['D'], aggfunc=['mean', 'count', 'sum'])
         A    B    D_mean  D_count  D_sum
    0  bar  one  4.500000        2      9
    1  bar  two  6.500000        2     13
    2  foo  one  1.666667        3      5
    3  foo  two  3.000000        2      6
  • 将原始数据的某一列的值,作为新的Collection的列。
    >>> df.pivot_table(rows=['A', 'B'], values='D', columns='C')
         A    B  large_D_mean  small_D_mean
    0  bar  one           4.0           5.0
    1  bar  two           7.0           6.0
    2  foo  one           2.0           1.0
    3  foo  two           NaN           3.0
  • 使用fill_value填充空值。
    >>> df.pivot_table(rows=['A', 'B'], values='D', columns='C', fill_value=0)
         A    B  large_D_mean  small_D_mean
    0  bar  one             4             5
    1  bar  two             7             6
    2  foo  one             2             1
    3  foo  two             0             3

KEY-VALUE字符串转换

DataFrame提供了将Key-Value对展开为列,以及将普通列转换为Key-Value列的功能。
  • 将Key-Value对展开为列,示例数据如下。
    >>> df
        name               kv
    0  name1  k1=1,k2=3,k5=10
    1  name1    k1=7.1,k7=8.2
    2  name2    k2=1.2,k3=1.5
    3  name2      k9=1.1,k2=1
    通过extract_kv方法将Key-Value字段展开。
    >>> df.extract_kv(columns=['kv'], kv_delim='=', item_delim=',')
       name   kv_k1  kv_k2  kv_k3  kv_k5  kv_k7  kv_k9
    0  name1    1.0    3.0    NaN   10.0    NaN    NaN
    1  name1    7.0    NaN    NaN    NaN    8.2    NaN
    2  name2    NaN    1.2    1.5    NaN    NaN    NaN
    3  name2    NaN    1.0    NaN    NaN    NaN    1.1
    其中,需要展开的字段名由columns指定,Key和Value之间的分隔符,以及Key-Value对之间的分隔符分别由kv_delimitem_delim这两个参数指定,默认分别为半角冒号和半角逗号。输出的字段名为原字段名和Key值的组合,通过_相连。缺失值默认为NONE,可通过fill_value选择需要填充的值。
    >>> df.extract_kv(columns=['kv'], kv_delim='=', fill_value=0)
       name   kv_k1  kv_k2  kv_k3  kv_k5  kv_k7  kv_k9
    0  name1    1.0    3.0    0.0   10.0    0.0    0.0
    1  name1    7.0    0.0    0.0    0.0    8.2    0.0
    2  name2    0.0    1.2    1.5    0.0    0.0    0.0
    3  name2    0.0    1.0    0.0    0.0    0.0    1.1
  • 将多列数据转换为一个Key-Value列,示例数据如下。
    >>> df
       name    k1   k2   k3    k5   k7   k9
    0  name1  1.0  3.0  NaN  10.0  NaN  NaN
    1  name1  7.0  NaN  NaN   NaN  8.2  NaN
    2  name2  NaN  1.2  1.5   NaN  NaN  NaN
    3  name2  NaN  1.0  NaN   NaN  NaN  1.1
    通过to_kv方法转换为Key-Value表示的格式。
    >>> df.to_kv(columns=['k1', 'k2', 'k3', 'k5', 'k7', 'k9'], kv_delim='=')
        name               kv
    0  name1  k1=1,k2=3,k5=10
    1  name1    k1=7.1,k7=8.2
    2  name2    k2=1.2,k3=1.5
    3  name2      k9=1.1,k2=1

DataFrame创建及其对象操作请参见创建DataFrame