本文向您介绍如何使用 Flink Python DataFrame API 编写一个作业,并部署运行。
前提条件
开始前,请确认已完成以下准备:
已开通实时计算 Flink 版工作空间。具体操作,请参见开通实时计算Flink版。
实时计算引擎 VVR 11.8 及以上版本预装了 Python 3.9、3.10 和 3.11,建议您选择其中之一作为本地开发环境的 Python 版本。
本地环境搭建
创建工作目录。
mkdir dataframe-quickstart cd dataframe-quickstart(可选)本地开发环境配置:由于 Flink Python DataFrame API 为 VVR 专属功能,暂未在开源版 Flink 中提供,因此您需要使用下面的命令安装 VVR 版 PyFlink 依赖包:
pip install --pre "ververica-flink>11.7"说明pip install 默认安装正式版本的依赖包。PyFlink DataFrame API 目前仅在 preview 版本的 VVR 中提供,因此您需要使用
--pre参数安装最新的 PyFlink 预览版本,或者显式指定版本号:pip install "ververica-flink==11.8.0a1"说明ververica-flink是API-Only包,仅提供类型定义和接口声明,用于本地开发,作业仍需在实时计算Flink版平台提交。
编写 DataFrame API 作业
在当前目录下创建 quickstart.py 文件,并写入以下内容。
import pyflink.dataframe as pf
from pyflink.dataframe import col
def main():
pf.config.set("parallelism.default", "1")
sales = pf.from_records(
[
(1, "book", 42),
(2, "book", 78),
(3, "food", 120),
(4, "game", 35),
(5, "book", 52),
],
schema=["user_id", "item", "amount"],
)
result = (
sales.with_column("bonus_amount", col("amount") + 10)
.filter(col("amount") >= 50)
.rename({"item": "category"})
.select("user_id", "category", "amount", "bonus_amount")
)
result.write_custom(connector="print", options={"logger": True})
if __name__ == "__main__":
main()部署作业
作业开发完成后,您可以通过以下方式部署作业:
登录实时计算控制台,进入目标工作空间。
在左侧导航栏选择 ,上传
quickstart.py。在页面,单击,填写部署信息。
部署模式:可选择流模式或批模式。
部署名称:输入部署名称,如 quickstart。
引擎版本:请选择 vvr-11.8.preview.1-jdk11-flink-1.20。
Python文件地址:选择已上传的
quickstart.py。
单击部署。
启动作业
在页面的下拉列表中,选择流作业或批作业。
在目标作业名称右侧,单击操作列下的启动。
在右侧弹出的窗口中,单击启动。
验证结果
该作业使用Print连接器将结果打印到Task Manager日志。作业结束后,您可以查看日志验证结果:
在页面的下拉列表中,选择流作业或批作业。
单击目标作业的名称。
单击作业日志。
单击Task Managers页签,在当前 TaskManager下拉列表中,选择 task manager。
日志中包含以下数据时,表示作业已经运行成功:
+I[2, book, 78, 88]
+I[3, food, 120, 130]
+I[5, book, 52, 62]