Flink全托管支持在SQL作业中使用Python自定义函数,本文为您介绍Flink自定义函数UDX的分类、定义和依赖,以及如何进行调试和调优。

UDX分类

Flink支持以下3类自定义函数。
UDX分类 描述
UDF(User Defined Scalar Function) 用户自定义标量值函数,将0个、1个或多个标量值映射到一个新的标量值。其输入与输出是一对一的关系,即读入一行数据,写出一条输出值。详情请参见Python自定义标量函数(UDF)
UDAF(User Defined Aggregation Function) 自定义聚合函数,将多条记录聚合成1条记录。其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值。详情请参见Python自定义聚合函数(UDAF)
UDTF(User Defined Table-valued Function) 自定义表值函数,将0个、1个或多个标量值作为输入参数(可以是变长参数)。与自定义的标量函数类似,但与标量函数不同。表值函数可以返回任意数量的行作为输出,而不仅是1个值。返回的行可以由1个或多个列组成。调用一次函数输出多行或多列数据。详情请参见Python自定义表值函数(UDTF)

依赖

Flink全托管集群已预装了Pandas、NumPy和PyArrow等常用的Python包,您可以在作业开发页面,了解Flink全托管集群中已安装的第三方Python包列表。

此外,您也可以在Python自定义函数中使用其它类型的第三方Python包。需要注意的是,如果使用了非预装的第三方Python包,在注册Python UDF时,需要将其作为依赖文件上传,详情请参见管理自定义函数(UDF)使用Python依赖

调试

您可以在Python自定义函数的代码实现中,通过Logging的方式,输出日志信息,方便问题定位,示例如下。
@udf(result_type=DataTypes.BIGINT())
def add(i, j):    
  logging.info("hello world")    
  return i + j
日志输出后,您可以在TaskManager的日志文件中查看日志。

调优

  • 使用Pandas库

    除了普通Python自定义函数之外,Flink全托管也支持您使用Pandas自定义函数。对于Pandas自定义函数,输入数据的类型是Pandas中定义的数据结构,例如pandas.Series和pandas.DataFrame等,您可以在Pandas自定义函数中使用Pandas和Numpy等高性能的Python库,开发出高性能的Python自定义函数,详情请参见Vectorized User-defined Functions

  • 配置以下参数
    Python自定义函数的性能在很大程度取决于Python自定义函数自身的实现,如果遇到性能问题,您需要尽可能优化Python自定义函数的实现。除此之外,Python自定义函数的性能也受以下参数取值的影响。
    参数 说明
    python.fn-execution.bundle.size Python UDF的计算是异步的,在执行过程中,Java算子将数据异步发送给Python进程进行处理。Java算子在将数据发送给Python进程之前,会先将数据缓存起来,到达一定阈值之后,再发送给Python进程。python.fn-execution.bundle.size参数可用来控制可缓存的数据最大条数。

    默认值为100000,单位是条数。

    python.fn-execution.bundle.time 用来控制数据的最大缓存时间。当缓存的数据条数到达python.fn-execution.bundle.size定义的阈值或缓存时间到达python.fn-execution.bundle.time定义的阈值时,会触发缓存数据的计算。

    默认值为1000,单位是毫秒。

    python.fn-execution.arrow.batch.size 使用Pandas UDF时,一个arrow batch可容纳的数据最大条数,默认值为10000。
    说明 python.fn-execution.arrow.batch.size参数值不能大于python.fn-execution.bundle.size参数值。
    说明 以上3个参数并不是配置的越大越好,当这些参数取值配置过大时,可能会导致Checkpoint时,需要处理过多的数据,从而导致Checkpoint时间过长,甚至会导致Checkpoint失败。以上参数的更多详情,请参见Configuration