当流引擎的内建函数无法满足业务需求时,您可以开发自定义函数。本文介绍Lindorm流引擎支持的用户自定义函数类型和开发流程。
前提条件
- 已安装Java环境,JDK版本为1.8及以上。
- 云原生多模数据库Lindorm实例已开通文件引擎和流引擎,具体操作请参见开通文件引擎和开通流引擎。
- 已将客户端IP地址添加至Lindorm实例的白名单中,具体操作请参见设置白名单。
支持的函数类型
用户自定义函数是用户自定义标量函数(UDF)、用户自定义表值函数(UDTF)和用户自定义聚合函数(UDAF)三种类型的自定义函数的集合。Lindorm流引擎支持的用户自定义函数类型如下表:
自定义函数类型 |
名称 |
应用场景 |
UDF |
用户自定义标量函数(User Defined Scalar Function)。 |
适用于一进一出业务场景。即其输入与输出是一对一的关系,查询一行数据,输出一个值。 |
UDTF |
用户自定义表值函数(User Defined Table Valued Function),又称表格UDF。 |
适用于一进多出业务场景。即其输入与输出是一对多的关系,查询一行数据,输出多个值可视为一张表。 |
使用限制
- 使用用户自定义函数时不支持外网访问。
- 仅支持使用Java语言开发用户自定义函数。
注意事项
- 调用用户自定义函数的性能低于内建函数的性能,建议您优先使用内建函数完成相同逻辑的业务需求。
- 在SQL语句中使用用户自定义函数时,需要注意内存的占用情况,如果内存占用太多可能会造成服务端内存溢出的问题。
- 用户自定义函数的名称与内建函数的名称不能重名。
- 开发用户自定义函数支持的数据类型包括:STRING、INT32、INT64、FLOAT64。
开发流程
以IntelliJ IDEA软件开发自定义函数为例。IntelliJ IDEA软件下载地址为官网IDEA。
- 创建UDF工程。
- 下载Lindorm流引擎支持的用户自定义函数和工程示例(udf和udf-examples)。
- 将udf和udf-examples导入至IntelliJ IDEA中。
- 在IDEA软件顶部菜单栏,选择。
- 在左侧导航栏,选择。
- 切换至Dependencies,选择页面右边的。
- 选择udf和udf-examples的目录,单击OK。
- 执行以下命令将udf安装到本地。
mvn clean -DskipTests=true install
- 修改udf-examples工程名为自定义的工程名,工程名为导出JAR包的名称。
- 编写Java代码。
- 开发UDF类型的用户自定义函数的完整代码示例如下。函数名为
ld_md5
,函数功能是将输入的String计算MD5值并返回计算后的String。
说明 示例代码包括以下内容:
- 创建一个继承KUDF的类Md5Udf。
- 实现
evaluate()
方法。
- 配置UdfAnnotation:包括UDF的函数名和描述信息。
- 配置UdfFunction:包括UDF的参数类型和返回值类型。
package com.alibaba.lindorm.stream.engine.function.udf;
import com.alibaba.lindorm.stream.engine.function.UdfFunction;
import com.alibaba.lindorm.stream.engine.function.UdfUtils;
/**
* Get md5sum of string
*/
@UdfAnnotation(name = "ld_md5", description = "example udf")
public class Md5Udf implements Kudf {
@Override public void init() {
}
@UdfFunction(returnType = "string", arguments = "string")
@Override public Object evaluate(Object... args) {
if (args.length != 1) {
throw new IllegalArgumentException("Md5 udf should have one input argument.");
}
return UdfUtils.getMD5Str((String) args[0]);
}
}
- 开发UDTF类型的用户自定义函数的完整代码示例如下。
说明 示例代码包括以下内容:
- 创建一个继承KUDF的类StringSplitUdtf。
- 实现
evaluate()
方法。
- 配置UdtfAnnotation:包括UDTF的函数名和描述信息。
- 配置UdfFunction:包括UDF的参数类型和返回值类型。
- 函数名为
LD_SPLIT_STRING
,函数功能是返回单个列的值,代码示例如下:package com.alibaba.lindorm.stream.engine.function.udtf;
import com.alibaba.lindorm.stream.engine.function.UdfFunction;
import com.alibaba.lindorm.stream.engine.function.udf.Kudf;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* udtf example
*/
@UdtfAnnotation(name = "LD_SPLIT_STRING", description = "example udtf")
public class StringSplitUdtf implements Kudf {
@Override public void init() {
}
@UdfFunction( returnType = "string", arguments = "string,string")
@Override public List<?> evaluate(Object... args) {
if (args.length != 2) {
throw new IllegalArgumentException("StringSplitUdtf should have two input arguments. current got " + args);
}
String orgString = (String) args[0];
String splitter = (String) args[1];
return Arrays.asList(orgString.split(splitter));
}
}
- 函数名为
LD_ARRAY_EXPAND
,函数功能是返回多个列的值,代码示例如下:
说明 如果开发用户自定义函数需要返回多个列,请定义以下参数:
returnType
需要定义每一列的数据类型,例如:@UdfFunction( returnType = "string,int64,string,int64", arguments = "string")
。
- 返回值需要为
List<LIst<Object>>
。
import com.alibaba.lindorm.stream.engine.function.UdfFunction;
import com.alibaba.lindorm.stream.engine.function.udf.Kudf;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsonFormatVisitors.JsonArrayFormatVisitor;
import java.util.Arrays;
import java.util.List;
/**
* udtf example
*/
@UdtfAnnotation(name = "LD_ARRAY_EXPAND", description = "example multi col udtf")
public class ArrayExpandUdtf implements Kudf {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Override public void init() {
}
@UdfFunction( returnType = "string,int64,string,int64", arguments = "string")
@Override public List<?> evaluate(Object... args) {
if (args.length != 1) {
throw new IllegalArgumentException("ArrayExpandUdtf should have ont input arguments. current got " + args);
}
String inString = (String) args[0];
try {
String[] values = OBJECT_MAPPER.readValue(inString, String[].class);
return Arrays
.asList(Arrays.asList(values[0], Integer.parseInt(values[1]), values[2], Integer.parseInt(values[3])));
} catch (JsonProcessingException e) {
throw new IllegalStateException("udf exception ", e);
}
}
}
- 测试Java代码。在本地调试用户自定义函数,查看运行结果是否符合预期。
- 运行成功后执行以下命令将Java代码打包为JAR包。
mvn clean -DskipTests=true install
- 执行以下命令将JAR包上传至文件引擎的UDF目录(即/lindormstream/udf/),更多请参见通过HDFS Shell连接并使用文件引擎。
${HADOOP_HOME}/bin/hadoop fs -put <JAR包名称> hdfs://<实例ID>/lindormstream/udf/
参数说明如下:
- JAR包名称:打包后的JAR包名称。
- 实例ID:Lindorm实例ID。
- 查看用户自定义函数的信息。
通过Lindorm-cli连接Lindorm流引擎后执行以下命令,连接方法请参见
通过Lindorm-cli连接并使用Lindorm流引擎。
SHOW functions;
返回信息中可以查看Function名称为
ld_md5
、
LD_SPLIT_STRING
和
LD_ARRAY_EXPAND
的函数。
- 在SQL中调用自定义函数。
- 调用UDF类型的用户自定义函数。
CREATE CQ md5_user_table INSERT INTO user_table AS
SELECT ld_md5(userid) AS id, name, age, tags FROM user_stream;
- 调用UDTF类型的用户自定义函数。
- 使用返回单列的用户自定义函数。
CREATE CQ splittag_user_table INSERT INTO user_tag_table AS
SELECT userid, LD_SPLIT_STRING(tags , ",") AS tag FROM user_stream;
- 使用返回多列的用户自定义函数。
CREATE CQ splittag_user_table INSERT INTO user_tag_table AS
SELECT userid, LD_ARRAY_EXPAND(tags) AS `T(tag1, tag2, tag3, tag4)`
FROM user_stream;