自定义函数UDF(Remote UDF)

Hologres V3.1版本起支持远程调用函数计算服务(Function Compute,简称FC)中的自定义函数,用于计算处理复杂的业务逻辑或进行高级数据操作。本文为您介绍如何在Hologres中使用自定义函数。

功能概述

Remote Function允许开发者通过将外部函数集成到Hologres中,扩展数据处理和分析的能力。在Hologres V3.1版本中,Remote Function支持调用阿里云的函数计算服务FC。该功能的引入,使用户能够在查询Hologres数据的同时,动态调用阿里云函数计算处理复杂的业务逻辑或进行高级数据操作。

通过Remote Function,用户可以实现以下三种应用场景:

  • 实时数据处理:在查询数据时,实时调用外部函数进行数据清洗、格式转换或复杂计算。

  • 集成第三方服务:通过函数计算,将Hologres数据与其他阿里云服务或第三方API进行交互。

  • 高级分析:利用函数计算实现高级分析算法,如模型推断、机器学习等,并将结果直接写回Hologres。

前提条件

  • 已开通Hologres实例,且该实例版本需为V3.1及以上。详情请参见购买Hologres

  • 已开通函数计算服务,且函数计算FC实例为V3.0版本。具体操作请登录函数计算控制台

  • 已授权服务关联角色AliyunServiceRoleForHologresRemoteUDF

    1. 登录Hologres管理控制台,单击左侧导航栏授权服务关联角色

    2. 勾选AliyunServiceRoleForHologresRemoteUDF,单击一键授权完成。如图所示:

      yuyuyuuyueeeee

说明
  • Hologre实例和函数计算服务需在同一地域下。

  • 开通阿里云函数计算服务,并开发、部署供Hologres调用的函数,详情请参见代码开发

注意事项

  • 仅支持标量UDFUDTF,暂不支持UDAF。

  • 仅支持如下数据类型和对应的数组类型:BOOLEANINTEGERBIGINTREALDOUBLE PRECISIONTEXT

  • Remote Function不支持常量入参。

  • 该功能在函数计算FC中会产生一定费用。详情请参见产品计费

  • 该功能在Hologres中不会产生额外费用。

自定义函数管理

创建扩展

首次使用前需要创建扩展,命令如下:

CREATE EXTENSION [ IF NOT EXISTS ] function_compute;

创建函数

语法

CREATE [ OR REPLACE ] FUNCTION <function_name>
  ( [ [ argmode ] [ argname ] <argtype> [ { DEFAULT | = } default_expr ] [, ...] ] )
    [ RETURNS [SETOF] rettype | RETURNS TABLE ( column_name column_type) ]
  LANGUAGE function_compute
  AS '<fc_endpoint>/<func_name>'
  {
    { CALLED ON NULL INPUT | RETURNS NULL ON NULL INPUT | STRICT }
    | SET function_compute.qualifier TO <qualifier>
    | SET function_compute.compression TO <compression>
    | SET function_compute.max_batch_size TO <max_batch_size>
  } ...

参数说明

参数类别

参数

是否必填

说明

函数名称

function_name

函数名称。

  • 在同一Schema下函数名称必须唯一,且不可与参数类型相同的现有函数冲突。但通过参数类型的不同进行重载以区分命名是被支持的,例如rf_sum(int)rf_sum(float)

  • 为避免冲突,建议Remote Function增加统一的rf_前缀。

  • 若指定Schema,例如schema1.func,函数将创建于该Schema。

  • 使用CREATE OR REPLACE FUNCTION替换函数时,不可修改函数名称、参数类型或返回类型(需删除重建)。

函数参数

argtype

参数数据类型。

argmode

支持IN,OUTINOUT,默认为IN,为避免歧义不建议使用INOUT。另外,只能包含一个OUT或者1INOUT的参数。

argname

参数名称。

  • 对于入参,仅作为文档说明。

  • 对于出参,决定返回结果集的列名,若省略则由系统生成默认名称。

default_expr

默认值,仅限入参,若某参数指定默认值,其后所有参数必须均有默认值。

FC Endpoint

LANGUAGE function_compute

固定值,声明使用函数计算服务。

fc_endpoint

FC的内网服务接入地址。详情请参见服务接入地址

func_name

目标函数名称(需在FC控制台预先创建)。

