本文为您介绍Flink全托管Python API作业开发的背景信息、使用限制、开发方法和Connector使用等。

背景信息

Flink全托管集群已安装下列软件包。
软件包 版本
apache-beam 2.23.0
avro-python3 1.9.1
certifi 2020.12.5
chardet 4.0.0
cloudpickle 1.2.2
crcmod 1.7
cython 0.29.16
dill 0.3.1.1
docopt 0.6.2
fastavro 0.23.6
future 0.18.2
grpcio 1.29.0
hdfs 2.6.0
httplib2 0.17.4
idna 2.10
jsonpickle 1.2
mock 2.0.0
numpy 1.19.5
oauth2client 3.0.0
pandas 0.25.3
pbr 5.5.1
pip 20.1.1
protobuf 3.15.3
py4j 0.10.8.1
pyarrow 0.17.1
pyasn1-modules 0.2.8
pyasn1 0.4.8
pydot 1.4.2
pymongo 3.11.3
pyparsing 2.4.7
python-dateutil 2.8.0
pytz 2021.1
requests 2.25.1
rsa 4.7.2
setuptools 47.1.0
six 1.15.0
typing-extensions 3.7.4.3
urllib3 1.26.3
wheel 0.36.2

使用限制

由于Flink全托管产品受部署环境、网络环境等因素的影响,所以开发Python作业,需要注意以下限制:
  • 仅支持开源Flink V1.12。
  • Flink全托管集群已预装了Python 3.7.9,且Python环境中已预装了Pandas、NumPy、PyArrow等常用的Python库。因此需要您在Python 3.7版本开发代码。
  • Flink全托管运行环境使用的是JDK1.8,如果Python作业中依赖第三方JAR包,请确保JAR包兼容JDK1.8。
  • 仅支持开源Scala V2.11版本,如果Python作业中依赖第三方JAR包,请确保使用Scala V2.11对应的JAR包依赖。

作业开发

您需要在线下完成Python API作业开发后,再在Flink全托管开发控制台上提交作业到集群上运行。您可以参见以下文档开发Flink全托管产品业务代码:

作业调试

您可以在Python自定义函数的代码实现中,通过logging的方式,输出日志信息,方便后期问题定位,示例如下。
@udf(result_type=DataTypes.BIGINT())
def add(i, j):    
  logging.info("hello world")    
  return i + j
日志输出后,您可以在TaskManager的日志文件查看。

Connector使用

Flink全托管所支持的Connector列表,请参见支持的上下游存储。Connector使用方法如下:
  1. 登录实时计算控制台
  2. Flink全托管页签,单击目标工作空间操作列下的开发控制台
  3. 在左侧导航栏,单击资源上传
  4. 单击上传资源,选择您要上传的目标Connector的JAR包。

    您可以上传您自己开发的Connector,也可以上传Flink全托管产品提供的Connector。Flink全托管产品提供的Connector官方JAR包的下载地址,请参见Connector列表

  5. 在目标作业开发页面附加依赖文件项,选择目标Connector的JAR包。
  6. 在目标作业开发页面右侧高级配置页面更多Flink配置项,添加相关配置。
    假如您的作业需要依赖多个Connector JAR包,且名字分别为connector-1.jar和connector-2.jar,则配置信息如下。
    pipeline.classpaths: 'file:///flink/usrlib/connector-1.jar;file:///flink/usrlib/connector-2.jar'