Python
实时计算Flink版支持在Flink SQL作业中使用Python自定义函数,本文为您介绍Flink Python自定义函数的分类、Python依赖使用方法和调优方式。
自定义函数分类
分类 | 描述 |
UDSF(User Defined Scalar Function) | 用户自定义标量值函数,将0个、1个或多个标量值映射到一个新的标量值。其输入与输出是一对一的关系,即读入一行数据,写出一条输出值。详情请参见自定义标量函数(UDSF)。 |
UDAF(User Defined Aggregation Function) | 自定义聚合函数,将多条记录聚合成1条记录。其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值。详情请参见自定义聚合函数(UDAF)。 |
UDTF(User Defined Table-valued Function) | 自定义表值函数,将0个、1个或多个标量值作为输入参数(可以是变长参数)。与自定义的标量函数类似,但与标量函数不同。表值函数可以返回任意数量的行作为输出,而不仅是1个值。返回的行可以由1个或多个列组成。调用一次函数输出多行或多列数据。详情请参见自定义表值函数(UDTF)。 |
使用Python依赖
实时计算Flink版集群已预装了Pandas、NumPy和PyArrow等常用的Python包,您可以在Python作业开发页面,了解实时计算Flink版中已安装的第三方Python包列表。预装的Python包使用时需要在Python函数内部导入。示例如下。
@udf(result_type=DataTypes.FLOAT())
def percentile(values: List[float], percentile: float):
import numpy as np
return np.percentile(values, percentile)
此外,您也可以在Python自定义函数中使用其他类型的第三方Python包。需要注意的是,如果使用了非预装的第三方Python包,在注册Python UDF时,需要将其作为依赖文件上传,详情请参见管理自定义函数(UDF)和使用Python依赖。
代码调试
您可以在Python自定义函数的代码实现中,通过Logging的方式,输出日志信息,方便问题定位,示例如下。
@udf(result_type=DataTypes.BIGINT())
def add(i, j):
logging.info("hello world")
return i + j
日志输出后,您可以在TaskManager的日志文件中查看日志,详情请参见查看运行日志。
性能调优
预先加载资源
预先加载资源可以在UDF初始化时提前加载资源,无需在每一次执行计算(即eval)时重新加载资源。例如,您可能只想加载一次大型深度学习模型,然后对模型多次运行批量预测。代码示例如下。
from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf
class Predict(ScalarFunction):
def open(self, function_context):
import pickle
with open("resources.zip/resources/model.pkl", "rb") as f:
self.model = pickle.load(f)
def eval(self, x):
return self.model.predict(x)
predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")
关于如何上传Python数据文件,可以参考文档使用Python依赖。
使用Pandas库
除了普通Python自定义函数之外,实时计算Flink版也支持您使用Pandas自定义函数。对于Pandas自定义函数,输入数据的类型是Pandas中定义的数据结构,例如pandas.Series和pandas.DataFrame等,您可以在Pandas自定义函数中使用Pandas和NumPy等高性能的Python库,开发出高性能的Python自定义函数,详情请参见Vectorized User-defined Functions。
配置参数
Python自定义函数的性能在很大程度取决于Python自定义函数自身的实现,如果遇到性能问题,您需要尽可能优化Python自定义函数的实现。除此之外,Python自定义函数的性能也受以下参数取值的影响。
参数 | 说明 |
python.fn-execution.bundle.size | Python UDF的计算是异步的,在执行过程中,Java算子将数据异步发送给Python进程进行处理。Java算子在将数据发送给Python进程之前,会先将数据缓存起来,到达一定阈值之后,再发送给Python进程。python.fn-execution.bundle.size参数可用来控制可缓存的数据最大条数。 默认值为100000,单位是条数。 |
python.fn-execution.bundle.time | 用来控制数据的最大缓存时间。当缓存的数据条数到达python.fn-execution.bundle.size定义的阈值或缓存时间到达python.fn-execution.bundle.time定义的阈值时,会触发缓存数据的计算。 默认值为1000,单位是毫秒。 |
python.fn-execution.arrow.batch.size | 使用Pandas UDF时,一个arrow batch可容纳的数据最大条数,默认值为10000。 说明 python.fn-execution.arrow.batch.size参数值不能大于python.fn-execution.bundle.size参数值。 |
以上3个参数并不是配置的越大越好,当这些参数取值配置过大时,可能会导致Checkpoint时,需要处理过多的数据,从而导致Checkpoint时间过长,甚至会导致Checkpoint失败。以上参数的更多详情,请参见Configuration。
相关文档
自定义函数的注册、更新及删除方法,请参见管理自定义函数(UDF)。
Python自定义函数的开发和使用demo,请参见自定义聚合函数(UDAF)、自定义标量函数(UDSF)和自定义表值函数(UDTF)。
如何在Flink Python作业中使用自定义的Python虚拟环境、第三方Python包、JAR包和数据文件等,请参见使用Python依赖。
JAVA自定义函数的开发和使用demo,请参见自定义聚合函数(UDAF)、自定义标量函数(UDSF)和自定义表值函数(UDTF)。
JAVA自定义函数的调试和调优方法,请参见概述。