快速开始

更新时间:
复制为 MD 格式

本文向您介绍如何使用 Flink Python DataFrame API 编写一个作业,并部署运行。

前提条件

开始前,请确认已完成以下准备:

  • 已开通实时计算 Flink 版工作空间。具体操作,请参见开通实时计算Flink

  • 实时计算引擎 VVR 11.8 及以上版本预装了 Python 3.9、3.10 和 3.11,建议您选择其中之一作为本地开发环境的 Python 版本。

本地环境搭建

  1. 创建工作目录。

    mkdir dataframe-quickstart
    cd dataframe-quickstart
  2. (可选)本地开发环境配置:由于 Flink Python DataFrame API 为 VVR 专属功能,暂未在开源版 Flink 中提供,因此您需要使用下面的命令安装 VVR 版 PyFlink 依赖包:

    pip install --pre "ververica-flink>11.7"
    说明

    pip install 默认安装正式版本的依赖包。PyFlink DataFrame API 目前仅在 preview 版本的 VVR 中提供,因此您需要使用 --pre 参数安装最新的 PyFlink 预览版本,或者显式指定版本号:

    pip install "ververica-flink==11.8.0a1"
    说明

    ververica-flinkAPI-Only包,仅提供类型定义和接口声明,用于本地开发,作业仍需在实时计算Flink版平台提交。

编写 DataFrame API 作业

在当前目录下创建 quickstart.py 文件,并写入以下内容。

import pyflink.dataframe as pf
from pyflink.dataframe import col


def main():
    pf.config.set("parallelism.default", "1")

    sales = pf.from_records(
        [
            (1, "book", 42),
            (2, "book", 78),
            (3, "food", 120),
            (4, "game", 35),
            (5, "book", 52),
        ],
        schema=["user_id", "item", "amount"],
    )

    result = (
        sales.with_column("bonus_amount", col("amount") + 10)
        .filter(col("amount") >= 50)
        .rename({"item": "category"})
        .select("user_id", "category", "amount", "bonus_amount")
    )

    result.write_custom(connector="print", options={"logger": True})


if __name__ == "__main__":
    main()

部署作业

作业开发完成后,您可以通过以下方式部署作业:

  1. 登录实时计算控制台,进入目标工作空间。

  2. 在左侧导航栏选择 文件管理,上传 quickstart.py

  3. 运维中心 > 作业运维页面,单击部署作业 > Python作业,填写部署信息。

    • 部署模式:可选择流模式批模式

    • 部署名称:输入部署名称,如 quickstart。

    • 引擎版本:请选择 vvr-11.8.preview.1-jdk11-flink-1.20。

    • Python文件地址:选择已上传的 quickstart.py

  4. 单击部署

启动作业

  1. 运维中心 > 作业运维页面的下拉列表中,选择流作业批作业

  2. 在目标作业名称右侧,单击操作列下的启动

  3. 在右侧弹出的窗口中,单击启动

验证结果

该作业使用Print连接器将结果打印到Task Manager日志。作业结束后,您可以查看日志验证结果:

  1. 运维中心 > 作业运维页面的下拉列表中,选择流作业批作业

  2. 单击目标作业的名称。

  3. 单击作业日志

  4. 单击Task Managers页签,在当前 TaskManager下拉列表中,选择 task manager。

日志中包含以下数据时,表示作业已经运行成功:

+I[2, book, 78, 88]
+I[3, food, 120, 130]
+I[5, book, 52, 62]