Flink Python参考

本文通过以下方面,为您介绍Flink Python的使用方法。

背景信息

DataFlow集群的Flink Python API完全兼容开源的Flink版本,关于Flink Python API的详细信息,请参见Python API

使用Python依赖

通过以下场景为您介绍如何使用Python依赖:

使用自定义的Python虚拟环境

  • 方式一:在DataFlow集群中的某个节点创建Python虚拟环境

    1. 在DataFlow集群的某个节点,准备setup-pyflink-virtual-env.sh脚本,其内容如下。

      set -e
      # 创建Python的虚拟环境。
      python3.6 -m venv venv
      
      # 激活Python虚拟环境。
      source venv/bin/activate
      
      # 准备Python虚拟环境。
      pip install --upgrade pip
      
      # 安装PyFlink依赖。
      pip install "apache-flink==1.13.0"
      
      # 退出Python虚拟环境。
      deactivate
    2. 执行以下命令,运行该脚本。

      ./setup-pyflink-virtual-env.sh

      该命令执行完成后,会生成一个名为venv的目录,即为Python 3.6的虚拟环境。您也可以修改上述脚本,安装其他版本的Python虚拟环境。

      为了使用该Python虚拟环境,您可以选择将该Python虚拟环境分发到DataFlow集群的所有节点上,也可以在提交PyFlink作业的时候,指定使用该Python虚拟环境,详细信息请参见Command-Line Interface

  • 方式二:在本地开发机创建Python虚拟环境

    1. 在本地准备setup-pyflink-virtual-env.sh脚本,其内容如下。

      set -e
      # 下载Python 3.7 miniconda.sh脚本。
      wget "https://repo.continuum.io/miniconda/Miniconda3-py37_4.9.2-Linux-x86_64.sh" -O "miniconda.sh"
      
      # 为Python 3.7 miniconda.sh脚本添加执行权限。
      chmod +x miniconda.sh
      
      # 创建Python的虚拟环境。
      ./miniconda.sh -b -p venv
      
      # 激活Conda Python虚拟环境。
      source venv/bin/activate ""
      
      # 安装PyFlink依赖。
      pip install "apache-flink==1.13.0"
      
      # 退出Conda Python虚拟环境。
      conda deactivate
      
      # 删除缓存的包。
      rm -rf venv/pkgs
      
      # 将准备好的Conda Python虚拟环境打包。
      zip -r venv.zip venv
    2. 在本地准备build.sh脚本,其内容如下。

      #!/bin/bash
      set -e -x
      yum install -y zip wget
      
      cd /root/
      bash /build/setup-pyflink-virtual-env.sh
      mv venv.zip /build/
    3. 在CMD命令行中,执行如下命令。

      docker run -it --rm -v $PWD:/build  -w /build quay.io/pypa/manylinux2014_x86_64 ./build.sh

      该命令执行完成后,会生成一个名为venv.zip的文件,即为Python 3.7的虚拟环境。您也可以修改上述脚本,安装其他版本的Python虚拟环境,或者在虚拟环境中安装所需的第三方Python包。

使用第三方Python包

如果您的第三方Python包是Zip Safe的,可以直接在Python作业中使用,详细信息请参见Python libraries

如果您的第三方Python包是源码包,且源码包的根目录下存在setup.py文件,则这种类型的第三方Python包通常需要先编译才能被使用。您可以选择以下方式编译第三方Python包:

  • 在DataFlow集群的某个节点编译第三方Python包。

  • 使用quay.io/pypa/manylinux2014_x86_64镜像容器中的Python环境来编译第三方Python包,使用该容器编译生成的包兼容绝大多数Linux环境,关于该镜像容器的详细信息,请参见manylinux

下面以第三方Python包opencv-python-headless为例,介绍如何编译和使用该第三方Python包。

  1. 编译第三方Python包。

    1. 在本地准备requirements.txt文件,其内容如下。

      opencv-python-headless
    2. 在本地准备build.sh脚本,其内容如下。

      #!/bin/bash
      set -e -x
      yum install -y zip
      
      PYBIN=/opt/python/cp37-cp37m/bin
      
      "${PYBIN}/pip" install --target __pypackages__ -r requirements.txt --no-deps
      cd __pypackages__ && zip -r deps.zip . && mv deps.zip ../ && cd ..
      rm -rf __pypackages__
    3. 在CMD命令行中,执行如下命令。

      docker run -it --rm -v $PWD:/build  -w /build quay.io/pypa/manylinux2014_x86_64 /bin/bash build.sh

      该命令执行完成后,会生成一个名为deps.zip的文件,该文件为编译之后的第三方Python包。您也可以修改requirements.txt,安装其他所需的第三方Python包。此外,requirements.txt文件中可以指定多个Python依赖。

  2. 使用第三方Python包。

    关于如何在PyFlink作业中,使用第三方Python包,详情请参见Python Libraries

使用JAR包

如果您的Flink Python作业中使用了Java类,例如作业中使用了Connector或者Java自定义函数时,则需要指定Connector或者Java自定义函数所在的JAR包,详情请参见JAR Dependencies

使用数据文件

如果您的Flink Python作业中需要访问数据文件,例如模型文件等,则可以通过Python Archives的方式来访问。