返回类型

rettype

返回值类型。

当存在OUTINOUT参数时,可以省略RETURNS子句。如果存在该子句,则必须与输出参数隐含的结果类型一致。

column_name

RETURNS TABLE语法中,输出列的名称。

声明命名OUT参数的另一种方式,不同之处在于RETURNS TABLE还隐含RETURNS SETOF(即返回集合)。

column_type

RETURNS TABLE语法中,输出列的数据类型。

空值处理策略

CALLED ON NULL INPUT

默认行为,允许输入含NULL,函数需自行处理空值逻辑。

RETURNS NULL ON NULL INPUT

任意输入为NULL时直接返回NULL。

STRICT

RETURNS NULL ON NULL INPUT的别名。

函数版本控制

qualifier

指定调用的函数版本或别名,默认值为LATEST

请求压缩算法

compression

指定调用函数的RequestResponse中所用的压缩算法,有效值为:

  • None:禁用压缩(默认)。

  • GZIP:启用GZIP压缩。

单次请求最大批处理行数

max_batch_size

用于指定每批次发送至FC的最大行数。

核心目的是为存在内存限制或其他约束的FC函数设置批量处理的行数上限。若未显式指定此参数,Hologres将自动计算并采用最优批量大小,通常无需手动干预。

使用示例

  • 创建标量UDF。

    CREATE OR REPLACE FUNCTION rf_add (  
      a INTEGER,  
      b INTEGER DEFAULT 1 
    )  
    RETURNS BIGINT  
    LANGUAGE function_compute  
    AS 'xxxxxxxxxxxxx.cn-shanghai-internal.fc.aliyuncs.com/add'  
    ;
  • 创建UDTF。

    -- 形式 1
    CREATE OR REPLACE FUNCTION rf_unnest(TEXT [])
    RETURNS TABLE (item TEXT )
    LANGUAGE function_compute
    AS 'xxxxxxxxxxxxxx.cn-hangzhou-internal.fc.aliyuncs.com/unnest'
    STRICT
    SET function_compute.max_batch_size TO 1024;
    
    -- 形式 2
    CREATE OR REPLACE FUNCTION rf_unnest(TEXT []) 
    RETURNS SETOF TEXT 
    LANGUAGE function_compute 
    AS 'xxxxxxxxxxxxxx.cn-hangzhou-internal.fc.aliyuncs.com/unnest' 
    STRICT
    SET function_compute.max_batch_size TO 1024;

查看函数

查看已创建的Remote Function。

SELECT
  CASE
    WHEN p.proretset = 'f' THEN 'scalar UDF'
    WHEN p.proretset = 't' THEN 'UDTF'
  END AS function_type,
  n.nspname AS schema_name,
  p.proname AS function_name,
  pg_get_function_arguments(p.oid) AS arguments,
  pg_get_function_result(p.oid) AS return_type,
  p.proisstrict AS is_strict,
  p.proconfig AS config,
  pg_get_functiondef(p.oid) AS definition
FROM
  pg_proc p
  JOIN pg_language l ON p.prolang = l.oid
  JOIN pg_namespace n ON p.pronamespace = n.oid
WHERE
  l.lanname = 'function_compute'
  AND p.prokind != 'p'
ORDER BY
  function_type,
  schema_name,
  function_name;

删除函数

语法

 DROP FUNCTION [ IF EXISTS ] <function_name> [ ( [ [ argmode ] [ argname ] <argtype> [, ...] ] ) ] [, ...]

参数说明

参数

是否必填

说明

function_name

要删除的函数名。

argtype

参数数据类型。

argmode

支持IN,OUTINOUT,默认为IN,为避免歧义不建议使用INOUT。另外,只能包含一个OUT或者1INOUT的参数。

argname

参数名称。对于入参仅作为文档说明,对于出参,决定返回结果集的列名,若省略则由系统生成默认名称。

使用示例

DROP FUNCTION rf_add(INTEGER, INTEGER);

数据交互格式

Hologres发送到FC的请求格式

Hologres通过POST调用FCInvokeFunction接口。请求体为JSON格式,其顶层为对象。目前仅包含名为data键,其值为二维数组,其中的每个元素对应一批次中的一行数据,其中包含调用函数所需的参数。

