Python作业开发

本文为您介绍Flink Python API作业开发的背景信息、使用限制、开发方法、调试方法和连接器使用等。

背景信息

Flink Python作业需要您在本地完成开发工作,Python作业开发完成后,再在Flink开发控制台上部署并启动才能看到业务效果。整体的操作流程详情请参见Flink Python作业快速入门

VVR-11

Flink工作空间已安装下列软件包。

软件包

版本

apache-beam

2.48.0

avro-python3

1.10.2

brotlipy

0.7.0

certifi

2022.12.7

cffi

1.15.1

charset-normalizer

2.0.4

cloudpickle

2.2.1

conda

22.11.1

conda-content-trust

0.1.3

conda-package-handling

1.9.0

crcmod

1.7

cryptography

38.0.1

Cython

3.0.12

dill

0.3.1.1

dnspython

2.7.0

docopt

0.6.2

exceptiongroup

1.3.0

fastavro

1.12.1

fasteners

0.20

find_libpython

0.5.0

grpcio

1.56.2

grpcio-tools

1.56.2

hdfs

2.7.3

httplib2

0.22.0

idna

3.4

importlib_metadata

8.7.0

iniconfig

2.1.0

isort

6.1.0

numpy

1.24.4

objsize

0.6.1

orjson

3.9.15

packaging

25.0

pandas

2.3.3

pemja

0.5.5

pip

22.3.1

pluggy

1.0.0

proto-plus

1.26.1

protobuf

4.25.8

py-spy

0.4.0

py4j

0.10.9.7

pyarrow

11.0.0

pyarrow-hotfix

0.6

pycodestyle

2.14.0

pycosat

0.6.4

pycparser

2.21

pydot

1.4.2

pymongo

4.15.4

pyOpenSSL

22.0.0

pyparsing

3.2.5

PySocks

1.7.1

pytest

7.4.4

python-dateutil

2.9.0

pytz

2025.2

regex

2025.11.3

requests

2.32.5

ruamel.yaml

0.18.16

ruamel.yaml.clib

0.2.14

setuptools

70.0.0

six

1.16.0

tomli

2.3.0

toolz

0.12.0

tqdm

4.64.1

typing_extensions

4.15.0

tzdata

2025.2

urllib3

1.26.13

wheel

0.38.4

zipp

3.23.0

zstandard

0.25.0

VVR-8

Flink工作空间已安装下列软件包。

软件包

版本

apache-beam

2.43.0

avro-python3

1.9.2.1

certifi

2025.7.9

charset-normalizer

3.4.2

cloudpickle

2.2.0

crcmod

1.7

Cython

0.29.24

dill

0.3.1.1

docopt

0.6.2

fastavro

1.4.7

fasteners

0.19

find_libpython

0.4.1

grpcio

1.46.3

grpcio-tools

1.46.3

hdfs

2.7.3

httplib2

0.20.4

idna

3.10

isort

6.0.1

numpy

1.21.6

objsize

0.5.2

orjson

3.10.18

pandas

1.3.5

pemja

0.3.2

pip

22.3.1

proto-plus

1.26.1

protobuf

3.20.3

py4j

0.10.9.7

pyarrow

8.0.0

pycodestyle

2.14.0

pydot

1.4.2

pymongo

3.13.0

pyparsing

3.2.3

python-dateutil

2.9.0

pytz

2025.2

regex

2024.11.6

requests

2.32.4

setuptools

58.1.0

six

1.17.0

typing_extensions

4.14.1

urllib3

2.5.0

wheel

0.33.4

zstandard

0.23.0

VVR-6

Flink工作空间已安装下列软件包。

软件包

版本

apache-beam

2.27.0

avro-python3

1.9.2.1

certifi

2024.8.30

charset-normalizer

3.3.2

cloudpickle

1.2.2

crcmod

1.7

Cython

0.29.16

dill

0.3.1.1

docopt

0.6.2

fastavro

0.23.6

future

0.18.3

grpcio

1.29.0

hdfs

2.7.3

httplib2

0.17.4

idna

3.8

importlib-metadata

6.7.0

isort

5.11.5

jsonpickle

2.0.0

mock

2.0.0

numpy

1.19.5

oauth2client

4.1.3

pandas

1.1.5

pbr

6.1.0

pemja

0.1.4

pip

20.1.1

protobuf

3.17.3

py4j

0.10.9.3

pyarrow

2.0.0

pyasn1

0.5.1

pyasn1-modules

0.3.0

pycodestyle

2.10.0

pydot

1.4.2

pymongo

3.13.0

pyparsing

3.1.4

python-dateutil

2.8.0

pytz

2024.1

requests

2.31.0

rsa

4.9

setuptools

47.1.0

six

1.16.0

typing-extensions

3.7.4.3

urllib3

2.0.7

wheel

0.42.0

zipp

3.15.0

使用限制

由于Flink受部署环境、网络环境等因素的影响,所以开发Python作业,需要注意以下限制:

  • 仅支持开源Flink V1.13及以上版本。

  • Flink工作空间已预装了Python环境,且Python环境中已预装了Pandas、NumPy、PyArrow等常用的Python库。

    说明

    实时计算引擎VVR 8.0.11以下版本预装Python 3.7.9版本,实时计算引擎VVR 8.0.11及以上版本预装Python 3.9.21版本。如需将低版本升级至实时计算引擎VVR 8.0.11及以上版本,必须对先前版本的PyFlink作业进行重新测试、部署和运行。

  • Flink运行环境仅支持JDK 8JDK 11,如果Python作业中依赖第三方JAR包,请确保JAR包兼容。

  • VVR 4.x仅支持开源Scala V2.11版本,VVR 6.x及以上版本仅支持开源Scala V2.12版本。如果Python作业中依赖第三方JAR包,请确保使用Scala版本对应的JAR包依赖。

作业开发

开发参考

您可以参见以下文档在本地完成Flink业务代码开发,开发完成后您需要将其上传到Flink开发控制台,并部署上线作业。

作业调试

您可以在Python自定义函数的代码实现中,通过logging的方式,输出日志信息,方便后期问题定位,示例如下。

@udf(result_type=DataTypes.BIGINT())
def add(i, j):    
  logging.info("hello world")    
  return i + j

日志输出后,您可以在TaskManager的日志文件查看。

连接器使用

Flink所支持的连接器列表,请参见支持的连接器。连接器使用方法如下:

  1. 登录实时计算控制台

  2. 单击目标工作空间操作列下的控制台

  3. 在左侧导航栏,单击文件管理

  4. 单击上传资源,选择您要上传的目标连接器的Python包。

    您可以上传您自己开发的连接器,也可以上传Flink提供的连接器。Flink提供的连接器官方Python包的下载地址,请参见Connector列表

  5. 运维中心 > 作业运维页面,单击部署作业 > Python 作业附加依赖文件项选择目标连接器的Python包,配置其他参数并部署作业。

  6. 单击部署的作业名称,在部署详情页签运行参数配置区域,单击编辑,在其他配置中,添加Python连接器包位置信息。

    如果您的作业需要依赖多个连接器Python包,例如依赖的2个包的名字分别为connector-1.jarconnector-2.jar,则配置信息如下。

    pipeline.classpaths: 'file:///flink/usrlib/connector-1.jar;file:///flink/usrlib/connector-2.jar'

相关文档