Hologres V3.1版本起支持远程调用函数计算服务(Function Compute,简称FC)中的自定义函数,用于计算处理复杂的业务逻辑或进行高级数据操作。本文为您介绍如何在Hologres中使用自定义函数。
功能概述
Remote Function允许开发者通过将外部函数集成到Hologres中,扩展数据处理和分析的能力。在Hologres V3.1版本中,Remote Function支持调用阿里云的函数计算服务FC。该功能的引入,使用户能够在查询Hologres数据的同时,动态调用阿里云函数计算处理复杂的业务逻辑或进行高级数据操作。
通过Remote Function,用户可以实现以下三种应用场景:
实时数据处理:在查询数据时,实时调用外部函数进行数据清洗、格式转换或复杂计算。
集成第三方服务:通过函数计算,将Hologres数据与其他阿里云服务或第三方API进行交互。
高级分析:利用函数计算实现高级分析算法,如模型推断、机器学习等,并将结果直接写回Hologres。
前提条件
已开通Hologres实例,且该实例版本需为V3.1及以上。详情请参见购买Hologres。
已开通函数计算服务,且函数计算FC实例为V3.0版本。具体操作请登录函数计算控制台。
已授权服务关联角色
AliyunServiceRoleForHologresRemoteUDF
。登录Hologres管理控制台,单击左侧导航栏授权服务关联角色。
勾选
AliyunServiceRoleForHologresRemoteUDF
,单击一键授权完成。如图所示:
Hologre实例和函数计算服务需在同一地域下。
开通阿里云函数计算服务,并开发、部署供Hologres调用的函数,详情请参见代码开发。
注意事项
仅支持标量UDF和UDTF,暂不支持UDAF。
仅支持如下数据类型和对应的数组类型:
BOOLEAN
、INTEGER
、BIGINT
、REAL
、DOUBLE PRECISION
和TEXT
。Remote Function不支持常量入参。
该功能在函数计算FC中会产生一定费用。详情请参见产品计费。
该功能在Hologres中不会产生额外费用。
自定义函数管理
创建扩展
首次使用前需要创建扩展,命令如下:
CREATE EXTENSION [ IF NOT EXISTS ] function_compute;
创建函数
语法
CREATE [ OR REPLACE ] FUNCTION <function_name>
( [ [ argmode ] [ argname ] <argtype> [ { DEFAULT | = } default_expr ] [, ...] ] )
[ RETURNS [SETOF] rettype | RETURNS TABLE ( column_name column_type) ]
LANGUAGE function_compute
AS '<fc_endpoint>/<func_name>'
{
{ CALLED ON NULL INPUT | RETURNS NULL ON NULL INPUT | STRICT }
| SET function_compute.qualifier TO <qualifier>
| SET function_compute.compression TO <compression>
| SET function_compute.max_batch_size TO <max_batch_size>
} ...
参数说明
参数类别 | 参数 | 是否必填 | 说明 |
函数名称 | function_name | 是 | 函数名称。
|
函数参数 | argtype | 是 | 参数数据类型。 |
argmode | 否 | 支持IN,OUT和INOUT,默认为IN,为避免歧义不建议使用INOUT。另外,只能包含一个OUT或者1个INOUT的参数。 | |
argname | 否 | 参数名称。
| |
default_expr | 否 | 默认值,仅限入参,若某参数指定默认值,其后所有参数必须均有默认值。 | |
FC Endpoint | LANGUAGE function_compute | 是 | 固定值,声明使用函数计算服务。 |
fc_endpoint | 是 | FC的内网服务接入地址。详情请参见服务接入地址。 | |
func_name | 是 | 目标函数名称(需在FC控制台预先创建)。 | |
返回类型 | rettype | 否 | 返回值类型。 当存在OUT或INOUT参数时,可以省略RETURNS子句。如果存在该子句,则必须与输出参数隐含的结果类型一致。 |
column_name | 否 | 在RETURNS TABLE语法中,输出列的名称。 声明命名OUT参数的另一种方式,不同之处在于RETURNS TABLE还隐含RETURNS SETOF(即返回集合)。 | |
column_type | 否 | 在RETURNS TABLE语法中,输出列的数据类型。 | |
空值处理策略 | CALLED ON NULL INPUT | 否 | 默认行为,允许输入含NULL,函数需自行处理空值逻辑。 |
RETURNS NULL ON NULL INPUT | 否 | 任意输入为NULL时直接返回NULL。 | |
STRICT | 否 | 为RETURNS NULL ON NULL INPUT的别名。 | |
函数版本控制 | qualifier | 否 | 指定调用的函数版本或别名,默认值为 |
请求压缩算法 | compression | 否 | 指定调用函数的Request和Response中所用的压缩算法,有效值为:
|
单次请求最大批处理行数 | max_batch_size | 否 | 用于指定每批次发送至FC的最大行数。 核心目的是为存在内存限制或其他约束的FC函数设置批量处理的行数上限。若未显式指定此参数,Hologres将自动计算并采用最优批量大小,通常无需手动干预。 |
使用示例
创建标量UDF。
CREATE OR REPLACE FUNCTION rf_add ( a INTEGER, b INTEGER DEFAULT 1 ) RETURNS BIGINT LANGUAGE function_compute AS 'xxxxxxxxxxxxx.cn-shanghai-internal.fc.aliyuncs.com/add' ;
创建UDTF。
-- 形式 1 CREATE OR REPLACE FUNCTION rf_unnest(TEXT []) RETURNS TABLE (item TEXT ) LANGUAGE function_compute AS 'xxxxxxxxxxxxxx.cn-hangzhou-internal.fc.aliyuncs.com/unnest' STRICT SET function_compute.max_batch_size TO 1024; -- 形式 2 CREATE OR REPLACE FUNCTION rf_unnest(TEXT []) RETURNS SETOF TEXT LANGUAGE function_compute AS 'xxxxxxxxxxxxxx.cn-hangzhou-internal.fc.aliyuncs.com/unnest' STRICT SET function_compute.max_batch_size TO 1024;
查看函数
查看已创建的Remote Function。
SELECT
CASE
WHEN p.proretset = 'f' THEN 'scalar UDF'
WHEN p.proretset = 't' THEN 'UDTF'
END AS function_type,
n.nspname AS schema_name,
p.proname AS function_name,
pg_get_function_arguments(p.oid) AS arguments,
pg_get_function_result(p.oid) AS return_type,
p.proisstrict AS is_strict,
p.proconfig AS config,
pg_get_functiondef(p.oid) AS definition
FROM
pg_proc p
JOIN pg_language l ON p.prolang = l.oid
JOIN pg_namespace n ON p.pronamespace = n.oid
WHERE
l.lanname = 'function_compute'
AND p.prokind != 'p'
ORDER BY
function_type,
schema_name,
function_name;
删除函数
语法
DROP FUNCTION [ IF EXISTS ] <function_name> [ ( [ [ argmode ] [ argname ] <argtype> [, ...] ] ) ] [, ...]
参数说明
参数 | 是否必填 | 说明 |
function_name | 是 | 要删除的函数名。 |
argtype | 是 | 参数数据类型。 |
argmode | 否 | 支持IN,OUT和INOUT,默认为IN,为避免歧义不建议使用INOUT。另外,只能包含一个OUT或者1个INOUT的参数。 |
argname | 否 | 参数名称。对于入参仅作为文档说明,对于出参,决定返回结果集的列名,若省略则由系统生成默认名称。 |
使用示例
DROP FUNCTION rf_add(INTEGER, INTEGER);
数据交互格式
Hologres发送到FC的请求格式
Hologres通过POST调用FC的InvokeFunction接口。请求体为JSON格式,其顶层为对象。目前仅包含名为data
键,其值为二维数组,其中的每个元素对应一批次中的一行数据,其中包含调用函数所需的参数。
数据的序列化规则
BOOLEAN
序列化为JSON BOOLEAN类型。INTEGER
、BIGINT
序列化为JSON NUMBER类型。REAL
、DOUBLE PRECISION
序列化为JSON NUMBER类型。TEXT
序列化为JSON STRING类型。NULL
序列化为JSON的NULL
。
示例
以下是一个针对具有签名rf_demo(TEXT, INTEGER, BOOLEAN)
的Remote Function的序列化请求示例。
{
"data": [
["foo", 100, true],
[null, null, false],
["bar", 200, false]
]
}
Hologres接收到FC的响应格式
当FC完成对某批次数据的处理时,需按照以下规范,以JSON的形式将结果数据返回给Hologres:
标量函数Scalar UDF。
顶层必须包含
results
字段,其值为一个数组,每个元素对应输入数据的一行结果。每行结果必须为一个数组,且严格遵循输入数据的顺序(即第N个元素对应输入数据的第N行)。
以下是一个大小为4的批次的返回数据示例:
{ "results": [ ["北京"], ["上海"], ["深圳"], ["广州"] ] }
UDTF:UTDF支持单行输入生成多行输出,需通过行号(row_num)标识原始输入行的关联关系。
顶层必须包含
results
字段,其值为一个数组,每个元素对应输出的一行数据。每行结果必须是包含两个元素的数组:
row_num(首元素):原始输入数据的行号(0起始索引),用于关联输入与输出。row_num不可乱序,必须以递增的形式返回。
result(次元素):处理后的一行返回值。
示例如下:
{ "results": [ [0, "北京"], [1, "上海"], [3, "深圳"], [3, "广州"], ] }
自定义函数使用示例
本示例以unnest
函数为例。
开通函数计算。
创建FC事件函数。
单击左侧导航栏函数,切换地域,保持与Hologres实例在同一地域。
在函数页面,单击创建函数,进入创建函数页面。
选择事件函数,配置如下参数,其他参数保持不变。详情请参见创建事件函数。
参数
说明
函数名称
自定义设置。例如
unnest
。运行环境
选择内置运行时/Python/Python 3.10。
代码上传方式
选择通过ZIP包上传代码。
代码包
上传已编写并打包好的代码包。
将如下代码保存至unnest.py,并压缩成unnest.zip。
import json def unnest(event, context): evt = json.loads(event) data = evt.get('data', None) if data is None: raise ValueError('no "data" key in event.') if not isinstance(data, list): raise ValueError('data is not a list.') res = list() for i in range(len(data)): if len(data[i]) != 1 or not isinstance(data[i], list): raise ValueError('the item in data is not a list.') for item in data[i][0]: res.append([i, item]) return json.dumps({'results': res})
请求处理程序
填写
unnest.unnest
。
创建Hologres Remote Function。
说明您需要在HoloWeb平台完成以下步骤,具体操作请参见连接HoloWeb。
CREATE EXTENSION IF NOT EXISTS function_compute; CREATE OR REPLACE FUNCTION rf_unnest(INTEGER []) RETURNS SETOF INTEGER STRICT LANGUAGE function_compute AS 'xxxxxxxxxxxxxxxxx.cn-shanghai-internal.fc.aliyuncs.com/unnest';
准备测试数据。
CREATE TABLE test_array ( numbers INTEGER[] ); INSERT INTO test_array (numbers) VALUES (ARRAY[1, 3]), (ARRAY[2, 4]), ('{}'), (ARRAY[]::INTEGER[]), (NULL);
调用Remote Function。
SELECT numbers, rf_unnest(numbers) FROM test_array; numbers | rf_unnest ---------+----------- {2,4} | 2 {2,4} | 4 {1,3} | 1 {1,3} | 3 (4 rows)