文档

概述

更新时间:

实时计算Flink版支持在Flink SQL作业中使用Python自定义函数,本文为您介绍Flink Python自定义函数的分类、Python依赖使用方法和调优方式。

自定义函数分类

分类

描述

UDSF(User Defined Scalar Function)

用户自定义标量值函数,将0个、1个或多个标量值映射到一个新的标量值。其输入与输出是一对一的关系,即读入一行数据,写出一条输出值。详情请参见自定义标量函数(UDSF)

UDAF(User Defined Aggregation Function)

自定义聚合函数,将多条记录聚合成1条记录。其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值。详情请参见自定义聚合函数(UDAF)

UDTF(User Defined Table-valued Function)

自定义表值函数,将0个、1个或多个标量值作为输入参数(可以是变长参数)。与自定义的标量函数类似,但与标量函数不同。表值函数可以返回任意数量的行作为输出,而不仅是1个值。返回的行可以由1个或多个列组成。调用一次函数输出多行或多列数据。详情请参见自定义表值函数(UDTF)

使用Python依赖

实时计算Flink版集群已预装了Pandas、NumPy和PyArrow等常用的Python包,您可以在Python作业开发页面,了解实时计算Flink版中已安装的第三方Python包列表。预装的Python包使用时需要在Python函数内部导入。示例如下。

@udf(result_type=DataTypes.FLOAT())
def percentile(values: List[float], percentile: float):
    import numpy as np
    return np.percentile(values, percentile)

此外,您也可以在Python自定义函数中使用其他类型的第三方Python包。需要注意的是,如果使用了非预装的第三方Python包,在注册Python UDF时,需要将其作为依赖文件上传,详情请参见管理自定义函数(UDF)使用Python依赖

代码调试

您可以在Python自定义函数的代码实现中,通过Logging的方式,输出日志信息,方便问题定位,示例如下。

@udf(result_type=DataTypes.BIGINT())
def add(i, j):    
  logging.info("hello world")    
  return i + j

日志输出后,您可以在TaskManager的日志文件中查看日志,详情请参见查看运行日志

性能调优

预先加载资源

预先加载资源可以在UDF初始化时提前加载资源,无需在每一次执行计算(即eval)时重新加载资源。例如,您可能只想加载一次大型深度学习模型,然后对模型多次运行批量预测。代码示例如下。

from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

class Predict(ScalarFunction):
    def open(self, function_context):
        import pickle

        with open("resources.zip/resources/model.pkl", "rb") as f:
            self.model = pickle.load(f)

    def eval(self, x):
        return self.model.predict(x)

predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")
说明

关于如何上传Python数据文件,可以参考文档使用Python依赖

使用Pandas库

除了普通Python自定义函数之外,实时计算Flink版也支持您使用Pandas自定义函数。对于Pandas自定义函数,输入数据的类型是Pandas中定义的数据结构,例如pandas.Series和pandas.DataFrame等,您可以在Pandas自定义函数中使用Pandas和NumPy等高性能的Python库,开发出高性能的Python自定义函数,详情请参见Vectorized User-defined Functions

配置参数

Python自定义函数的性能在很大程度取决于Python自定义函数自身的实现,如果遇到性能问题,您需要尽可能优化Python自定义函数的实现。除此之外,Python自定义函数的性能也受以下参数取值的影响。

参数

说明

python.fn-execution.bundle.size

Python UDF的计算是异步的,在执行过程中,Java算子将数据异步发送给Python进程进行处理。Java算子在将数据发送给Python进程之前,会先将数据缓存起来,到达一定阈值之后,再发送给Python进程。python.fn-execution.bundle.size参数可用来控制可缓存的数据最大条数。

默认值为100000,单位是条数。

python.fn-execution.bundle.time

用来控制数据的最大缓存时间。当缓存的数据条数到达python.fn-execution.bundle.size定义的阈值或缓存时间到达python.fn-execution.bundle.time定义的阈值时,会触发缓存数据的计算。

默认值为1000,单位是毫秒。

python.fn-execution.arrow.batch.size

使用Pandas UDF时,一个arrow batch可容纳的数据最大条数,默认值为10000。

说明

python.fn-execution.arrow.batch.size参数值不能大于python.fn-execution.bundle.size参数值。

说明

以上3个参数并不是配置的越大越好,当这些参数取值配置过大时,可能会导致Checkpoint时,需要处理过多的数据,从而导致Checkpoint时间过长,甚至会导致Checkpoint失败。以上参数的更多详情,请参见Configuration

相关文档