本文为您介绍如何提交Flink全托管Python作业至集群。

上传资源

Python API作业运行前,需要您按照以下步骤将Python作业文件或Python依赖上传到Flink全托管开发控制台。

  1. 登录实时计算管理控制台
  2. Flink全托管页签,单击目标工作空间操作列下的控制台
  3. 在左侧导航栏,单击资源上传
  4. 单击上传资源,选择您要上传的Python作业文件或Python依赖。

作业提交

  1. 登录Flink全托管开发控制台,新建作业。
    1. 登录实时计算管理控制台
    2. Flink全托管页签,单击目标工作空间操作列下的控制台
    3. 在左侧导航栏,单击作业开发
    4. 单击新建
    5. 新建文件对话框,填写作业配置信息。
      作业参数 说明
      文件名称 作业的名称。
      说明 作业名称在当前项目中必须保持唯一。
      文件类型 流作业和批作业均支持以下文件类型:
      • SQL
      • JAR
      • PYTHON
      说明 VVP 2.4.1且VVR 3.0.1及以上版本支持批作业。
      部署目标 选择作业需要部署的集群。
      说明 Python作业仅支持Per-Job集群
      存储位置 指定该作业的代码文件所属的文件夹。

      您还可以在现有文件夹右侧,单击新建文件夹图标,新建子文件夹。

    6. 单击确认
  2. 在作业开发页面,填写基本配置信息。
    您可以直接填写以下配置信息,也可以单击YAML直接修改配置信息。配置参数解释如下表所示。
    参数 说明
    部署目标 您可以修改创建作业时已选择的部署目标。
    Python文件地址 Python作业文件,可以为.py文件或者.zip文件。
    Entry Module 程序的入口类。如果Python作业文件为.py文件,则该项不需要填写;如果Python作业文件为.zip文件,则需要在此处输入您的Entry Module,例如example.word_count
    Entrypoint main args 作业参数。
    Python Libraries 第三方Python包。第三方Python包会被添加到Python worker进程的PYTHONPATH中,从而在Python自定义函数中可以直接访问。如何使用第三方Python包,详情请参见使用第三方Python包
    Python Archives 存档文件,目前仅支持ZIP格式的文件,例如 .zip.jar.whl.egg等。
    存档文件会被解压到Python worker进程的工作目录下。如果存档文件所在的压缩包名称为mydata.zip,则在Python自定义函数中可以编写以下代码来访问mydata.zip存档文件。
    def map():  
        with open("mydata.zip/mydata/data.txt") as f: 
        ...

    请参见使用自定义的Python虚拟环境使用数据文件,了解更多关于Python Archives的信息。

    附加依赖文件 上传JAR包、数据文件等,上传的依赖文件默认会被上传到作业运行节点的/flink/usrlib/目录下。
    说明 Session集群不支持设置附加依赖文件,仅Per-Job集群支持设置附加依赖文件
    并行度 作业并发数。
  3. 在作业开发页面右侧,单击高级配置,根据业务需要填写配置信息。
    参数解释如下表所示。
    类别 配置项 说明
    常规配置 引擎版本 仅支持vvr-4.0.7-flink-1.13vvr-3.0.3-flink-1.12版本。
    说明 从VVR 3.0.3版本(对应Flink 1.12版本)开始,VVP支持同时运行多个不同引擎版本的SQL作业。如果您的作业已使用了Flink 1.12及更早版本的引擎,您需要按照以下情况进行处理:
    • Flink 1.12版本:停止后启动作业,系统将自动将引擎升级为vvr-3.0.3-flink-1.12版本。
    • Flink 1.11或Flink 1.10版本:手动将作业引擎版本升级到vvr-3.0.3-flink-1.12vvr-4.0.7-flink-1.13版本后重启作业,否则会在启动作业时超时报错。
    编辑标签 您可以为作业设置标签。后续可以通过搜索的方式,快速找到目标作业。
    行为配置 创建实例的最大重试次数 创建实例失败后的重试次数。
    Stop with Drain 如果开启Stop With Drain功能,当作业被手动停止时,所有基于Event Time的窗口都会被触发。
    Flink配置 Checkpoint间隔 定时执行Checkpoint的时间间隔。如果不填写,将会关闭Checkpoint。
    两次Checkpoint之间的最短时间间隔 两次Checkpoint之间的最短时间间隔,如果Checkpoint最大并行度是1,则该配置确保两个Checkpoint之间有一个最短时间间隔。
    开启Unaligned Checkpoint 开启Unaligned Checkpoint会大大减少反压情况下Checkpoint的总执行时间。但是也会导致增大单次Checkpoint的大小。
    Flink重启策略配置 当有Task失败时,如果没有开启Checkpoint,JobManager进程不会重启。如果开启了Checkpoint,则JobManager进程会重启。该参数取值如下:
    • Failure Rate:基于失败率重启。
    • Fixed Delay:固定间隔重启。
    • No Restarts(默认值):不会重启。
    更多Flink配置 在此设置其他Flink配置。例如taskmanager.numberOfTaskSlots: 1
    资源配置 Job Manager CPUs 默认值为1。
    Job Manager Memory 最小值为1 GiB。单位建议使用GiB或MiB,例如,1024 MiB或1.5 GiB。
    Task Manager CPUs 默认值为1。
    Task Manager Memory 最小值为1 GiB。单位建议使用GiB或MiB,例如,1024 MiB或1.5 GiB。
    日志配置 Root Log Level 日志级别从低到高顺序如下:
    1. TRACE:比DEBUG更细粒度的信息。
    2. DEBUG:系统运行状态的信息。
    3. INFO:重要或者您感兴趣的信息。
    4. WARN:系统可能出现潜在错误的信息。
    5. ERROR:系统出现错误和异常的信息。
    Log Levels 填写日志名称和日志级别。
    Logging Profile 日志模板,可以选择系统模板,也可以选择用户配置。
  4. 在作业开发页面右上角,单击上线