Python 2 UDTF读取MaxCompute资源示例

本文为您介绍基于MaxCompute客户端通过Python 2 UDTF读取MaxCompute资源的使用示例。

前提条件

已安装MaxCompute客户端。更多安装MaxCompute客户端操作,请参见安装并配置MaxCompute客户端

开发和使用步骤

1. 代码开发

UDTF代码示例如下:

# -*- coding: utf-8 -*-
from odps.udf import annotate
from odps.udf import BaseUDTF
from odps.distcache import get_cache_file
from odps.distcache import get_cache_table
@annotate('string -> string, bigint')
class UDTFExample(BaseUDTF):
    """读取资源文件和资源表里的pageid、adid,生成dict
    """
    def __init__(self):
        import json
        cache_file = get_cache_file('test_json.txt')
        self.my_dict = json.load(cache_file)
        cache_file.close()
        records = list(get_cache_table('table_resource1'))
        for record in records:
            self.my_dict[record[0]] = [record[1]]
    """输入pageid,输出pageid以及它对应的所有adid
    """
    def process(self, pageid):
        for adid in self.my_dict[pageid]:
            self.forward(pageid, adid)

将上述代码示例保存为py_udtf_example.py文件,放置于MaxCompute客户端的bin目录中。

2. 准备表数据

登录MaxCompute客户端创建资源表table_resource1和内部表tmp1(后续执行DML操作写入的目标表)并插入数据,准备资源文件test_json.txt并放置于MaxCompute客户端的bin目录中。

更多登录MaxCompute客户端操作,请参见安装并登录MaxCompute客户端。建表、插入数据命令及资源文件内容示例如下:

  • 创建资源表table_resource1,并插入数据。

    create table if not exists table_resource1 (pageid string, adid int);
    insert into table table_resource1 values("contact_page2",2),("contact_page3",5);
  • 创建内部表tmp1,并插入数据。

    create table if not exists tmp1 (pageid string);
    insert into table tmp1 values ("front_page"),("contact_page1"),("contact_page3");
  • 资源文件test_json.txt的内容如下。

    {"front_page":[1, 2, 3], "contact_page1":[3, 4, 5]}

3. 上传资源和注册函数

完成UDF代码开发和调试之后,将资源上传至MaxCompute并注册函数。

  1. 通过MaxCompute客户端,将py_udtf_example.py文件、test_json.txt和表table_resource1添加为MaxCompute的资源。

    更多添加资源信息,请参见添加资源。命令示例如下。

    add py py_udtf_example.py;
    add file test_json.txt;
    add table table_resource1 as table_resource1;
  2. 在MaxCompute客户端上创建UDTF函数my_udtf。

    更多创建函数信息,请参见注册函数。命令示例如下。

    create function my_udtf as 'py_udtf_example.UDTFExample' using 'py_udtf_example.py, test_json.txt, table_resource1';

4. 使用示例

成功注册UDTF后,在MaxCompute客户端上执行SQL命令调用新创建的UDTF。

命令示例如下:

  • 示例1:单纯使用UDTF函数运行SQL。

    select my_udtf(pageid) as (pageid, adid) from tmp1;

    返回结果如下。

    +------------+------------+
    | pageid     | adid       |
    +------------+------------+
    | front_page | 1          |
    | front_page | 2          |
    | front_page | 3          |
    | contact_page1 | 3          |
    | contact_page1 | 4          |
    | contact_page1 | 5          |
    | contact_page3 | 5          |
    +------------+------------+
  • 示例2:对示例1中的命令改写,结合Lateral View运行SQL。

    select pageid, adid from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid;

    返回结果如下。

    +------------+------------+
    | pageid     | adid       |
    +------------+------------+
    | front_page | 1          |
    | front_page | 2          |
    | front_page | 3          |
    | contact_page1 | 3          |
    | contact_page1 | 4          |
    | contact_page1 | 5          |
    | contact_page3 | 5          |
    +------------+------------+

    执行结果的两列分别为tmp1.pageid和UDTF输出的adid。

  • 示例3:结合聚合函数和Lateral View运行SQL。

    --聚合统计每个adid出现的次数。
    select adid, count(1) as cnt
        from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid
    group by adid;

    返回结果如下。

    +------------+------------+
    | adid       | cnt        |
    +------------+------------+
    | 1          | 1          |
    | 2          | 1          |
    | 3          | 2          |
    | 4          | 1          |
    | 5          | 2          |
    +------------+------------+