本文为您介绍基于MaxCompute客户端通过Python 2 UDTF读取MaxCompute资源的使用示例。
前提条件
已安装MaxCompute客户端。更多安装MaxCompute客户端操作,请参见安装并配置MaxCompute客户端。
开发和使用步骤
1. 代码开发
UDTF代码示例如下:
# -*- coding: utf-8 -*-
from odps.udf import annotate
from odps.udf import BaseUDTF
from odps.distcache import get_cache_file
from odps.distcache import get_cache_table
@annotate('string -> string, bigint')
class UDTFExample(BaseUDTF):
"""读取资源文件和资源表里的pageid、adid,生成dict
"""
def __init__(self):
import json
cache_file = get_cache_file('test_json.txt')
self.my_dict = json.load(cache_file)
cache_file.close()
records = list(get_cache_table('table_resource1'))
for record in records:
self.my_dict[record[0]] = [record[1]]
"""输入pageid,输出pageid以及它对应的所有adid
"""
def process(self, pageid):
for adid in self.my_dict[pageid]:
self.forward(pageid, adid)
将上述代码示例保存为py_udtf_example.py文件,放置于MaxCompute客户端的bin目录中。
2. 准备表数据
登录MaxCompute客户端创建资源表table_resource1和内部表tmp1(后续执行DML操作写入的目标表)并插入数据,准备资源文件test_json.txt并放置于MaxCompute客户端的bin目录中。
更多登录MaxCompute客户端操作,请参见安装并登录MaxCompute客户端。建表、插入数据命令及资源文件内容示例如下:
创建资源表table_resource1,并插入数据。
create table if not exists table_resource1 (pageid string, adid int); insert into table table_resource1 values("contact_page2",2),("contact_page3",5);
创建内部表tmp1,并插入数据。
create table if not exists tmp1 (pageid string); insert into table tmp1 values ("front_page"),("contact_page1"),("contact_page3");
资源文件test_json.txt的内容如下。
{"front_page":[1, 2, 3], "contact_page1":[3, 4, 5]}
3. 上传资源和注册函数
完成UDF代码开发和调试之后,将资源上传至MaxCompute并注册函数。
通过MaxCompute客户端,将py_udtf_example.py文件、test_json.txt和表table_resource1添加为MaxCompute的资源。
更多添加资源信息,请参见添加资源。命令示例如下。
add py py_udtf_example.py; add file test_json.txt; add table table_resource1 as table_resource1;
在MaxCompute客户端上创建UDTF函数my_udtf。
更多创建函数信息,请参见注册函数。命令示例如下。
create function my_udtf as 'py_udtf_example.UDTFExample' using 'py_udtf_example.py, test_json.txt, table_resource1';
4. 使用示例
成功注册UDTF后,在MaxCompute客户端上执行SQL命令调用新创建的UDTF。
命令示例如下:
示例1:单纯使用UDTF函数运行SQL。
select my_udtf(pageid) as (pageid, adid) from tmp1;
返回结果如下。
+------------+------------+ | pageid | adid | +------------+------------+ | front_page | 1 | | front_page | 2 | | front_page | 3 | | contact_page1 | 3 | | contact_page1 | 4 | | contact_page1 | 5 | | contact_page3 | 5 | +------------+------------+
示例2:对示例1中的命令改写,结合Lateral View运行SQL。
select pageid, adid from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid;
返回结果如下。
+------------+------------+ | pageid | adid | +------------+------------+ | front_page | 1 | | front_page | 2 | | front_page | 3 | | contact_page1 | 3 | | contact_page1 | 4 | | contact_page1 | 5 | | contact_page3 | 5 | +------------+------------+
执行结果的两列分别为tmp1.pageid和UDTF输出的adid。
示例3:结合聚合函数和Lateral View运行SQL。
--聚合统计每个adid出现的次数。 select adid, count(1) as cnt from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid group by adid;
返回结果如下。
+------------+------------+ | adid | cnt | +------------+------------+ | 1 | 1 | | 2 | 1 | | 3 | 2 | | 4 | 1 | | 5 | 2 | +------------+------------+