自定义函数UDF
AnalyticDB for MySQL支持使用REMOTE_CALL函数远程调用函数计算服务(Function Compute,简称FC)中自定义的函数,以满足您在AnalyticDB for MySQL中使用UDF(用户自定义函数)的需求。
前提条件
功能介绍
AnalyticDB for MySQL Remote UDF功能将函数计算服务作为远端的函数运行服务器,在AnalyticDB for MySQL中使用REMOTE_CALL函数远程调用预先在函数计算服务中自定义的函数,完成函数计算。流程图如下:
客户端提交SQL至AnalyticDB for MySQL。
AnalyticDB for MySQL调用REMOTE_CALL函数后,会将数据以JSON格式发送到函数计算服务。
在函数计算服务中根据自定义的函数计算数据。
函数计算服务将计算结果以JSON的形式返回到AnalyticDB for MySQL。
AnalyticDB for MySQL计算再将最终的计算结果返回至客户端。
注意事项
Remote UDF功能仅支持标量UDF函数。
语法
remote_call('returnType', 'func_name', ['{external_config}'|NULL], X1, X2, ..., Xn)
参数说明
参数 | 说明 |
returnType | 返回值的数据类型。仅支持BOOLEAN、DOUBLE、VARCHAR、INTEGER、TINYINT、BIGINT、TIME、DATE、TIMESTAMP和DATETIME数据类型。 |
func_name | 在函数计算服务中创建的自定义函数名称。
说明 您可以登录函数计算控制台,查看函数计算服务的版本。 |
external_config|null | 函数调用中的扩展参数,若无,则填写null。扩展参数的格式为JSON,多个参数之间用半角逗号(,)分隔。 您可以在查询中设置扩展参数,或通过 |
X1.......Xn | 输入的参数。仅支持BOOLEAN、DOUBLE、VARCHAR、INTEGER、TINYINT、BIGINT、TIME、DATE、TIMESTAMP和DATETIME数据类型。 |
external_config支持的扩展参数如下:
全局参数 | 查询级别参数 | 是否必填 | 说明 |
XIHE_REMOTE_CALL_SERVER_ENDPOINT | endpoint | 是 | 函数计算服务的内网服务接入地址。详细信息,请参见服务接入地址。 |
XIHE_REMOTE_CALL_SERVER_AK | - | 条件必填 | 阿里云账号或者具备函数计算服务管理权限的RAM用户的AccessKey ID。
如何获取AccessKey ID,请参见账号与权限。 重要 XIHE_REMOTE_CALL_SERVER_AK参数仅支持全局配置。 |
XIHE_REMOTE_CALL_SERVER_SK | - | 条件必填 | 阿里云账号或者具备函数计算服务管理权限的RAM用户的AccessKey Secret。
如何获取AccessKey Secret,请参见账号与权限。 重要 XIHE_REMOTE_CALL_SERVER_SK参数仅支持全局配置。 |
XIHE_REMOTE_CALL_COMPRESS_ENABLED | compressed | 否 | 在AnalyticDB for MySQL提交请求后,是否使用GZIP格式压缩数据再传输至函数计算服务。取值:
|
XIHE_REMOTE_CALL_MAX_BATCH_SIZE | max_batch_size | 否 | 调用REMOTE_CALL函数之后,每批次向函数计算服务中发送的数据最大行数。默认无限制。数据行数越小,函数计算服务中CPU和内存的压力越小,但查询的执行时间会变长。 |
示例
函数计算服务中编写自定义函数示例
AnalyticDB for MySQL调用REMOTE_CALL函数后,会将数据以JSON格式发送到函数计算服务;经过计算后,函数计算服务会将计算结果以JSON形式返回。您需要根据数据发送的格式和计算结果返回的格式,在函数计算服务中编写自定义函数。
本文示例函数计算服务中创建的自定义函数名称为ConcactNumberWithCompress,示例代码如下:
public class App
implements StreamRequestHandler, FunctionInitializer
{
public static final Logger log = Logger.getLogger(App.class.getName());
public void initialize(Context context)
throws IOException
{
// TODO
}
@Override
public void handleRequest(
InputStream inputStream, OutputStream outputStream, Context context)
throws IOException
{
InputStream notCompressedInputStream1;
notCompressedInputStream1 = tryUnCompress(inputStream);
JSONObject response = new JSONObject();
try {
JSONObject requestJson = JSONObject.parseObject(IOUtils.toString(notCompressedInputStream1));
if (requestJson.containsKey("data")) {
JSONArray result = new JSONArray();
JSONArray data = requestJson.getJSONArray("data");
for (int i = 0; i < data.size(); i++) {
JSONArray row = data.getJSONArray(i);
// 从这里开始实现函数计算真正的业务逻辑。
// 假设这里的函数有两个入参a和b,则AnalyticDB for MySQL发来的数据每行都会有两个数,第一个数是a,第二个数是b。
if (row.size() == 2) {
result.add(testFunc(row.getInteger(0), row.getInteger(1)));
}
else {
throw new RuntimeException("row size is not 2");
}
// 函数计算逻辑结束。
}
response.put("result", result);
response.put("success", true);
response.put("message", "");
}
else {
response.put("success", false);
response.put("message", "no data inside");
}
}
catch (Exception e) {
log.info("error happened" + e.getMessage());
response.put("success", false);
response.put("message", e.getMessage());
}
outputStream.write(tryCompress(response.toJSONString().getBytes()));
}
private String testFunc(int a, int b)
{
// 测试函数将入参a、b和1使用&符号连接。
return String.valueOf(a) + '&' + b + '&' + 1;
}
public static byte[] tryCompress(byte[] bytes)
{
ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
try {
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteOutputStream);
gzipOutputStream.write(bytes);
gzipOutputStream.close();
}
catch (IOException e) {
throw new RuntimeException(e);
}
return byteOutputStream.toByteArray();
}
public static InputStream tryUnCompress(InputStream inputStream)
throws IOException
{
GZIPInputStream gzipInputStream;
gzipInputStream = new GZIPInputStream(inputStream);
return gzipInputStream;
}
}
AnalyticDB for MySQL调用REMOTE_CALL函数示例
在未设置全局配置的情况下,调用函数计算服务中创建的自定义函数ConcactNumberWithCompress,将
1
和2
使用&
连接:SELECT remote_call('varchar','ConcactNumberWithCompress','{endpoint:"1234567890000****.cn-zhangjiakou-internal.fc.aliyuncs.com"}',1,2);
返回结果:
1&2&1
在未设置全局配置的情况下,调用函数计算服务中创建的自定义函数ConcactNumberWithCompress,将
3
和4
使用&
连接:SELECT remote_call('varchar','ConcactNumberWithCompress','{endpoint:"1234567890000****.cn-zhangjiakou-internal.fc.aliyuncs.com",compressed:false,max_batch_size:5000000}',3,4);
返回结果:
3&4&1
在设置全局配置的情况下,调用函数计算服务中创建的自定义函数ConcactNumberWithCompress,将
5
和6
使用&
连接:SELECT remote_call('varchar','ConcactNumberWithCompress',null,5,6);
返回结果:
5&6&1
常见问题
调用REMOTE_CALL函数时,出现 java.util.zip.ZipException: Not in GZIP format
报错的原因是什么?
AnalyticDB for MySQL将数据使用GZIP格式压缩后传输至函数计算服务,但在函数计算服务创建的自定义函数中没有编写解压缩代码,导致函数计算服务无法解析数据。
AnalyticDB for MySQL未开启压缩,将数据直接传输至函数计算服务,函数计算服务经过计算后,将计算结果使用GZIP格式压缩后返回给AnalyticDB for MySQL,导致AnalyticDB for MySQL无法解析数据。
调用REMOTE_CALL函数时,出现parse remote_call config error
报错的原因是什么?
REMOTE_CALL函数中external_config参数的语法错误,请您检查并修改SQL语句,再重新执行。