Python作业开发

本文为您介绍Flink Python API作业开发的背景信息、使用限制、开发方法、调试方法和连接器使用等。

背景信息

Flink Python作业需要您在本地完成开发工作,Python作业开发完成后,再在Flink开发控制台上部署并启动才能看到业务效果。整体的操作流程详情请参见Flink Python作业快速入门

Flink工作空间已安装下列软件包。

软件包

版本

apache-beam

2.23.0

avro-python3

1.9.1

certifi

2020.12.5

cloudpickle

1.2.2

crcmod

1.7

cython

0.29.16

dill

0.3.1.1

docopt

0.6.2

fastavro

0.23.6

future

0.18.2

grpcio

1.29.0

hdfs

2.6.0

httplib2

0.17.4

idna

2.10

jsonpickle

1.2

mock

2.0.0

numpy

1.19.5

oauth2client

3.0.0

pandas

0.25.3

pbr

5.5.1

pip

20.1.1

protobuf

3.15.3

py4j

0.10.8.1

pyarrow

0.17.1

pyasn1-modules

0.2.8

pyasn1

0.4.8

pydot

1.4.2

pymongo

3.11.3

pyparsing

2.4.7

python-dateutil

2.8.0

pytz

2021.1

requests

2.25.1

rsa

4.7.2

setuptools

47.1.0

six

1.15.0

typing-extensions

3.7.4.3

urllib3

1.26.3

wheel

0.36.2

使用限制

由于Flink受部署环境、网络环境等因素的影响,所以开发Python作业,需要注意以下限制:

  • 仅支持开源Flink V1.13及以上版本。

  • Flink工作空间已预装了Python 3.7.9,且Python环境中已预装了Pandas、NumPy、PyArrow等常用的Python库。因此需要您在Python 3.7及以上版本开发代码。

  • Flink运行环境使用的是JDK1.8,如果Python作业中依赖第三方JAR包,请确保JAR包兼容JDK1.8。

  • VVR 4.x仅支持开源Scala V2.11版本,VVR 6.x及以上版本仅支持开源Scala V2.12版本。如果Python作业中依赖第三方JAR包,请确保使用Scala版本对应的JAR包依赖。

作业开发

开发参考

您可以参见以下文档在本地完成Flink业务代码开发,开发完成后您需要将其上传到Flink开发控制台,并部署上线作业。

作业调试

您可以在Python自定义函数的代码实现中,通过logging的方式,输出日志信息,方便后期问题定位,示例如下。

@udf(result_type=DataTypes.BIGINT())
def add(i, j):    
  logging.info("hello world")    
  return i + j

日志输出后,您可以在TaskManager的日志文件查看。

连接器使用

Flink所支持的连接器列表,请参见支持的连接器。连接器使用方法如下:

  1. 登录实时计算控制台

  2. 单击目标工作空间操作列下的控制台

  3. 在左侧导航栏,单击文件管理

  4. 单击上传资源,选择您要上传的目标连接器的Python包。

    您可以上传您自己开发的连接器,也可以上传Flink提供的连接器。Flink提供的连接器官方Python包的下载地址,请参见Connector列表

  5. 运维中心 > 作业运维页面,单击部署作业 > Python 作业附加依赖文件项选择目标连接器的Python包,配置其他参数并部署作业。

  6. 单击部署的作业名称,在部署详情页签运行参数配置区域,单击编辑,在其他配置中,添加Python连接器包位置信息。

    如果您的作业需要依赖多个连接器Python包,例如依赖的2个包的名字分别为connector-1.jar和connector-2.jar,则配置信息如下。

    pipeline.classpaths: 'file:///flink/usrlib/connector-1.jar;file:///flink/usrlib/connector-2.jar'

相关文档