本文为您介绍如何提交Flink全托管DataStream API作业至集群运行。
上传JAR包
- 登录实时计算管理控制台。
- 在Flink全托管页签,单击目标工作空间操作列下的控制台。
- 在左侧导航栏,单击资源上传。
- 单击上传资源,选择您要上传的JAR包。如果您的作业是Python API类型,则需要上传PyFlink的官方JAR包。官方JAR的下载地址,请参见PyFlink V1.11和PyFlink V1.12。说明 推荐您通过单独的Python作业入口上传Python资源,详情请参见概述。
创建作业
- 登录Flink全托管开发控制台,新建作业。
- 登录实时计算管理控制台。
- 在Flink全托管页签,单击目标工作空间操作列下的控制台。
- 在左侧导航栏,单击作业开发。
- 单击新建。
- 在新建文件对话框,填写作业配置信息。
作业参数 说明 文件名称 作业的名称。 说明 作业名称在当前项目中必须保持唯一。文件类型 文件类型需要选择为流作业/JAR。 流作业和批作业均支持以下类型:- SQL
- JAR
- PYTHON
说明 实时计算引擎VVR 3.0.1及以上版本支持批作业。部署目标 选择作业需要部署的集群,支持以下两种集群模式: - Per-Job集群(默认):适用于占用资源比较大或持续稳定运行的作业。因为作业之间资源隔离,每个作业都需要一个独立的JM,小任务JM的资源利用率较低。
- Session集群:适用于占用资源比较小或任务启停比较频繁的作业。因为多个作业可以复用相同的JM,可以提高JM资源利用率。
存储位置 指定该作业的代码文件所属的文件夹。 您还可以在现有文件夹右侧,单击
图标,新建子文件夹。
- 单击确认。
- 在作业开发页面,填写基本配置信息。您可以直接填写以下配置信息,也可以单击YAML直接修改配置信息。配置参数解释如下表所示。
参数 说明 部署目标 您可以修改创建作业时已选择的部署目标。 JAR URI 请选择一个文件或者手动上传新文件,您可以拖拽文件到此区域或者单击右侧 图标选择文件上传。
说明 如果您的作业是Python API类型,则需要填写PyFlink的官方JAR包,官方JAR的下载地址,请参见PyFlink V1.11和PyFlink V1.12。Entry Point Class 程序的入口类。如果您的JAR包未指定主类,请在此处输入您的Entrypoint Class类的标准路径。 说明 如果您的作业是Python API类型,Entrypoint class应该填写为org.apache.flink.client.python.PythonDriver。Entry Point Main Arguments 您可以在此处传入参数,在主方法里面去调用该参数。 说明- 参数信息长度不要大于1024,且不建议用来传复杂参数,复杂参数指包括了换行、空格或者其他特殊字符的参数。如果您需要传入复杂参数,请使用附加依赖文件来传输。
- 如果您的作业是Python API类型,需要首先上传您的Python作业文件。Python作业文件上传之后,默认会被上传到作业运行节点的/flink/usrlib/目录下。
假如您的Python作业文件名为word_count.py,则Entrypoint main args需要填写为
-py /flink/usrlib/word_count.py
。Python作业文件的路径需要填写为完整路径,/flink/usrlib/不能省略,且不能更改。
附加依赖文件 - (推荐)选择您已上传的目标附加依赖文件。
您需要提前通过Flink全托管开发控制台左侧资源上传或作业开发页面附件依赖文件右侧的
上传附件依赖文件。上传的附件依赖文件会固定被保存在oss://ossBucketName/artifacts/namespaces/namespaceName/* 目录。
- 填写目标附加依赖文件的OSS路径。
您需要提前将附加依赖文件上传至当前实例对应的OSS Bucket,上传附加依赖文件的OSS Bucket必须为您开通Flink全托管时选择的OSS Bucket。
- 填写目标附加依赖文件的URL,目前仅支持以文件名结尾的URL,例如http://xxxxxx/file。
您需要提前将附加依赖文件上传至公开可访问的HTTP服务。
说明- Session集群不支持设置附加依赖文件,仅Per-Job集群支持设置附加依赖文件。
- 以上三种方式上传的附件依赖文件,在作业运行时,最终都会被加载到JM和TM所在Pod的/flink/usrlib目录下。
- Session模式的作业不支持配置附加依赖文件路径。
- 如果您的作业是Python API类型,则需要在这里选择您的Python作业文件,以及所用的依赖文件。Python依赖详情,请参见Python依赖管理。Python依赖上传之后,默认会被上传到作业运行节点的/flink/usrlib/目录下。
- 在作业开发页面右侧,单击高级配置,根据业务需要填写配置信息。参数解释如下表所示。
类别 配置项 说明 常规配置 引擎版本 仅支持vvr-4.0.7-flink-1.13和vvr-3.0.3-flink-1.12版本。 说明 从VVR 3.0.3版本(对应Flink 1.12版本)开始,VVP支持同时运行多个不同引擎版本的SQL作业。如果您的作业已使用了Flink 1.12及更早版本的引擎,您需要按照以下情况进行处理:- Flink 1.12版本:停止后启动作业,系统将自动将引擎升级为vvr-3.0.3-flink-1.12版本。
- Flink 1.11或Flink 1.10版本:手动将作业引擎版本升级到vvr-3.0.3-flink-1.12或vvr-4.0.8-flink-1.13版本后重启作业,否则会在启动作业时超时报错。
编辑标签 您可以为作业设置标签。后续可以通过搜索的方式,快速找到目标作业。 Flink配置 Checkpoint间隔 定时执行Checkpoint的时间间隔。如果不填写,将会关闭 Checkpoint。 两次Checkpoint之间的最短时间间隔 两次Checkpoint之间的最短时间间隔,如果Checkpoint最大并行度是1,则该配置确保两个Checkpoint之间有一个最短时间间隔。 开启Unaligned Checkpoint 开启Unaligned Checkpoint会大大降低反压情况下Checkpoint的总执行时间。但是也会导致增大单次Checkpoint的大小。 Flink重启策略配置 当有Task失败时,如果没有开启Checkpoint,JobManager进程不会重启。如果开启了Checkpoint,则JobManager进程会重启。该参数取值如下: - Failure Rate:基于失败率(您填写时间间隔内的最大失败次数)重启。
选择基于失败率重启后,您还需要设置检测Failure Rate的时间间隔、时间间隔内的最大失败次数和每次重启时间间隔。
- Fixed Delay:固定间隔重启。
选择基于固定间隔重启后,您还需要设置尝试重启的次数和每次重启时间间隔。
- No Restarts(默认值):不会重启。
更多Flink配置 在此设置其他Flink配置。例如 taskmanager.numberOfTaskSlots: 1
。日志配置 开启日志归档 默认已开启日志归档功能。开启日志归档后,您可以在作业探查页面查看历史作业实例的日志,详情请参见查看历史作业实例日志。 说明- 在VVR 3.x版本,仅VVR 3.0.7及以上版本支持开启日志归档功能。
- 在VVR 4.x版本,仅VVR 4.0.11及以上版本支持开启日志归档功能。
归档日志有效期(天) 归档日志有效期默认为7天。 Root Log Level 日志级别从低到高顺序如下: - TRACE:比DEBUG更细粒度的信息。
- DEBUG:系统运行状态的信息。
- INFO:重要或者您感兴趣的信息。
- WARN:系统可能出现潜在错误的信息。
- ERROR:系统出现错误和异常的信息。
Log Levels 填写日志名称和日志级别。 Logging Profile 日志模板,可以选择系统模板,也可以选择用户配置。 - 在作业开发页面右上角,单击上线。