本文带您快速体验Flink Python流作业和批作业的部署和启动,以了解实时计算Flink版Python作业的操作流程。
前提条件
如果您使用RAM用户或RAM角色等身份访问,需要确认已具有Flink控制台相关权限,详情请参见权限管理。
已创建Flink工作空间,详情请参见开通实时计算Flink版。
步骤一:准备Python代码文件
实时计算管理控制台不提供Python的开发环境,您需要在本地完成作业开发。有关作业调试和连接器的使用,详情请参见Python作业开发。
本地开发依赖的Flink版本需确保与后续步骤三:部署Python作业选择的引擎版本保持一致,在Python作业中使用其他依赖(自定义的Python虚拟环境、第三方Python包、JAR包和数据文件等)的方法请参见使用Python依赖。
为了帮助您快速熟悉Flink Python作业操作,本文已为您提供统计单词出现频率的测试Python文件和数据文本,您可以直接下载待后续步骤使用。
根据需要下载Python测试作业。
单击Shakespeare,下载数据文本Shakespeare。
步骤二:上传Python文件和数据文件
步骤三:部署Python作业
流作业
在 界面,单击 。
填写部署信息。
参数
说明
示例
部署模式
请选择部署为流模式。
流模式
部署名称
填写对应的Python作业名称。
flink-streaming-test-python
引擎版本
当前作业使用的Flink引擎版本。
vvr-8.0.9-flink-1.17
Python文件地址
单击word_count_streaming.py下载测试Python文件后,再单击右侧图标选择文件,上传Python文件。
-
Entry Module
程序的入口类。
如果Python作业文件为.py文件,则该项不需要填写。
如果Python作业文件为.zip文件,则需要在此处输入您的Entry Module,例如word_count。
无需填写
Entry Point Main Arguments
填写传入参数信息,在主方法里面调用该参数。
本文填写输入数据文件Shakespeare的存放路径。
部署目标
在下拉列表中,选择目标资源队列或者Session集群(请勿生产使用)。详情请参见管理资源队列和创建Session集群。
重要部署到Session集群的作业不支持显示监控告警(或数据曲线)、配置监控告警和开启自动调优功能。请勿将Session集群用于正式生产环境,Session集群可以作为开发测试环境。详情请参见作业调试。
default-queue
更多配置参数详情请参见部署作业。
单击部署。
批作业
在 界面,单击部署作业,选择Python作业。
填写部署信息。
参数
说明
示例
部署模式
请选择部署为批模式。
批模式
部署名称
填写对应的Python作业名称。
flink-batch-test-python
引擎版本
当前作业使用的Flink引擎版本。
vvr-8.0.9-flink-1.17
Python文件地址
单击word_count_batch.py下载测试Python文件后,再单击右侧图标选择文件,上传Python文件。
-
Entry Module
程序的入口类。
如果Python作业文件为.py文件,则该项不需要填写。
如果Python作业文件为.zip文件,则需要在此处输入您的Entry Module,例如word_count。
无需填写
Entry Point Main Arguments
填写传入参数信息,在主方法里面调用该参数。
本文填写输入数据文件Shakespeare和输出数据目录batch-quickstart-test-output的存放路径。
说明您只需指定输出目录的路径名称,无需提前在存储服务中创建输出目录,输出目录的父目录路径与输入文件保持一致即可。
存储类型为OSS Bucket:
--input oss://<您绑定的OSS Bucket名称>/artifacts/namespaces/<项目空间名称>/Shakespeare
--output oss://<您绑定的OSS Bucket名称>/artifacts/namespaces/<项目空间名称>/python-batch-quickstart-test-output
您可以直接在文件管理中复制Shakespeare文件的完整路径。
存储类型为全托管存储:
--input oss://flink-fullymanaged-<工作空间ID>/artifacts/namespaces/<项目空间名称>/Shakespeare
--output oss://flink-fullymanaged-<工作空间ID>/artifacts/namespaces/<项目空间名称>/python-batch-quickstart-test-output
部署目标
在下拉列表中,选择目标资源队列或者Session集群(请勿生产使用)。详情请参见管理资源队列和创建Session集群。
重要部署到Session集群的作业不支持显示监控告警、配置监控告警和开启自动调优功能。请勿将Session集群用于正式生产环境,Session集群可以作为开发测试环境。详情请参见作业调试。
default-queue
更多配置参数详情请参见部署作业。
单击部署。
步骤四:启动Python作业并查看Flink计算结果
流作业
在
页面,单击目标作业名称操作列中的启动。选择无状态启动,单击启动,作业启动详情请参见作业启动。
单击启动后,作业状态变为运行中或已完成,则代表作业运行正常。如果您部署本文档Python测试文件,作业最终运行状态是已完成状态。
作业状态变为运行中后,查看流作业示例的计算结果。
重要如果您部署的是本文Python测试文件,流作业变为已完成状态时会删除作业结果,故流作业状态为运行中才能看到计算结果。
在TaskManager中以.out结尾的日志文件中,搜索shakespeare查看Flink计算结果。
批作业
在
页面,单击目标作业中的启动。在作业启动对话框中,单击启动,作业启动详情请参见作业启动。
作业状态变为已完成后,查看批作业示例的计算结果。
存储类型为OSS Bucket:登录OSS管理控制台,在oss://<您绑定的OSS Bucket名称>/artifacts/namespaces/<项目空间名称>/python-batch-quickstart-test-output目录,单击名称是作业的启动日期和时间的文件夹,然后单击目标文件名,在弹出的面板上单击下载。
存储类型为全托管存储:您可以在文件管理页面的资源文件页签,单击python-batch-quickstart-test-output.txt/yyyy-MM-dd--n/prefix-****.ext格式的文件操作列的下载,在本地进行查看。
批作业结果为ext文件,下载后可以用记事本或者Microsoft Office Word打开查看结果,计算结果如下图所示。
(可选)步骤五:停止作业
如果您对作业进行了修改(例如更改代码、增删改WITH参数、更改作业版本等),且希望修改生效,则需要重新部署作业,然后停止再启动。另外,如果作业无法复用State,希望作业全新启动时,或者更新非动态生效的参数配置时,也需要停止后再启动作业。作业停止详情请参见作业停止。
相关文档
您可以在作业启动前配置作业资源或者作业上线后修改作业资源,支持基础模式(粗粒度)和专家模式(细粒度)两种资源模式,详情请参见配置作业资源。
支持动态更新Flink作业参数,可以实现更快的参数配置生效,减少作业启停对业务的中断时间,详情请参见动态扩缩容与参数动态更新。
配置作业日志级别以及配置不同级别日志分别输出,详情请参见配置作业日志输出。
SQL作业完整的开发流程示例,请参见Flink SQL作业快速入门。