本文为您介绍基于MaxCompute客户端通过Python 2 UDTF读取MaxCompute资源的使用示例。

前提条件

已安装MaxCompute客户端。更多安装MaxCompute客户端操作,请参见安装并配置MaxCompute客户端

UDTF代码示例

# -*- coding: utf-8 -*-
from odps.udf import annotate
from odps.udf import BaseUDTF
from odps.distcache import get_cache_file
from odps.distcache import get_cache_table
@annotate('string -> string, bigint')
class UDTFExample(BaseUDTF):
    """读取资源文件和资源表里的pageid、adid,生成dict
    """
    def __init__(self):
        import json
        cache_file = get_cache_file('test_json.txt')
        self.my_dict = json.load(cache_file)
        cache_file.close()
        records = list(get_cache_table('table_resource1'))
        for record in records:
            self.my_dict[record[0]] = [record[1]]
    """输入pageid,输出pageid以及它对应的所有adid
    """
    def process(self, pageid):
        for adid in self.my_dict[pageid]:
            self.forward(pageid, adid)

操作步骤

  1. UDTF代码示例保存为py_udtf_example.py文件,放置于MaxCompute客户端的bin目录中。
  2. 登录MaxCompute客户端创建资源表table_resource1和内部表tmp1(后续执行DML操作写入的目标表)并插入数据,准备资源文件test_json.txt并放置于MaxCompute客户端的bin目录中。
    更多登录MaxCompute客户端操作,请参见登录MaxCompute客户端。建表、插入数据命令及资源文件内容示例如下:
    • 创建资源表table_resource1,并插入数据。
      create table if not exists table_resource1 (pageid string, adid int);
      insert into table table_resource1 values("contact_page2",2),("contact_page3",5);
    • 创建内部表tmp1,并插入数据。
      create table if not exists tmp1 (pageid string);
      insert into table tmp1 values ("front_page"),("contact_page1"),("contact_page3");
    • 资源文件test_json.txt的内容如下。
      {"front_page":[1, 2, 3], "contact_page1":[3, 4, 5]}
  3. 通过MaxCompute客户端,将py_udtf_example.py文件、test_json.txt和表table_resource1添加为MaxCompute的资源。
    更多添加资源信息,请参见添加资源。命令示例如下。
    add py py_udtf_example.py;
    add file test_json.txt;
    add table table_resource1 as table_resource1;
  4. 在MaxCompute客户端上创建UDTF函数my_udtf。
    更多创建函数信息,请参见注册函数。命令示例如下。
    create function my_udtf as 'py_udtf_example.UDTFExample' using 'py_udtf_example.py, test_json.txt, table_resource1';
  5. 在MaxCompute客户端上执行SQL命令调用新创建的UDTF。
    命令示例如下:
    • 示例1:单纯使用UDTF函数运行SQL。
      select my_udtf(pageid) as (pageid, adid) from tmp1;
      返回结果如下。
      +------------+------------+
      | pageid     | adid       |
      +------------+------------+
      | front_page | 1          |
      | front_page | 2          |
      | front_page | 3          |
      | contact_page1 | 3          |
      | contact_page1 | 4          |
      | contact_page1 | 5          |
      | contact_page3 | 5          |
      +------------+------------+
    • 示例2:对示例1中的命令改写,结合Lateral View运行SQL。
      select pageid, adid from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid;
      返回结果如下。
      +------------+------------+
      | pageid     | adid       |
      +------------+------------+
      | front_page | 1          |
      | front_page | 2          |
      | front_page | 3          |
      | contact_page1 | 3          |
      | contact_page1 | 4          |
      | contact_page1 | 5          |
      | contact_page3 | 5          |
      +------------+------------+

      执行结果的两列分别为tmp1.pageid和UDTF输出的adid。

    • 示例3:结合聚合函数和Lateral View运行SQL。
      --聚合统计每个adid出现的次数。
      select adid, count(1) as cnt
          from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid
      group by adid;
      返回结果如下。
      +------------+------------+
      | adid       | cnt        |
      +------------+------------+
      | 1          | 1          |
      | 2          | 1          |
      | 3          | 2          |
      | 4          | 1          |
      | 5          | 2          |
      +------------+------------+