自定义标量函数(UDSF)

更新时间:
复制为 MD 格式

本文为您介绍Python自定义标量函数(UDSF)的开发、注册和使用流程。

定义

自定义标量函数(UDSF)将0个、1个或多个标量值映射到一个新的标量值。输入与输出是一对一的关系,即读入一行数据,写出一条输出值。

使用限制

由于实时计算Flink版受部署环境和网络环境等因素的影响,开发Python自定义函数时,需要注意以下限制:

  • 仅支持开源Flink V1.12及以上版本。

  • Flink工作空间已预装了Python,因此需要您在对应Python版本上开发代码。

    说明

    实时计算引擎VVR 8.0.11以下版本预装Python 3.7.9版本,实时计算引擎VVR 8.0.11及以上版本预装Python 3.9.21版本。如需将低版本升级至实时计算引擎VVR 8.0.11及以上版本,必须对先前版本的PyFlink作业进行重新测试、部署和运行。

  • Flink运行环境仅支持JDK 8JDK 11,如果Python作业中依赖第三方JAR包,请确保JAR包兼容。

  • 仅支持开源Scala V2.11版本,如果Python作业中依赖第三方JAR包,请确保使用Scala V2.11对应的JAR包依赖。

UDSF开发

说明

Flink为您提供了Python自定义函数示例,便于您快速开发自定义函数。Flink Python自定义函数示例中包含了Python UDSF、Python UDAFPython UDTF的实现。本文以Windows操作系统为例,为您介绍如何进行UDSF开发。

  1. 下载并解压python_demo-master示例到本地。

    说明

    python_demo-master属于第三方搭建的网站,访问时可能会存在无法打开或访问延迟的问题。

  2. PyCharm中,单击file > open,打开刚才解压缩完成的python_demo-master

  3. 双击打开\python_demo-master\udx\udfs.py后,根据您的业务,修改udfs.py

    该示例中,sub_string定义了获取每条数据中从begin~end位的字符的代码。

    from pyflink.table import DataTypes
    from pyflink.table.udf import udf
    
    
    @udf(result_type=DataTypes.STRING())
    def sub_string(s: str, begin: int, end: int):
        return s[begin:end]
  4. 在下载文件中udx所在的目录(即\python_demo-master目录)下执行如下命令打包文件。

    zip -r python_demo.zip udx

    \python_demo-master\目录下会出现python_demo.zipZIP包,即代表完成了Python UDSF的开发工作。

UDSF注册

UDSF注册过程,请参见管理自定义函数(UDF)

UDSF使用

在完成注册UDSF后,您就可以使用UDSF,详细的操作步骤如下。

  1. Flink SQL作业开发。详情请参见作业开发地图

    获取ASI_UDSF_Source表中a字段中每行字符串中第2~4位的字符,代码示例如下。

    CREATE TEMPORARY TABLE ASI_UDSF_Source (
      a VARCHAR,
      b INT,
      c INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDSF_Sink (
      a VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO ASI_UDSF_Sink
    SELECT ASI_UDSF(a,2,4)
    FROM ASI_UDSF_Source;
  2. 运维中心 > 作业运维页面,单击目标作业名称操作列的启动

    启动成功后,ASI_UDSF_Sink表每行会被插入ASI_UDSF_Source表中a字段每行字符串的第2~4位字符。

异步自定义函数

如果自定义函数中需要进行外部数据库访问、HTTP 服务等 I/O 密集型操作,建议使用异步自定义函数。单个异步自定义函数可以并发处理多个 I/O 请求,使得等待时间分摊到多个请求上,提升作业吞吐。

使用限制

  • 仅 VVR 11.7 及以上版本支持,需安装VVR pyflink >=11.7。详情请参见ververica-flink

    pip3 install "ververica-flink>=11.7"

  • 仅支持异步自定义标量函数(UDSF)。

  • 仅支持 Python 进程模式,即 python.execution-mode=process

  • 暂不支持 Pandas 异步自定义函数。

使用方式

异步自定义函数可以通过 Python 异步函数或继承异步函数类实现,示例代码如下。

import asyncio

from pyflink.table import DataTypes
from pyflink.table.udf import AsyncScalarFunction, udf


# 方法一:使用 Python 异步函数
@udf(result_type=DataTypes.STRING())
async def async_api_call(product_id: str) -> str:
    await asyncio.sleep(0.05)
    return f"product_{product_id}"


# 方法二:继承异步函数类
class AsyncUserLookup(AsyncScalarFunction):
    def open(self, function_context):
        self.cache = {}

    async def eval(self, user_id: str) -> str:
        if user_id in self.cache:
            return self.cache[user_id]

        await asyncio.sleep(0.05)
        result = f"user_{user_id}"
        self.cache[user_id] = result
        return result

    def close(self):
        self.cache.clear()


async_user_lookup = udf(
    AsyncUserLookup(),
    input_types=[DataTypes.STRING()],
    result_type=DataTypes.STRING()
)

异步自定义函数的注册与使用方式与同步函数相同。

配置参数

您可以通过修改以下参数改变异步自定义函数的运行行为。

参数

默认值

说明

table.exec.async-scalar.max-concurrent-operations

10

单个算子实例允许同时触发的最大异步调用数。默认值为 10。

table.exec.async-scalar.timeout

3 min

单次异步调用的超时时间。

table.exec.async-scalar.retry-strategy

FIXED_DELAY

异步调用失败后的重试策略,支持:

  • FIXED_DELAY:等待固定时间后重试。

  • NO_RETRY:不进行重试。

table.exec.async-scalar.retry-delay

100 ms

固定延迟重试的等待时间。

说明

仅当 table.exec.async-scalar.retry-strategy 为 FIXED_DELAY 时生效。

table.exec.async-scalar.max-attempts

3

异步调用失败前的最大尝试次数。

说明

仅当 table.exec.async-scalar.retry-strategy 为 FIXED_DELAY 时生效。