Flink Python作业快速入门
本文带您快速体验Flink全托管Python流作业和批作业的部署和启动等操作。
前提条件
已准备阿里云账号及账户余额。
账号注册操作步骤,请参见账号注册。
阿里云账户余额不少于100.00元人民币或等值的代金券或优惠券。
已创建工作空间,详情请参见开通Flink全托管和创建与管理项目空间。
(可选)步骤一:下载Python测试文件
本快速入门为了带您快速熟悉Flink流作业和批作业的操作,已为您提供了测试Python文件和输入数据文件,您可以直接单击下载待后续步骤使用。
您可以单击下载以下任意Python测试作业。
单击word_count_streaming.py,下载测试Python流作业。
单击word_count_batch.py,下载测试Python批作业。
单击Shakespeare,下载输入数据文件Shakespeare。
步骤二:上传Python文件
登录实时计算控制台。
在Flink全托管页签,单击目标工作空间操作列下的控制台。
在左侧导航栏,单击资源管理。
单击上传资源,选择您要上传的Python文件或者提供的Python测试文件。
在Python作业中使用其他依赖(例如自定义的Python虚拟环境、第三方Python包、JAR包和数据文件等)的方法请参见使用Python依赖。
步骤三:创建作业
流作业
登录实时计算控制台。
在Flink全托管页签,单击目标工作空间操作列下的控制台。
在作业运维界面,单击部署作业。
填写部署信息。
参数
说明
示例
部署作业类型
JAR或者Python。
Python
部署模式
请选择部署为流模式或批模式。
流模式
部署名称
填写对应的Python作业名称。
flink-streaming-test-python
引擎版本
当前作业使用的Flink的引擎版本。引擎版本号含义、版本对应关系和生命周期重要时间点详情请参见引擎版本介绍。
vvr-6.0.7-flink-1.15
Python文件地址
单击word_count_streaming.py下载测试Python文件后,再单击右侧
图标选择文件,上传Python文件。
oss://flink-test-oss/artifacts/namespaces/flink-test-default/word_count_streaming.py
Entry Module
程序的入口类。
如果Python作业文件为.py文件,则该项不需要填写。
如果Python作业文件为.zip文件,则需要在此处输入您的Entry Module,例如word_count。
无需填写
Entry Point Main Arguments
填写输入数据文件的OSS路径。
说明本示例中输入数据文件、输出文件和测试Python存放路径一致。统一放在OSS控制台,名称为flink-test-oss的Bucket下。
本示例中,将计算结果写入到OSS指定目录为例,为您展示如何配置该参数。在此您仅需要指定结果数据输出文件路径和名称,无需提前在指定目录创建。
--input oss://flink-test-oss/artifacts/namespaces/flink-test-default/Shakespeare
Python Libraries
第三方Python包。第三方Python包会被添加到Python worker进程的PYTHONPATH中,从而在Python自定义函数中可以直接访问。如何使用第三方Python包,详情请参见使用第三方Python包。
无需填写
Python Archives
存档文件,Python Archives详情请参见使用自定义的Python虚拟环境和使用数据文件。
无需填写
附加依赖文件
填写目标附加依赖文件的OSS路径或者URL。
无需填写
提交到Session集群
不推荐生产环境使用。如果您选中了提交到Session集群后,需要在下面的下拉列表中,选择目标Session集群。Session集群创建步骤详情请参见步骤一:创建Session集群。
无需选中
备注
可选,填写备注信息。
无需填写
更多设置
打开该开关后,您需要配置以下信息:
Kerberos集群:单击左侧下拉列表选择您已创建的Kerberos集群,Kerberos集群创建操作详情请参见注册Hive Kerberos集群。
principal:Kerberos principal又称为主体,主体可以是用户或服务,用于在Kerberos加密系统中标记一个唯一的身份。
无需填写
单击部署。
批作业
登录实时计算控制台。
在Flink全托管页签,单击目标工作空间操作列下的控制台。
在作业运维界面,单击部署作业。
填写部署信息。
参数
说明
示例
部署作业类型
JAR或者Python。
Python
部署模式
请选择部署为流模式或批模式。
批模式
部署名称
填写对应的Python作业名称。
flink-batch-test-python
引擎版本
当前作业使用的Flink的引擎版本。引擎版本号含义、版本对应关系和生命周期重要时间点详情请参见引擎版本介绍。
vvr-6.0.7-flink-1.15
Python文件地址
单击word_count_batch.py下载测试Python文件后,再单击右侧
图标选择文件,上传Python文件。
oss://flink-test-oss/artifacts/namespaces/flink-test-default/word_count_batch.py
Entry Module
程序的入口类。
如果Python作业文件为.py文件,则该项不需要填写。
如果Python作业文件为.zip文件,则需要在此处输入您的Entry Module,例如word_count。
无需填写
Entry Point Main Arguments
填写输入以及输出数据文件的OSS路径。
说明本示例中输入数据文件、输出文件和测试Python存放路径一致。统一放在OSS控制台,名称为flink-test-oss的Bucket下。
本示例中,将计算结果写入到OSS指定目录为例,为您展示如何配置该参数。在此您仅需要指定结果数据输出文件路径和名称,无需提前在指定目录创建。
--input oss://flink-test-oss/artifacts/namespaces/flink-test-default/Shakespeare --output oss://flink-test-oss/artifacts/namespaces/flink-test-default/python-batch-quickstart-test-output.txt
Python Libraries
第三方Python包。第三方Python包会被添加到Python worker进程的Python路径中,从而在Python自定义函数中可以直接访问。如何使用第三方Python包,详情请参见使用第三方Python包。
无需填写
Python Archives
存档文件,Python Archives详情请参见使用自定义的Python虚拟环境和使用数据文件。
无需填写
附加依赖文件
填写目标附加依赖文件的OSS路径或者URL。
无需填写
提交到Session集群
不推荐生产环境使用。如果您选中了提交到Session集群后,需要在下面的下拉列表中,选择目标Session集群。Session集群创建步骤详情请参见步骤一:创建Session集群。
无需选中
备注
可选,填写备注信息。
无需填写
更多设置
打开该开关后,您需要配置以下信息:
Kerberos集群:单击左侧下拉列表选择您已创建的Kerberos集群,Kerberos集群创建操作详情请参见注册Hive Kerberos集群。
principal:Kerberos principal又称为主体,主体可以是用户或服务,用于在Kerberos加密系统中标记一个唯一的身份。
无需填写
单击部署。
步骤四:启动Python作业
在作业运维页面,单击目标作业名称操作列中的启动。
单击启动后会弹出作业启动对话框,配置对话框详情请参见作业启动。
单击对话框中的启动。
单击启动后,您可以看到作业状态变为运行中或已完成,则代表作业运行正常。
重要如果您使用文档Python测试文件,作业最终运行状态是已完成状态。
如果您需要启动批作业,则需要在作业运维页面,将作业类型切换为批作业,才可以看到您上线的批作业。系统默认展示的作业为流作业。
步骤五:查看Flink计算结果
查看Python流作业计算结果:
在TaskManager中以.out结尾的日志文件中,搜索shakespeare查看Flink计算结果。
重要如果您使用的是Python测试文件,流作业变为已完成状态时会删除作业结果,故流作业状态为运行中才能查看计算结果。
查看Python批作业计算结果:
登录OSS管理控制台,在您配置的数据输出文件存放目录查看结果。
本示例中的数据输出文件目录为oss://flink-test-oss/artifacts/namespaces/flink-test-default/batch-quickstart-test-output.txt/,该目录下的文件夹名称是作业的启动日期和时间,单击文件夹查看作业结果文件。单击目标文件名后,在弹出的界面单击下载,操作如下图所示。
批作业结果为ext文件,下载后可以用记事本或者Microsoft Office Word打开查看结果,计算结果如下图所示。
(可选)步骤六:停止作业
如果您对作业进行了修改且希望修改生效,则需要将运行中或已完成的作业停止再启动。另外,如果作业无法复用State,希望作业全新启动时,也需要停止后再启动作业。作业停止详情请参见作业停止。