数据的序列化规则

  • BOOLEAN序列化为JSON BOOLEAN类型。

  • INTEGERBIGINT序列化为JSON NUMBER类型。

  • REALDOUBLE PRECISION序列化为JSON NUMBER类型。

  • TEXT序列化为JSON STRING类型。

  • NULL序列化为JSONNULL

示例

以下是一个针对具有签名rf_demo(TEXT, INTEGER, BOOLEAN)Remote Function的序列化请求示例。

{  
  "data": [  
    ["foo", 100, true],
    [null, null, false],
    ["bar", 200, false]
  ]
}  

Hologres接收到FC的响应格式

FC完成对某批次数据的处理时,需按照以下规范,以JSON的形式将结果数据返回给Hologres:

  • 标量函数Scalar UDF。

    • 顶层必须包含results字段,其值为一个数组,每个元素对应输入数据的一行结果。

    • 每行结果必须为一个数组,且严格遵循输入数据的顺序(即第N个元素对应输入数据的第N行)。

    以下是一个大小为4的批次的返回数据示例:

    {
      "results": [
        ["北京"],
        ["上海"],
        ["深圳"],
        ["广州"]
      ]
    }
  • UDTF:UTDF支持单行输入生成多行输出,需通过行号(row_num)标识原始输入行的关联关系。

    • 顶层必须包含results字段,其值为一个数组,每个元素对应输出的一行数据。

    • 每行结果必须是包含两个元素的数组:

      • row_num(首元素):原始输入数据的行号(0起始索引),用于关联输入与输出。row_num不可乱序,必须以递增的形式返回。

      • result(次元素):处理后的一行返回值。

    示例如下:

    {
      "results": [
        [0, "北京"],
        [1, "上海"],
        [3, "深圳"],
        [3, "广州"],
      ]
    }

自定义函数使用示例

本示例以unnest函数为例。

  1. 开通函数计算。

    登录函数计算控制台。您还可以根据界面提示领取一定额度的免费资源包。详情请参见试用额度

  2. 创建FC事件函数。

    1. 单击左侧导航栏函数,切换地域,保持与Hologres实例在同一地域。

    2. 函数页面,单击创建函数,进入创建函数页面。

    3. 选择事件函数,配置如下参数,其他参数保持不变。详情请参见创建事件函数

      参数

      说明

      函数名称

      自定义设置。例如unnest

      运行环境

      选择内置运行时/Python/Python 3.10

      代码上传方式

      选择通过ZIP包上传代码

      代码包

      上传已编写并打包好的代码包。

      将如下代码保存至unnest.py,并压缩成unnest.zip。

      import json
      
      def unnest(event, context):
          evt = json.loads(event)
          data = evt.get('data', None)
          if data is None:
              raise ValueError('no "data" key in event.')
          if not isinstance(data, list):
              raise ValueError('data is not a list.')
      
          res = list()
          for i in range(len(data)):
              if len(data[i]) != 1 or not isinstance(data[i], list):
                  raise ValueError('the item in data is not a list.')
              for item in data[i][0]:
                  res.append([i, item])
      
          return json.dumps({'results': res})

      请求处理程序

      填写unnest.unnest

  3. 创建Hologres Remote Function。

    说明

    您需要在HoloWeb平台完成以下步骤,具体操作请参见连接HoloWeb

    CREATE EXTENSION IF NOT EXISTS function_compute;
    
    CREATE OR REPLACE FUNCTION rf_unnest(INTEGER [])
      RETURNS SETOF INTEGER
    STRICT
    LANGUAGE function_compute
    AS 'xxxxxxxxxxxxxxxxx.cn-shanghai-internal.fc.aliyuncs.com/unnest';
  4. 准备测试数据。

    CREATE TABLE test_array (
        numbers INTEGER[]
    );
    
    INSERT INTO test_array (numbers) VALUES
      (ARRAY[1, 3]),
      (ARRAY[2, 4]),
      ('{}'),  
      (ARRAY[]::INTEGER[]),  
      (NULL);
  5. 调用Remote Function。

    SELECT numbers, rf_unnest(numbers) FROM test_array;
    
     numbers | rf_unnest
    ---------+-----------
     {2,4}   |         2
     {2,4}   |         4
     {1,3}   |         1
     {1,3}   |         3
    (4 rows)