本文为您介绍Python自定义标量函数(UDSF)的开发、注册和使用流程。
定义
自定义标量函数(UDSF)将0个、1个或多个标量值映射到一个新的标量值。输入与输出是一对一的关系,即读入一行数据,写出一条输出值。
使用限制
由于实时计算Flink版受部署环境和网络环境等因素的影响,开发Python自定义函数时,需要注意以下限制:
仅支持开源Flink V1.12及以上版本。
Flink工作空间已预装了Python 3.7.9,因此需要您在Python 3.7.9版本开发代码。
Flink运行环境仅支持JDK 8和JDK 11,如果Python作业中依赖第三方JAR包,请确保JAR包兼容。
仅支持开源Scala V2.11版本,如果Python作业中依赖第三方JAR包,请确保使用Scala V2.11对应的JAR包依赖。
UDSF开发
Flink为您提供了Python自定义函数示例,便于您快速开发自定义函数。Flink Python自定义函数示例中包含了Python UDSF、Python UDAF和Python UDTF的实现。本文以Windows操作系统为例,为您介绍如何进行UDSF开发。
下载并解压python_demo-master示例到本地。
说明python_demo-master属于第三方搭建的网站,访问时可能会存在无法打开或访问延迟的问题。
在PyCharm中,单击python_demo-master。
,打开刚才解压缩完成的双击打开\python_demo-master\udx\udfs.py后,根据您的业务,修改udfs.py。
该示例中,sub_string定义了获取每条数据中从begin~end位的字符的代码。
from pyflink.table import DataTypes from pyflink.table.udf import udf @udf(result_type=DataTypes.STRING()) def sub_string(s: str, begin: int, end: int): return s[begin:end]
在下载文件中udx所在的目录(即\python_demo-master目录)下执行如下命令打包文件。
zip -r python_demo.zip udx
\python_demo-master\目录下会出现python_demo.zip的ZIP包,即代表完成了Python UDSF的开发工作。
UDSF注册
UDSF注册过程,请参见管理自定义函数(UDF)。
UDSF使用
在完成注册UDSF后,您就可以使用UDSF,详细的操作步骤如下。
Flink SQL作业开发。详情请参见SQL作业开发。
获取ASI_UDSF_Source表中a字段中每行字符串中第2~4位的字符,代码示例如下。
CREATE TEMPORARY TABLE ASI_UDSF_Source ( a VARCHAR, b INT, c INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE ASI_UDSF_Sink ( a VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO ASI_UDSF_Sink SELECT ASI_UDSF(a,2,4) FROM ASI_UDSF_Source;
在
页面,单击目标作业名称操作列的启动。启动成功后,ASI_UDSF_Sink表每行会被插入ASI_UDSF_Source表中a字段每行字符串的第2~4位字符。