用户自定义函数
当流引擎的内建函数无法满足业务需求时,您可以开发自定义函数。本文介绍Lindorm流引擎支持的用户自定义函数类型和开发流程。
前提条件
支持的函数类型
用户自定义函数是用户自定义标量函数(UDF)、用户自定义表值函数(UDTF)和用户自定义聚合函数(UDAF)三种类型的自定义函数的集合。Lindorm流引擎支持的用户自定义函数类型如下表:
自定义函数类型 | 名称 | 应用场景 |
UDF | 用户自定义标量函数(User Defined Scalar Function)。 | 适用于一进一出业务场景。即其输入与输出是一对一的关系,查询一行数据,输出一个值。 |
UDTF | 用户自定义表值函数(User Defined Table Valued Function),又称表格UDF。 | 适用于一进多出业务场景。即其输入与输出是一对多的关系,查询一行数据,输出多个值可视为一张表。 |
使用限制
使用用户自定义函数时不支持外网访问。
仅支持使用Java语言开发用户自定义函数。
注意事项
调用用户自定义函数的性能低于内建函数的性能,建议您优先使用内建函数完成相同逻辑的业务需求。
在SQL语句中使用用户自定义函数时,需要注意内存的占用情况,如果内存占用太多可能会造成服务端内存溢出的问题。
用户自定义函数的名称与内建函数的名称不能重名。
开发用户自定义函数支持的数据类型包括:STRING、INT32、INT64、FLOAT64。
开发流程
以IntelliJ IDEA软件开发自定义函数为例。IntelliJ IDEA软件下载地址为官网IDEA。
创建UDF工程。
下载Lindorm流引擎支持的用户自定义函数和工程示例(udf和udf-examples)。
将udf和udf-examples导入至IntelliJ IDEA中。
在IDEA软件顶部菜单栏,选择
。在左侧导航栏,选择
。切换至Dependencies,选择页面右边的
。选择udf和udf-examples的目录,单击OK。
执行以下命令将udf安装到本地。
mvn clean -DskipTests=true install
修改udf-examples工程名为自定义的工程名,工程名为导出JAR包的名称。
编写Java代码。
开发UDF类型的用户自定义函数的完整代码示例如下。函数名为
ld_md5
,函数功能是将输入的String计算MD5值并返回计算后的String。说明示例代码包括以下内容:
创建一个继承KUDF的类Md5Udf。
实现
evaluate()
方法。配置UdfAnnotation:包括UDF的函数名和描述信息。
配置UdfFunction:包括UDF的参数类型和返回值类型。
package com.alibaba.lindorm.stream.engine.function.udf; import com.alibaba.lindorm.stream.engine.function.UdfFunction; import com.alibaba.lindorm.stream.engine.function.UdfUtils; /** * Get md5sum of string */ @UdfAnnotation(name = "ld_md5", description = "example udf") public class Md5Udf implements Kudf { @Override public void init() { } @UdfFunction(returnType = "string", arguments = "string") @Override public Object evaluate(Object... args) { if (args.length != 1) { throw new IllegalArgumentException("Md5 udf should have one input argument."); } return UdfUtils.getMD5Str((String) args[0]); } }
开发UDTF类型的用户自定义函数的完整代码示例如下。
说明示例代码包括以下内容:
创建一个继承KUDF的类StringSplitUdtf。
实现
evaluate()
方法。配置UdtfAnnotation:包括UDTF的函数名和描述信息。
配置UdfFunction:包括UDF的参数类型和返回值类型。
函数名为
LD_SPLIT_STRING
,函数功能是返回单个列的值,代码示例如下:package com.alibaba.lindorm.stream.engine.function.udtf; import com.alibaba.lindorm.stream.engine.function.UdfFunction; import com.alibaba.lindorm.stream.engine.function.udf.Kudf; import org.codehaus.jackson.map.ObjectMapper; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; /** * udtf example */ @UdtfAnnotation(name = "LD_SPLIT_STRING", description = "example udtf") public class StringSplitUdtf implements Kudf { @Override public void init() { } @UdfFunction( returnType = "string", arguments = "string,string") @Override public List<?> evaluate(Object... args) { if (args.length != 2) { throw new IllegalArgumentException("StringSplitUdtf should have two input arguments. current got " + args); } String orgString = (String) args[0]; String splitter = (String) args[1]; return Arrays.asList(orgString.split(splitter)); } }
函数名为
LD_ARRAY_EXPAND
,函数功能是返回多个列的值,代码示例如下:说明如果开发用户自定义函数需要返回多个列,请定义以下参数:
returnType
需要定义每一列的数据类型,例如:@UdfFunction( returnType = "string,int64,string,int64", arguments = "string")
。返回值需要为
List<LIst<Object>>
。
import com.alibaba.lindorm.stream.engine.function.UdfFunction; import com.alibaba.lindorm.stream.engine.function.udf.Kudf; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsonFormatVisitors.JsonArrayFormatVisitor; import java.util.Arrays; import java.util.List; /** * udtf example */ @UdtfAnnotation(name = "LD_ARRAY_EXPAND", description = "example multi col udtf") public class ArrayExpandUdtf implements Kudf { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @Override public void init() { } @UdfFunction( returnType = "string,int64,string,int64", arguments = "string") @Override public List<?> evaluate(Object... args) { if (args.length != 1) { throw new IllegalArgumentException("ArrayExpandUdtf should have ont input arguments. current got " + args); } String inString = (String) args[0]; try { String[] values = OBJECT_MAPPER.readValue(inString, String[].class); return Arrays .asList(Arrays.asList(values[0], Integer.parseInt(values[1]), values[2], Integer.parseInt(values[3]))); } catch (JsonProcessingException e) { throw new IllegalStateException("udf exception ", e); } } }
测试Java代码。在本地调试用户自定义函数,查看运行结果是否符合预期。
运行成功后执行以下命令将Java代码打包为JAR包。
mvn clean -DskipTests=true install
执行以下命令将JAR包上传至文件引擎的UDF目录(即/lindormstream/udf/),更多请参见通过HDFS Shell连接并使用文件引擎。
${HADOOP_HOME}/bin/hadoop fs -put <JAR包名称> hdfs://<实例ID>/lindormstream/udf/
参数说明如下:
JAR包名称:打包后的JAR包名称。
实例ID:Lindorm实例ID。
查看用户自定义函数的信息。
通过Lindorm-cli连接Lindorm流引擎后执行以下命令,连接方法请参见通过Lindorm-cli连接并使用Lindorm流引擎(旧接口不推荐)。
SHOW functions;
返回信息中可以查看Function名称为
ld_md5
、LD_SPLIT_STRING
和LD_ARRAY_EXPAND
的函数。在SQL中调用自定义函数。
调用UDF类型的用户自定义函数。
CREATE CQ md5_user_table INSERT INTO user_table AS SELECT ld_md5(userid) AS id, name, age, tags FROM user_stream;
调用UDTF类型的用户自定义函数。
使用返回单列的用户自定义函数。
CREATE CQ splittag_user_table INSERT INTO user_tag_table AS SELECT userid, LD_SPLIT_STRING(tags , ",") AS tag FROM user_stream;
使用返回多列的用户自定义函数。
CREATE CQ splittag_user_table INSERT INTO user_tag_table AS SELECT userid, LD_ARRAY_EXPAND(tags) AS `T(tag1, tag2, tag3, tag4)` FROM user_stream;