本文为您介绍如何提交Flink全托管DataStream API、Table API和Python API作业至集群运行。

上传JAR包/Python作业文件/Python依赖

DataStream API、Table API和Python API作业运行前,需要您按照以下步骤将JAR包、Python作业文件或Python依赖上传到Flink全托管控制台。

  1. 登录实时计算统一控制台
  2. Flink全托管页签,单击对应工作空间操作列下的开发控制台
  3. 在左侧导航栏,单击资源上传
  4. 单击上传资源,选择您要上传的JAR包/Python作业文件/Python依赖。
    如果您的作业是Python API类型,则需要填写PyFlink的官方JAR包。官方JAR的下载地址,请参见PyFlink V1.11

创建作业

  1. 登录实时计算统一控制台
  2. Flink全托管页签,单击对应工作空间操作列下的开发控制台
  3. 单击创建作业
  4. 基础配置页面,填写基本配置信息。创建作业
    说明
    • 您可以根据实际业务需要,单击显示高级配置,填写行为配置Flink配置资源配置日志配置等信息。
    • 您可以在创建作业页面右上角,单击以YAML模式编辑,直接修改配置信息。
    参数解释如下表所示。
    类别 配置项 说明
    基础配置 作业名称 作业名称
    Jar URI 如果您的作业是Python API类型,则需要填写PyFlink的官方JAR包,官方JAR的下载地址,请参见PyFlink V1.11
    Entrypoint Class 程序的入口类。如果您的作业是Python API类型,则Entrypoint Class需要填写为org.apache.flink.client.python.PythonDriver
    说明 如果您的JAR未指定主类,请在此处输入您的Entrypoint Class类的标准路径。
    Entrypoint main args 如果您的作业是Python API类型,需要首先上传您的Python作业文件。Python作业文件上传之后,默认会被上传到作业运行节点的/flink/usrlib/目录下。

    假如您的Python作业文件名为word_count.py,则Entrypoint main args需要填写为-py /flink/usrlib/word_count.py

    说明 Python作业文件的路径需要填写为完整路径,/flink/usrlib/不能省略,且不能更改。
    并行度 作业并发数
    高级配置 行为配置 升级策略 在作业修改配置且重启时,Application Manager的行为。取值如下:
    • Stateless:将停止当前作业,并使用最新配置启动一个新作业。
    • Stateful:带状态升级,将对该作业做一个 Savepoint,并从该 Savepoint 用最新配置启动一个新作业。
    • None :配置变更时,不会对正在运行的 Flink Job 执行自动重启。
    初始状态 作业的初始状态。取值如下:
    • Running:作业创建完成后会直接运行。
    • Cancelled:作业创建完成后,需要手动操作进行启动。
    恢复策略 当作业状态变为运行时,State的恢复策略。取值如下:
    • Latest Savepoint :将从最新Savepoint文件恢复。
    • Latest State:将从最新的Savepoint或Checkpoint中恢复。
    • None:不带State恢复作业。
    说明 升级策略恢复策略需要配合使用。如果升级策略选择Stateful恢复策略选择None,则完成Savepoint后,作业会从作业的起始时间重新启动,而不是从Savepoint恢复。
    创建Savepoint的最大重试次数 升级期间创建Savepoint操作的重试次数。
    创建实例的最大重试次数 启动实例操作的重试次数。
    Stop with Drain 如果开启Stop With Drain功能,当作业被手动停止或者保留State升级作业时,窗口中已有数据结果会输出,即使没有满足关窗条件。
    配置 标签 您可以在标签选项中添加作业标签,便于在总览页面快速定位作业。
    标签值
    附加依赖文件 如果您的作业是Python API类型,则需要在这里选择您的Python作业文件,以及所用的依赖文件。Python依赖详情,请参见Python依赖管理
    说明 Python依赖上传之后,默认会被上传到作业运行节点的/flink/usrlib/目录下。
    Flink版本 仅支持Flink 1.10Flink 1.11两个大版本。
    说明 Python API作业需要选择为Flink 1.11及以上版本。
    Flink镜像标签 选择Flink镜像标签。
    说明 Python API作业需要选择为1.11.2-vvr-2.1.1及以上版本。
    Flink配置 Checkpoint间隔 定时执行Checkpoint的时间间隔。如果不填写,将会关闭 Checkpoint。
    两次 Checkpoint之间的最短时间间隔 两次Checkpoint之间的最短时间间隔,如果Checkpoint最大并行度是1,则该配置确保两个Checkpoint之间有一个最短时间间隔。
    Checkpoint Retention策略 当作业无法再重启或者作业被暂停时,是否需要保留最新完成的Checkpoint。该参数取值如下:
    • Always
    • Only when FAILED
    • Never
    开启Unaligned Checkpoint 开启Unaligned Checkpoint会大大降低反压情况下Checkpoint的总执行时间。但是也会导致增大单次Checkpoint的大小。
    Flink重启策略配置 Flink Default是默认重启策略。当有Task失败时,如果没有开启Checkpoint,JobManager进程不会重启。如果开启了Checkpoint,则JobManager进程会重启。该参数取值如下:
    • No Restarts
    • Fixed Delay
    • Failure Rate
    其他配置 配置 在此设置其他Flink配置。例如taskmanager.numberOfTaskSlots: 1
    资源配置 Task Managers数量 默认与并行度一致。
    Job Manager CPUs 默认值为1。
    Job Manager Memory 最小值为500Mi。单位建议使用Gi或Mi,例如,1024Mi或1.5Gi。
    Task Manager CPUs 默认值为1。
    Task Manager Memory 最小值为1Gi。单位建议使用Gi或Mi,例如,1024Mi或1.5Gi。
    日志配置 Root Log Level TRACE、DEBUG、INFO、WARN和ERROR。
    Log Levels 填写日志名称和日志级别。
    Logging Profile 日志模板,可以选择系统模板,也可以选择用户配置。
  5. 单击创建作业

启动作业

  1. 登录实时计算统一控制台
  2. Flink全托管页签,单击对应工作空间操作列下的开发控制台
  3. 在左侧导航栏上,单击作业列表
  4. 单击您创建的目标作业名称。
  5. 在作业详情页面,单击启动,即可将作业提交至集群运行。启动作业