本文为您介绍如何编写和使用UDF。
背景信息
自2.2.0版本起,StarRocks支持使用Java语言编写用户定义函数(User Defined Function,简称UDF)。
自3.0版本起,StarRocks支持Global UDF,您只需要在相关的SQL语句(CREATE/SHOW/DROP)中加上GLOBAL关键字,该语句即可全局生效,无需逐个为每个数据库执行此语句。您可以根据业务场景开发自定义函数,扩展StarRocks的函数能力。
目前StarRocks支持的UDF包括:
- 用户自定义标量函数(Scalar UDF) 
- 用户自定义聚合函数(User Defined Aggregation Function,UDAF) 
- 用户自定义窗口函数(User Defined Window Function,UDWF) 
- 用户自定义表格函数(User Defined Table Function,UDTF) 
前提条件
使用StarRocks的Java UDF功能前,您需要:
- 安装Apache Maven以创建并编写相关Java项目。 
- 在服务器上安装JDK 1.8。 
- 开启UDF功能。在实例配置页面,设置FE配置项 - enable_udf为- TRUE,并重启实例使配置项生效。
类型映射关系
| SQL TYPE | Java TYPE | 
| BOOLEAN | java.lang.Boolean | 
| TINYINT | java.lang.Byte | 
| SMALLINT | java.lang.Short | 
| INT | java.lang.Integer | 
| BIGINT | java.lang.Long | 
| FLOAT | java.lang.Float | 
| DOUBLE | java.lang.Double | 
| STRING/VARCHAR | java.lang.String | 
开发并使用UDF
您需要创建Maven项目并使用Java语言编写相应功能。
步骤一:创建Maven项目
创建Maven项目,项目的基本目录结构如下。
project
|--pom.xml
|--src
|  |--main
|  |  |--java
|  |  |--resources
|  |--test
|--target步骤二:添加依赖
在pom.xml中添加如下依赖。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>udf</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.76</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.10</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>步骤三:开发UDF
您需要使用Java语言开发相应UDF。
开发Scalar UDF
Scalar UDF,即用户自定义标量函数,可以对单行数据进行操作,输出单行结果。当您在查询时使用Scalar UDF,每行数据最终都会按行出现在结果集中。典型的标量函数包括UPPER、LOWER、ROUND、ABS。
以下示例以提取JSON数据功能为例进行说明。例如,业务场景中,JSON数据中某个字段的值可能是JSON字符串而不是JSON对象,因此在提取JSON字符串时,SQL语句需要嵌套调用GET_JSON_STRING,即GET_JSON_STRING(GET_JSON_STRING('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key"), "$.k0")。
为简化SQL语句,您可以开发一个UDF,直接提取JSON字符串,例如:MY_UDF_JSON_GET('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key.k0")。
package com.starrocks.udf.sample;
import com.alibaba.fastjson.JSONPath;
public class UDFJsonGet {
    public final String evaluate(String jsonObj, String key) {
        if (obj == null || key == null) return null;
        try {
            // JSONPath库可以全部展开,即使某个字段的值是JSON格式的字符串
            return JSONPath.read(jsonObj, key).toString();
        } catch (Exception e) {
            return null;
        }
    }
}用户自定义类必须实现如下方法。
方法中请求参数和返回参数的数据类型,需要和步骤六中的CREATE FUNCTION语句中声明的相同,且两者的类型映射关系需要符合类型映射关系。
| 方法 | 含义 | 
| TYPE1 evaluate(TYPE2, ...) | 
 | 
开发UDAF
UDAF,即用户自定义的聚合函数,对多行数据进行操作,输出单行结果。典型的聚合函数包括SUM、COUNT、MAX、MIN,这些函数对于每个GROUP BY分组中多行数据进行聚合后,只输出一行结果。
以下示例以MY_SUM_INT函数为例进行说明。与内置函数SUM(返回值为BIGINT类型)区别在于,MY_SUM_INT函数支持传入参数和返回参数的类型为INT。
package com.starrocks.udf.sample;
public class SumInt {
    public static class State {
        int counter = 0;
        public int serializeLength() { return 4; }
    }
    public State create() {
        return new State();
    }
    public void destroy(State state) {
    }
    public final void update(State state, Integer val) {
        if (val != null) {
            state.counter+= val;
        }
    }
    public void serialize(State state, java.nio.ByteBuffer buff) {
        buff.putInt(state.counter);
    }
    public void merge(State state, java.nio.ByteBuffer buffer) {
        int val = buffer.getInt();
        state.counter += val;
    }
    public Integer finalize(State state) {
        return state.counter;
    }
}用户自定义类必须实现如下方法。
方法中传入参数和返回参数的数据类型,需要和步骤六中的CREATE FUNCTION语句中声明的相同,且两者的类型映射关系需要符合类型映射关系。
| 方法 | 含义 | 
| State create() | 创建State。 | 
| void destroy(State) | 销毁State。 | 
| void update(State, ...) | 更新State。其中第一个参数是State,其余的参数是函数声明的输入参数,可以为1个或多个。 | 
| void serialize(State, ByteBuffer) | 序列化State。 | 
| void merge(State, ByteBuffer) | 合并State和反序列化State。 | 
| TYPE finalize(State) | 通过State获取函数的最终结果。 | 
并且,开发UDAF函数时,您需要使用缓冲区类java.nio.ByteBuffer和局部变量serializeLength,用于保存和表示中间结果,指定中间结果的序列化长度。
| 类和局部变量 | 说明 | 
| java.nio.ByteBuffer() | 缓冲区类,用于保存中间结果。由于中间结果在不同执行节点间传输时,会进行序列化和反序列化,因此还需要使用serializeLength指定中间结果序列化后的长度。 | 
| serializeLength() | 中间结果序列化后的长度,单位为Byte。serializeLength的数据类型固定为INT。例如,示例中 | 
java.nio.ByteBuffer序列化相关事项:
- 不支持依赖ByteBuffer的remaining()方法来反序列化State。 
- 不支持对ByteBuffer调用clear()方法。 
- serializeLength需要与实际写入数据的长度保持一致,否则序列化和反序列化过程中会造成结果错误。
开发UDWF
UDWF,即用户自定义窗口函数。跟普通聚合函数不同的是,窗口函数针对一组行(一个窗口)计算值,并为每行返回一个结果。一般情况下,窗口函数包含OVER子句,将数据行拆分成多个分组,窗口函数基于每一行数据所在的组(一个窗口)进行计算,并为每行返回一个结果。
以下示例以MY_WINDOW_SUM_INT函数为例进行说明。与内置函数SUM(返回类型为BIGINT)区别在于,MY_WINDOW_SUM_INT函数支持传入参数和返回参数的类型为INT。
package com.starrocks.udf.sample;
public class WindowSumInt {    
    public static class State {
        int counter = 0;
        public int serializeLength() { return 4; }
        @Override
        public String toString() {
            return "State{" +
                    "counter=" + counter +
                    '}';
        }
    }
    public State create() {
        return new State();
    }
    public void destroy(State state) {
    }
    public void update(State state, Integer val) {
        if (val != null) {
            state.counter+=val;
        }
    }
    public void serialize(State state, java.nio.ByteBuffer buff) {
        buff.putInt(state.counter);
    }
    public void merge(State state, java.nio.ByteBuffer buffer) {
        int val = buffer.getInt();
        state.counter += val;
    }
    public Integer finalize(State state) {
        return state.counter;
    }
    public void reset(State state) {
        state.counter = 0;
    }
    public void windowUpdate(State state,
                            int peer_group_start, int peer_group_end,
                            int frame_start, int frame_end,
                            Integer[] inputs) {
        for (int i = (int)frame_start; i < (int)frame_end; ++i) {
            state.counter += inputs[i];
        }
    }
}用户自定义类必须实现UDAF所需要的方法(窗口函数是特殊聚合函数)以及windowUpdate()方法。
方法中请求参数和返回参数的数据类型,需要和步骤六中的CREATE FUNCTION语句中声明的相同,且两者的类型映射关系需要符合类型映射关系。
需要额外实现的方法
| 方法 | 含义 | 
| 
 | 更新窗口数据。窗口函数的详细说明,请参见窗口函数。输入每一行数据,都会获取到对应窗口信息来更新中间结果。 
 | 
开发UDTF
UDTF,即用户自定义表值函数,读入一行数据,输出多个值可视为一张表。表值函数常用于实现行转列。
目前UDTF只支持返回多行单列。
以下示例以MY_UDF_SPLIT函数为例进行说明。MY_UDF_SPLIT函数支持分隔符为空格,传入参数和返回参数的类型为STRING。
package com.starrocks.udf.sample;
public class UDFSplit{
    public String[] process(String in) {
        if (in == null) return null;
        return in.split(" ");
    }
}用户自定义类必须实现如下方法。
方法中请求参数和返回参数的数据类型,需要和步骤六中的CREATE FUNCTION语句中声明的相同,且两者的类型映射关系需要符合类型映射关系。
| 方法 | 含义 | 
| TYPE[] process() | 
 | 
步骤四:打包Java项目
通过以下命令打包Java项目。
mvn packagetarget目录下会生成两个文件:udf-1.0-SNAPSHOT.jar和udf-1.0-SNAPSHOT-jar-with-dependencies.jar。
步骤五:上传项目
将文件udf-1.0-SNAPSHOT-jar-with-dependencies.jar上传到OSS上,并开放JAR包的公共读权限。详情请参见简单上传、设置Bucket ACL。
步骤六中,FE会对UDF所在JAR包进行校验并计算校验值,BE会下载UDF所在JAR包并执行。
步骤六:在StarRocks中创建UDF
StarRocks内提供了两种Namespace的UDF:一种是Database级Namespace,一种是Global级Namespace。
- 如果您没有特殊的UDF可见性隔离需求,您可以直接选择创建Global UDF。在引用Global UDF时,直接调用Function Name即可,无需任何Catalog和Database作为前缀,访问更加便捷。 
- 如果您有特殊的UDF可见性隔离需求,或者需要在不同Database下创建同名UDF,那么你可以选择在Database内创建UDF。此时,如果您的会话在某个Database内,您可以直接调用Function Name即可;如果您的会话在其他Catalog和Database下,那么您需要带上Catalog和Database前缀,例如: - catalog.database.function。
创建Global UDF需要有System级的CREATE GLOBAL FUNCTION权限;创建数据库级别的UDF需要有数据库级的CREATE FUNCTION权限;使用UDF需要有对应UDF的USAGE权限。关于如何赋权,参见GRANT。
JAR包上传完成后,您需要在StarRocks中,按需创建相应的UDF。如果创建Global UDF,只需要在SQL语句中带上GLOBAL关键字即可。
语法
CREATE [GLOBAL][AGGREGATE | TABLE] FUNCTION function_name(arg_type [, ...])
RETURNS return_type
[PROPERTIES ("key" = "value" [, ...]) ]参数说明
| 参数 | 必选 | 说明 | 
| GLOBAL | 否 | 如需创建全局UDF,需指定该关键字。从3.0版本开始支持。 | 
| AGGREGATE | 否 | 如要创建UDAF和UDWF,需指定该关键字。 | 
| TABLE | 否 | 如要创建UDTF,需指定该关键字。 | 
| function_name | 是 | 函数名,可以包含数据库名称,比如, | 
| arg_type | 是 | 函数的参数类型。具体支持的数据类型,请参见类型映射关系。 | 
| return_type | 是 | 函数的返回值类型。具体支持的数据类型,请参见类型映射关系。 | 
| properties | 是 | 函数相关属性。创建不同类型的UDF需配置不同的属性,详情和示例请参考以下示例。 | 
创建Scalar UDF
执行如下命令,在StarRocks中创建之前示例中的Scalar UDF。
CREATE [GLOBAL] FUNCTION MY_UDF_JSON_GET(string, string) 
RETURNS string
PROPERTIES (
    "symbol" = "com.starrocks.udf.sample.UDFJsonGet", 
    "type" = "StarrocksJar",
    "file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);| 参数 | 描述 | 
| symbol | UDF所在项目的类名。格式为 | 
| type | 用于标记所创建的UDF类型。取值为 | 
| file | UDF所在JAR包的HTTP路径,配置成OSS包含对应内网Endpoint的HTTP URL。格式为 | 
创建UDAF
执行如下命令,在StarRocks中创建之前示例中的UDAF。
CREATE [GLOBAL] AGGREGATE FUNCTION MY_SUM_INT(INT) 
RETURNS INT
PROPERTIES 
( 
    "symbol" = "com.starrocks.udf.sample.SumInt", 
    "type" = "StarrocksJar",
    "file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);PROPERTIES里的参数说明与创建Scalar UDF相同。
创建UDWF
执行如下命令,在StarRocks中创建先前示例中的UDWF。
CREATE [GLOBAL] AGGREGATE FUNCTION MY_WINDOW_SUM_INT(Int)
RETURNS Int
PROPERTIES 
(
    "analytic" = "true",
    "symbol" = "com.starrocks.udf.sample.WindowSumInt", 
    "type" = "StarrocksJar", 
    "file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);analytic:所创建的函数是否为窗口函数,固定取值为true。其他参数说明与创建Scalar UDF相同。
创建UDTF
执行如下命令,在StarRocks中创建先前示例中的UDTF。
CREATE [GLOBAL] TABLE FUNCTION MY_UDF_SPLIT(string)
RETURNS string
PROPERTIES 
(
    "symbol" = "com.starrocks.udf.sample.UDFSplit", 
    "type" = "StarrocksJar", 
    "file" = "http://<YourBucketName>.oss-cn-xxxx-internal.aliyuncs.com/<YourPath>/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);PROPERTIES里的参数说明与创建Scalar UDF相同。
步骤七:使用UDF
创建完成后,您可以测试使用您开发的UDF。
使用Scalar UDF
执行如下命令,使用步骤六创建的Scalar UDF函数。
SELECT MY_UDF_JSON_GET('{"key":"{\\"in\\":2}"}', '$.key.in');使用UDAF
执行如下命令,使用步骤六创建的UDAF函数。
SELECT MY_SUM_INT(col1);使用UDWF
执行如下命令,使用步骤六创建的UDWF函数。
SELECT MY_WINDOW_SUM_INT(intcol) 
            OVER (PARTITION BY intcol2
                  ORDER BY intcol3
                  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
FROM test_basic;使用UDTF
执行如下命令,使用先前示例中的UDTF。
-- 假设存在表 t1,其列 a、b、c1 信息如下。
SELECT t1.a,t1.b,t1.c1 FROM t1;
> output:
1,2.1,"hello world"
2,2.2,"hello UDTF."
-- 使用 MY_UDF_SPLIT() 函数。
SELECT t1.a,t1.b, MY_UDF_SPLIT FROM t1, MY_UDF_SPLIT(t1.c1); 
> output:
1,2.1,"hello"
1,2.1,"world"
2,2.2,"hello"
2,2.2,"UDTF."- 第一个 - MY_UDF_SPLIT为调用- MY_UDF_SPLIT后生成的列别名。
- 暂不支持使用 - AS t2(f1)的方式指定表格函数返回表的表别名和列别名。
查看UDF信息
运行以下命令查看UDF信息。
SHOW [GLOBAL] FUNCTIONS;删除UDF
运行以下命令删除指定的UDF。
DROP [GLOBAL] FUNCTION <function_name>(arg_type [, ...]);FAQ
Q:开发UDF时是否可以使用静态变量?不同UDF间的静态变量间否会互相影响?
A:支持在开发UDF时使用静态变量,且不同UDF间(即使类同名),静态变量是互相隔离的,不会互相影响。