本文为您介绍如何提交Flink全托管Python作业至集群。

上传资源

Python API作业运行前,需要您按照以下步骤将Python作业文件或Python依赖上传到Flink全托管开发控制台。

  1. 登录实时计算控制台
  2. Flink全托管页签,单击目标工作空间操作列下的开发控制台
  3. 在左侧导航栏,单击资源上传
  4. 单击上传资源,选择您要上传的Python作业文件或Python依赖。

创建作业

  1. 登录Flink全托管开发控制台,新建作业。
    1. 登录实时计算控制台
    2. Flink全托管页签,单击目标工作空间操作列下的开发控制台
    3. 在左侧导航栏,单击作业开发
    4. 单击新建
    5. 新建文件对话框,填写作业配置信息。
      文件类型请选择为PYTHON。
      作业参数 说明
      文件名称 作业的名称。
      说明 作业名称在当前项目中必须保持唯一。
      作业类型 支持以下类型:
      • SQL
      • JAR
      • PYTHON
      部署目标 选择作业需要部署的集群。
      说明 Python作业仅支持Per-Job集群
    6. 单击确认
  2. 在作业开发页面,填写基本配置信息。
    您可以直接填写以下配置信息,也可以单击YAML直接修改配置信息。配置参数解释如下表所示。
    参数 说明
    Python文件地址 Python作业文件,可以为.py文件或者.zip文件。
    Entry Module 程序的入口类。如果Python作业文件为.py文件,则该项不需要填写;如果Python作业文件为.zip文件,则需要在此处输入您的Entry Module,例如example.word_count
    Entrypoint main args 作业参数。
    Python Libraries 第三方Python包。第三方Python包会被添加到Python worker进程的PYTHONPATH中,从而在Python自定义函数中可以直接访问。如何使用第三方Python包,详情请参见使用第三方Python包
    Python Archives 存档文件,目前仅支持ZIP格式的文件,例如 .zip.jar.whl.egg等。
    存档文件会被解压到Python worker进程的工作目录下。假如,存档文件所在的压缩包名称为mydata.zip,则在Python自定义函数中可以编写以下代码来访问mydata.zip存档文件。
    def map():  
        with open("mydata.zip/mydata/data.txt") as f: 
        ...

    请参见使用自定义的Python虚拟环境使用数据文件,了解更多关于Python Archives的信息。

    附加依赖文件 上传JAR包、数据文件等,上传的文件默认会被上传到作业运行节点的/flink/usrlib/目录下。
    并行度 作业并发数。
  3. 在作业开发页面右侧,单击高级配置,根据业务需要填写配置信息。
    参数解释如下表所示。
    类别 配置项 说明
    常规配置 Flink版本 支持Flink 1.10Flink 1.11Flink 1.12版本。
    Flink镜像标签 选择Flink镜像标签。
    行为配置 恢复策略 当作业状态变为运行时,State的恢复策略。取值如下:
    • Latest Savepoint :将从最新Savepoint文件恢复。
    • Latest State:将从最新的Savepoint或Checkpoint中恢复。
    • None:不带State恢复作业,即作业恢复后没有State了,需要重新开始计算了。
    说明 升级策略恢复策略需要配合使用。如果升级策略选择Stateful恢复策略选择None,则完成Savepoint后,作业会从作业的起始时间重新启动,而不是从Savepoint恢复。
    创建实例的最大重试次数 创建实例失败后的重试次数。
    Stop with Drain 如果开启Stop With Drain功能,当作业被手动停止或者保留State升级作业时,窗口中已有数据结果会输出,即使没有满足关窗条件。
    Flink配置 Checkpoint间隔 定时执行Checkpoint的时间间隔。如果不填写,将会关闭 Checkpoint。
    两次Checkpoint之间的最短时间间隔 两次Checkpoint之间的最短时间间隔,如果Checkpoint最大并行度是1,则该配置确保两个Checkpoint之间有一个最短时间间隔。
    Checkpoint Retention策略 当作业无法再重启或者作业被暂停时,是否需要保留最新完成的Checkpoint。该参数取值如下:
    • Always:作业停止时保留Checkpoint。
    • Only when FAILED:作业停止后删除Checkpoint。
    • Never(默认值):不保留Checkpoint。
    开启Unaligned Checkpoint 开启Unaligned Checkpoint会大大降低反压情况下Checkpoint的总执行时间。但是也会导致增大单次Checkpoint的大小。
    Flink重启策略配置 当有Task失败时,如果没有开启Checkpoint,JobManager进程不会重启。如果开启了Checkpoint,则JobManager进程会重启。该参数取值如下:
    • Failure Rate:基于失败率重启。
    • Fixed Delay:固定间隔重启。
    • No Restarts(默认值):不会重启。
    Flink Master 容错配置(高可用性) 该参数取值如下:
    • None(默认值):表示系统不会保存元数据给JobManager重启恢复时使用。
    • Kubernetes:表示系统会将元数据保存到Kubernetes 集群中供JobManager重启恢复使用。
    更多Flink配置 在此设置其他Flink配置。例如taskmanager.numberOfTaskSlots: 1
    资源配置 Task Managers数量 默认与并行度一致。
    Job Manager CPUs 默认值为1。
    Job Manager Memory 最小值为500 Mi。单位建议使用Gi或Mi,例如,1024 Mi或1.5 Gi。
    Task Manager CPUs 默认值为1。
    Task Manager Memory 最小值为1 Gi。单位建议使用Gi或Mi,例如,1024 Mi或1.5 Gi。
    日志配置 Root Log Level TRACE、DEBUG、INFO、WARN和ERROR。
    Log Levels 填写日志名称和日志级别。
    Logging Profile 日志模板,可以选择系统模板,也可以选择用户配置。
  4. 您可以在作业开发页面右上角,单击上线