本文为您介绍如何提交Flink全托管DataStream API、Table API和Python API作业至集群运行。

上传JAR包

DataStream API、Table API和Python API作业运行前,需要您按照以下步骤将JAR包、Python作业文件或Python依赖上传到Flink全托管开发控制台。

  1. 登录实时计算管理控制台
  2. Flink全托管页签,单击目标工作空间操作列下的控制台
  3. 在左侧导航栏,单击资源上传
  4. 单击上传资源,选择您要上传的JAR包。
    如果您的作业是Python API类型,则需要上传PyFlink的官方JAR包。官方JAR的下载地址,请参见PyFlink V1.11PyFlink V1.12
    说明 VVP 2.4.0增加了单独的Python作业提交入口,推荐您通过该入口提交Python作业,详情请参见概述

创建作业

  1. 登录Flink全托管开发控制台,新建作业。
    1. 登录实时计算管理控制台
    2. Flink全托管页签,单击目标工作空间操作列下的开发控制台
    3. 在左侧导航栏,单击作业开发
    4. 单击新建
    5. 新建文件对话框,填写作业配置信息。
      作业参数 说明
      文件名称 作业的名称。
      说明 作业名称在当前项目中必须保持唯一。
      文件类型 流作业和批作业均支持以下文件类型:
      • SQL
      • JAR
      • PYTHON
      说明 VVP 2.4.1且VVR 3.0.1及以上版本支持批功能。
      部署目标 选择作业需要部署的集群,支持以下两种集群模式:
      • Per-Job集群(默认):适用于占用资源比较大或持续稳定运行的作业。因为作业之间资源隔离,每个作业都需要一个独立的JM,小任务JM的资源利用率较低。
      • Session集群:适用于占用资源比较小或任务启停比较频繁的作业。因为多个作业可以复用相同的JM,可以提高JM资源利用率。
      说明 如果您需要开启SQL Preview功能,必须选择Session集群,且已将其设置为SQL Previews集群,详情请参见作业调试配置Session集群
      存储位置 指定该作业的代码文件所属的文件夹。

      您还可以在现有文件夹右侧,单击新建文件夹图标,新建子文件夹。

    6. 单击确认
  2. 在作业开发页面,填写基本配置信息。
    您可以直接填写以下配置信息,也可以单击YAML直接修改配置信息。配置参数解释如下表所示。
    参数 说明
    部署目标 您可以修改创建作业时已选择的部署目标。
    JAR URI 请选择一个文件或者手动上传新文件,您可以拖拽文件到此区域或者单击右侧上传图标选择文件上传。
    说明 如果您的作业是Python API类型,则需要填写PyFlink的官方JAR包,官方JAR的下载地址,请参见PyFlink V1.11PyFlink V1.12
    Entrypoint Class 程序的入口类。如果您的JAR包未指定主类,请在此处输入您的Entrypoint Class类的标准路径。
    说明 如果您的作业是Python API类型,Entrypoint class应该填写为org.apache.flink.client.python.PythonDriver
    Entrypoint main args 您可以在此处传入参数,在主方法里面去调用该参数。
    说明 如果您的作业是Python API类型,需要首先上传您的Python作业文件。Python作业文件上传之后,默认会被上传到作业运行节点的/flink/usrlib/目录下。

    假如您的Python作业文件名为word_count.py,则Entrypoint main args需要填写为-py /flink/usrlib/word_count.py

    Python作业文件的路径需要填写为完整路径,/flink/usrlib/不能省略,且不能更改。

    附加依赖文件
    • (推荐)选择您已上传的目标附加依赖文件。

      您需要提前通过Flink全托管开发控制台左侧资源上传或作业开发页面附件依赖文件右侧的更新JAR上传附件依赖文件。上传的附件依赖文件会固定被保存在oss://ossBucketName/artifacts/namespaces/namespaceName/* 目录

    • 填写目标附加依赖文件的OSS路径。

      您需要提前将附加依赖文件上传至当前实例对应的OSS Bucket,上传附加依赖文件的OSS Bucket必须为您开通Flink全托管时选择的OSS Bucket。

    • 填写目标附加依赖文件的URL,目前仅支持以文件名结尾的URL,例如http://xxxxxx/file

      您需要提前将附加依赖文件上传至公开可访问的HTTP服务。

    说明
    • 以上三种方式上传的附件依赖文件,在作业运行时,最终都会被加载到JM和TM所在Pod的/flink/usrlib目录下。
    • Session模式的作业不支持配置附加依赖文件路径。
    • 如果您的作业是Python API类型,则需要在这里选择您的Python作业文件,以及所用的依赖文件。Python依赖详情,请参见Python依赖管理。Python依赖上传之后,默认会被上传到作业运行节点的/flink/usrlib/目录下。
    并行度 作业并发数。
  3. 在作业开发页面右侧,单击高级配置,根据业务需要填写配置信息。
    参数解释如下表所示。
    类别 配置项 说明
    常规配置 Flink版本 支持Flink 1.10Flink 1.11Flink 1.12版本。
    说明 Python API作业需要选择为Flink 1.11及以上版本。
    Flink镜像标签 选择Flink镜像标签。
    说明 Python API作业需要选择为1.11.2-vvr-2.1.1及以上版本。
    行为配置 恢复策略 当作业状态变为运行时,State的恢复策略。取值如下:
    • Latest Savepoint :将从最新Savepoint文件恢复。
    • Latest State:将从最新的Savepoint或Checkpoint中恢复。
    • None:不带State恢复作业,即作业恢复后没有State了,需要重新开始计算。
    创建实例的最大重试次数 创建实例失败后的重试次数。
    Stop with Drain 如果开启Stop With Drain功能,当作业被手动停止或者保留State升级作业时,窗口中已有数据结果会输出,即使没有满足关窗条件。
    Flink配置 Checkpoint间隔 定时执行Checkpoint的时间间隔。如果不填写,将会关闭 Checkpoint。
    两次Checkpoint之间的最短时间间隔 两次Checkpoint之间的最短时间间隔,如果Checkpoint最大并行度是1,则该配置确保两个Checkpoint之间有一个最短时间间隔。
    Checkpoint Retention策略 当作业无法再重启或者作业被暂停时,是否需要保留最新完成的Checkpoint。该参数取值如下:
    • Always:作业停止时保留Checkpoint。
    • Only when FAILED:作业停止后删除Checkpoint。
    • Never(默认值):不保留Checkpoint。
    开启Unaligned Checkpoint 开启Unaligned Checkpoint会大大降低反压情况下Checkpoint的总执行时间。但是也会导致增大单次Checkpoint的大小。
    Flink重启策略配置 当有Task失败时,如果没有开启Checkpoint,JobManager进程不会重启。如果开启了Checkpoint,则JobManager进程会重启。该参数取值如下:
    • Failure Rate:基于失败率(您填写时间间隔内的最大失败次数)重启。
    • Fixed Delay:固定间隔重启。
    • No Restarts(默认值):不会重启。
    Flink Master 容错配置(高可用性) 该参数取值如下:
    • None(默认值):表示系统不会保存元数据给JobManager重启恢复时使用。
    • Kubernetes:表示系统会将元数据保存到Kubernetes 集群中供JobManager重启恢复使用。
    更多Flink配置 在此设置其他Flink配置。例如taskmanager.numberOfTaskSlots: 1
    资源配置 Task Managers数量 默认与并行度一致。
    Job Manager CPUs 默认值为1。
    Job Manager Memory 最小值为500 MiB。单位建议使用GiB或MiB,例如,1024 MiB或1.5 GiB。
    Task Manager CPUs 默认值为1。
    Task Manager Memory 最小值为1 GiB。单位建议使用GiB或MiB,例如,1024 MiB或1.5 GiB。
    日志配置 Root Log Level TRACE、DEBUG、INFO、WARN和ERROR。
    Log Levels 填写日志名称和日志级别。
    Logging Profile 日志模板,可以选择系统模板,也可以选择用户配置。
  4. 在作业开发页面右上角,单击上线