本文为您介绍基于MaxCompute客户端通过Python 3 UDTF读取MaxCompute资源的使用示例。
前提条件
已安装MaxCompute客户端。更多安装MaxCompute客户端操作,请参见安装并配置MaxCompute客户端。
UDTF的动态参数说明
Python UDTF函数签名格式请参见函数签名及数据类型。
您可以在参数列表中使用
*
,表示接受任意长度、任意类型的输入参数。例如@annotate('double,*->string')
表示接受第一个参数是DOUBLE类型,后接任意长度、任意类型的参数列表。此时,您需要自己编写代码判断输入的个数和参数类型,然后对它们进行相应的操作(您可以对比C语言里面的printf
函数来理解此操作)。说明*
用在返回值列表中时,表示的是不同的含义。UDTF的返回值可以使用
*
,表示返回任意个STRING类型。返回值的个数与调用函数时设置的别名个数有关。例如@annotate("bigint,string->double,*")
,调用方式是UDTF(x, y) as (a, b, c)
,此处as
后面设置了三个别名,即a
、b
、c
。编辑器会认定a
为DOUBLE类型(Annotation中返回值第一列的类型是给定的),b
和c
为STRING类型。因为这里给出了三个返回值,所以UDTF在调用forward
时,forward
必须是长度为3的数组,否则会出现运行时报错。说明这种错误无法在编译时报出,因此UDTF的调用者在SQL中设置alias个数时,必须遵循该UDTF定义的规则。由于聚合函数的返回值个数固定是1,所以这个功能对UDAF来说并无意义。
UDTF代码示例
读取MaxCompute资源代码示例。
from odps.udf import annotate from odps.udf import BaseUDTF from odps.distcache import get_cache_file from odps.distcache import get_cache_table @annotate('string -> string, bigint') class UDTFExample(BaseUDTF): """读取资源文件和资源表里的pageid、adid_list,生成dict """ def __init__(self): import json cache_file = get_cache_file('test_json.txt') self.my_dict = json.load(cache_file) cache_file.close() records = list(get_cache_table('table_resource1')) for record in records: self.my_dict[record[0]] = record[1] """输入pageid,输出pageid以及它对应的所有adid """ def process(self, pageid): for adid in self.my_dict[pageid]: self.forward(pageid, adid)
动态参数代码示例。
from odps.udf import annotate from odps.udf import BaseUDTF import json @annotate('string,*->string,*') class JsonTuple(BaseUDTF): def process(self, *args): length = len(args) result = [None] * length try: obj = json.loads(args[0]) for i in range(1, length): result[i] = str(obj.get(args[i])) except Exception as err: result[0] = str(err) for i in range(1, length): result[i] = None self.forward(*result)
以上UDTF示例中,返回值个数会根据输入参数的个数来决定。输出参数中的第一个参数是一个JSON文本,后面的参数需要从JSON中根据Key进行解析。返回值中的第一个返回值是解析JSON过程中的出错信息,如果没有出错,则会根据输入的Key依次输出从JSON中解析出来的内容,使用示例如下。
-- 根据输入参数的个数定制输出别名个数。 SELECT my_json_tuple(json, 'a', 'b') as (exceptions, a, b) FROM jsons; -- 变长部分可以一列都没有。 SELECT my_json_tuple(json) as exceptions FROM jsons; -- 下面这个SQL会出现运行时错误,因为别名个数与实际输出个数不符。 -- 注意编译时无法发现此错误。 SELECT my_json_tuple(json, 'a', 'b') as (exceptions, a, b, c) FROM jsons;
操作步骤
将UDTF代码示例保存为py_udtf_example.py文件,放置于MaxCompute客户端的bin目录中。
登录MaxCompute客户端创建资源表table_resource1和内部表tmp1(后续执行DML操作写入的目标表)并插入数据,准备资源文件test_json.txt并放置于MaxCompute客户端的bin目录中。
更多登录MaxCompute客户端操作,请参见安装并登录MaxCompute本地客户端。建表、插入数据命令及资源文件内容示例如下:
创建资源表table_resource1,并插入数据。
create table if not exists table_resource1 (pageid string, adid_list array<int>); insert into table table_resource1 values("contact_page2",array(2,3,4)),("contact_page3",array(5,6,7));
说明由于table_resource1中adid_list字段数据类型为ARRAY,读取表资源时需要在Session级别执行
set odps.sql.python.version=cp37;
命令开启Python 3来支持读取ARRAY类型数据。创建内部表tmp1,并插入数据。
create table if not exists tmp1 (pageid string); insert into table tmp1 values ("front_page"),("contact_page1"),("contact_page3");
资源文件test_json.txt的内容如下。
{"front_page":[1, 2, 3], "contact_page1":[3, 4, 5]}
通过MaxCompute客户端,将py_udtf_example.py文件、test_json.txt和表table_resource1添加为MaxCompute的资源。
更多添加资源信息,请参见添加资源。命令示例如下。
add py py_udtf_example.py; add file test_json.txt; add table table_resource1 as table_resource1;
在MaxCompute客户端上创建UDTF函数my_udtf。
更多创建函数信息,请参见注册函数。命令示例如下。
create function my_udtf as 'py_udtf_example.UDTFExample' using 'py_udtf_example.py, test_json.txt, table_resource1';
在MaxCompute客户端上执行SQL命令调用新创建的UDTF。
命令示例如下:
示例1:单纯使用UDTF函数运行SQL。
select my_udtf(pageid) as (pageid, adid) from tmp1;
返回结果如下。
+------------+------------+ | pageid | adid | +------------+------------+ | front_page | 1 | | front_page | 2 | | front_page | 3 | | contact_page1 | 3 | | contact_page1 | 4 | | contact_page1 | 5 | | contact_page3 | 5 | | contact_page3 | 6 | | contact_page3 | 7 | +------------+------------+
示例2:对示例1中的命令改写,结合Lateral View运行SQL。
select pageid, adid from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid;
返回结果如下。
+--------+------------+ | pageid | adid | +--------+------------+ | front_page | 1 | | front_page | 2 | | front_page | 3 | | contact_page1 | 3 | | contact_page1 | 4 | | contact_page1 | 5 | | contact_page3 | 5 | | contact_page3 | 6 | | contact_page3 | 7 | +--------+------------+
示例3:结合聚合函数和Lateral View运行SQL。
select adid, count(1) as cnt from tmp1 lateral view my_udtf(pageid) adTable as udtf_pageid, adid group by adid;
返回结果如下。
+------------+------------+ | adid | cnt | +------------+------------+ | 1 | 1 | | 2 | 1 | | 3 | 2 | | 4 | 1 | | 5 | 2 | | 6 | 1 | | 7 | 1 | +------------+------------+