创建Flink SQL任务

Dataphin支持创建Flink SQL代码任务。本文为您介绍如何基于Ververica Flink实时引擎创建Flink SQL任务。

背景信息

阿里云实时计算Flink版是一套基于Apache Flink构建的⼀站式实时大数据分析平台,提供端到端亚秒级实时数据分析能力,并通过标准SQL降低业务开发门槛,助力企业向实时化、智能化大数据计算升级转型。

实时引擎为Ververica Flink支持的功能,请参见实时引擎适配功能说明

前提条件

在进行以下操作前,请确认所需创建Flink SQL任务的项目已开启实时引擎并已配置Ververica Flink计算源。具体操作,请参见创建项目

image

步骤一:新建Flink SQL任务

  1. Dataphin研发工作台

  2. 按照下图指引,进入新建Flink_SQL对话框。

    gagaga①区域选择所需创建Flink SQL任务的项目空间,如果您的项目生产开发类型为Dev-Prod模式,则需要在区域②选择环境为Dev。

  3. 新建Flink_SQL任务对话框,配置参数后,单击确定

    vvp

    参数

    说明

    任务名称

    名称的命名规则如下:

    • 只能包含小写英文字母、数字、下划线(_)。

    • 名称的长度范围为3~62个字符。

    • 项目内的名称不支持重复。

    • 名称仅支持以英文字母开头。

    集群

    选择Flink SQL任务所在集群。

    引擎版本

    选择任务运行的引擎版本。

    存储目录

    默认选择为代码管理,同时您也可以在计算任务页面创建目标文件夹后,选择该目标文件夹为Flink SQL任务的目录。gagaga

    描述

    填写对Flink SQL任务的简单描述。

步骤二:开发及调试Flink SQL任务的代码

  1. 在Flink SQL任务代码页面,编写任务的代码。

    gagagaga
    说明

    您可以单击页面右上方的格式化,系统自动调整SQL代码格式。

  2. 单击页面左上方的预编译,系统将会自动校验代码任务的语法及权限问题。

    说明
    1. 预编译成功,在页面上方出现1弹窗。

    2. 预编译失败,在页面上方出现2弹窗,您可单击页面底部Console,查看预编译失败日志。

  3. 如果实时计算引擎是是Ververica Flink,则Dataphin支持调试已开发的代码。单击页面右上方的调试,代码任务将采样数据并进行本地调试,保障代码任务的正确性。

  4. 在弹出调试配置窗口中,选择调试的模式后,单击下一步

    fagag
  5. 采样调试数据步骤中,为元数据进行采样调试。Snipaste_2022-11-07_14-32-30

您可以通过自动抽样或上传数据方式,为元数据进行采样调试。各适用场景说明如下:

  • 自动抽样:自动抽样到的数据是随机的,所以适用于对采集到的数据没有限制的场景。选择自动抽样后,需要配置抽样条数。仅支持DataHub,MaxCompute,TimeTunnel数据源进行自动抽样。

重要

如果元数据表中没有数据,则自动抽样将采集不到数据。

调试任务时根据元表的配置读取开发表或生产元表。如何查看及配置任务调试时读取的数据表类型,请参见新建元表。详细说明如下:

  • 元表的任务调试时可读取参数选中开发表,详细说明如下:

    • 任务中使用的是Project_Name_dev.元表名,则自动抽取开发元表。如果数据源无开发元表,则不支持自动抽样

    • 任务中使用的是Project_Name.元表名,则自动抽取生产元表。如果您没有生产环境元表权限,则会报错。如何申请生产元表权限,请参见申请表权限

    • 任务中使用的是${Project_Name}.元表名元表名,则自动抽取开发元表。如果数据源无开发元表,则不支持自动抽样

  • 元表的任务调试时可读取参数选中生产表,详细说明如下:

    • 任务中使用的是Project_Name_dev.元表名,则自动抽取开发元表。如果数据源无开发元表,则不支持自动抽样

    • 任务中使用的是Project_Name.元表名,则自动抽取生产元表。

    • 任务中使用的是${Project_Name}.元表名元表名,根据参数中已经设置替换${project_name},则调试读取的表使用参数中指定开发或生产项目确定使用生产元表还是开发元表;若未指定${project_name},则自动抽取生产元表。

  • 上传本地数据:在采样调试数据时,若出现以下场景,您可选择手动上传本地数据进行调试:

    • 不支持的数据源。

    • 自动抽样满足不了构造的调试场景。

    • 抽取性能较差时,元数据无法正常采集。

    • 数据抽样的逻辑严格,采集效率低。例如从100万条数据中抽取其中1条数据。

      说明

      上传本地数据前需要先下载样例,根据下载的样例编辑需要上传的数据,单击上传后,数据自动填充至元数据采样区域。

      ggaga

  • 手动输入:适用于采集的数据比较少,或者需要修改已采集到的数据的场景。

  • 完成所有数据表的元数据采样后,单击页面下方的确定

  • Result页面,查看调试数据、中间结果和调试结果。

    gagag

步骤三:配置Flink SQL任务

  1. 如果实时引擎是Ververica Flink,则您可以为Flink SQL任务自定义运行的资源。在实时模式页签下配置Flink SQL任务的运行资源。

    gagaga集群引擎版本默认为创建Flink SQL任务时选择的配置,同时您也可以修改资源队列和引擎版本。Job Manager CPUs和Job Manager CPUs默认为1,您也可以修改。

    Dataphin支持的资源配置类型包括系统推荐配置和自定义配置:

    • 系统推荐配置:即实时计算Flink配置细粒度资源的智能模式(BETA),智能模式为专家模式的升级版。在智能模式下,作业将启用专家模式的资源配置,同时开启AutoPilot自动调优功能。

      在智能配置模式下,您无需配置相关资源,AutoPilot会自动为作业生成资源配置,并根据作业的运行情况帮您进行资源配置的调优,在确保作业处于健康的状态下,优化作业资源的使用。AutoPilot详情请参见配置自动调优。

    • 自定义配置:即实时计算Flink配置细粒度资源的专家模式 (BETA),Flink全托管引入的全新的资源配置模式,支持对作业所使用的资源进行细粒度的资源控制,以满足作业吞吐的要求。

      系统会自动根据您配置的资源需求,以Native K8s的模式运行作业,TM的规格和个数将会根据Slot的规格和作业并发度,由系统自动决定。

    更多关于配置细粒度资源的内容,请参见配置细粒度资源

    1. 选中自定义配置资源配置类型,并单击去配置

    2. 在资源配置页面,为您展示一张拓扑图,图中每个方框代表了一个计算任务,都可以进行独立配置。每个Group代表着Group内部的节点可以存放在一台机器进行计算,可以有效避免数据的网络传播,提升性能。图中当前的资源配置就是默认为您展示系统推荐的资源配置。

    3. 单击需要配置资源的Group右上角的①后,在自定义配置Group执行参数和自定义配置StreamExecCalc-7执行参数对话框配置参数后,单击确定

      gagaga

      参数

      描述

      CPU

      定义当前节点运行的CPU。

      heap_memory

      单位为MB,heap_memory是堆内存,供Java应用程序使用的内存。

      heap_memory及其内部各组成的大小可以通过JVM的一系列命令行参数来控制,在一般的blink程序中,都会需要一定的heap_memory开销,,例如申请一定的heap_memory作为程序的缓存等,因此您可以按程序的规模来设置其大小。

      Off-Heap Memory

      定义堆外内存大小。

    4. 按照下图指引,进入自动应配置StreamExecCalc-7执行参数对话框并配置参数。完成参数配置后单击确定

      gagaga并行数默认为1,可填任意大于0的整数,-1代表自动推断。

    5. 配置完成后,单击页面右上方的保存

      重要

      如果您想继续使用系统推荐的资源配置,则单击页面右上方的重置为系统初始

      同时您也可以保存资源配置记录,单击资源信息记录后的保存当前配置为新纪录。在保存资源记录对话框中,输入资源记录名称后,单击确定

      对已有的资源信息记录,您可以执行以下操作。

      操作

      描述

      查看版本信息

      单击某个记录操作列下的tesga图标,查看版本信息。

      启用记录

      1. 单击某个记录的操作列下的taga图标。

      2. 提示对话框中,单击确定

      删除记录

      1. 单击某个记录操作列下的teag图标。

      2. 提示对话框中,单击确定

  2. 如果实时引擎是Ververica Flink,在实时模式页签下配置Flink SQL任务的运行资源。

    image

    参数

    描述

    集群

    默认为创建任务时选择的集群。您也可以修改。

    引擎版本

    默认为创建任务时选择的引擎版本。您也可以修改。

    Job Manager CPUs

    默认为1,可填任意大于0的数,如1、10.5。

    Job Manager Memory

    默认为4Gi,建议使用Gi/Mi单位;可填数字(单位Byte),或填入包含以下内存单位(Gi/Mi)的数字,例如填写1024000,1024Mi,1.5Gi。

  3. 离线模式页签下配置Flink SQL任务的运行资源。

说明

Dataphin实时计算支持流批一体任务,使用统一的流批计算引擎,在一份代码上可同时配置流+批的任务配置,基于同一份代码生成不同模式下的实例。开启批处理需在任务配置页面开启离线模式并进行资源、调度依赖等相关配置。

image

参数

描述

计算源

若您配置的计算源实时引擎是Ververica Flink,且已勾选批处理可选择其他计算源您可单独为Flink批处理配置计算源。

集群

默认为创建任务时选择的集群。您也可以修改。

引擎版本

默认为创建任务时选择的集群。您也可以修改。

并行度

配置任务运行的并行数,默认为1,可填任意大于0的整数, -1代表自动推断。

Task Manager数量

默认与并行度一致,可填任意大于0的整数。

Job Manager Memory

默认为1Gi,建议使用Gi/Mi/单位;可填数字(单位Byte),或填入包含以下内存单位(Gi/Mi)的数字。

Job Manager Memory

默认为1Gi,建议使用Gi/Mi/单位;可填数字(单位Byte),或填入包含以下内存单位(Gi/Mi)的数字。

  1. (可选)

    配置实时任务的时间参数。

    实时任务的时间参数为stat_date,用于方式实时计算任务的运行时间的偏移,例如,您需要计算当天某个指标聚合值,为了防止时间偏移,则您需要设置state_date大于当天零点,过滤掉偏移的时间点。

    为了规避在任务参数处经常漏掉配置stat_date,您只需要在实时任务配置的属性配置中新增stat_date的kv配置,其中Value是一个基于业务时间的表达式,同时您也可以配置多个时间参数,使用半角分号(;)分割。例如,stat_date=${yyyyMMdd-1},则任务运行过程中的开始执行时间为${yyyyMMdd-1}。

    fagag
  2. 配置Flink SQL任务的依赖关系,实时模式下的依赖关系不实际产生调度依赖。配置依赖可帮助排查调试时快速了解数据的上下游任务。

    任务配置面板的依赖关系区域,配置依赖关系。fa'ga'gfagag

    参数

    描述

    自动解析

    当节点的任务类型为SQL时,您可以单击自动解析,系统会解析代码中的表,并查找到与该表名相同的输出名称。输出名称所在的节点作为当前节点的上游依赖。

    如果代码中引用项目变量或不指定项目,则系统默认解析为生产项目名,以保证生成调度的稳定性。例如,开发项目名称为onedata_dev

    • 如果代码里指定select * from s_order,则调度解析依赖为onedata.s_order

    • 如果代码里指定select * from ${onedata}.s_order,则调度解析依赖为onedata.s_order

    • 如果代码里指定select * from onedata.s_order,则调度解析依赖为onedata.s_order

    • 如果代码里指定select * from onedata_dev.s_order,则调度解析依赖为onedata_dev.s_order

    上游依赖

    通过执行如下操作,添加该节点任务调度时依赖的上游节点:

    1. 单击手动添加上游

    2. 新建上游依赖对话框中,您可以通过以下两种方式搜索依赖节点:

      • 输入所依赖节点的输出名称的关键字进行搜索节点。

      • 输入virtual搜索虚拟节点(每个租户或企业在初始化时都会有一个根节点)。

      说明

      节点的输出名称是全局唯一的,且不区分大小写。

    3. 单击确定新增

    同时您还可以单击操作列下的fagaga图标,删除已添加的依赖节点。

    当前节点

    通过执行如下操作,设置当前节点的输出名称,根据需要您可以设置多个输出名称,供其他节点依赖使用:

    1. 单击手动添加输出

    2. 新增当前节点输出对话框中,填写输出名称。输出名称的命名规则请尽量统一,一般命名规则为生成项目名.表名且不区分大小写,以标识本节点产出的表,同时其他节点更好地选择调度依赖关系。

      例如,开发项目名称为onedata_dev,建议将输出名称设置为onedata.s_order。如果您将输出名称设置为onedata_dev.s_order,则仅限代码select * from onedata_dev.s_order能解析出上游依赖节点。

    3. 单击确定新增

    同时您还可以对当前节点已添加的输出名称执行如下操作:

    • 单击操作列下的fagaga图标,删除已添加的输出名称。

    • 如果该节点已提交或已发布,且被任务所依赖(任务已提交),则单击操作列下的图标,查看下游节点。

  3. (可选)

    配置Flink SQL任务的调度参数。

    1. 任务配置面板的离线模式页签的调度配置区域,配置任务的调度参数。

      gagaga

      参数

      描述

      时间属性

      选择时间属性时间属性包括:

      • 正常调度:按照调度周期的时间配置调度,并正常执行,通常任务默认选中该项。

      • 空跑调度:按照调度周期的时间配置调度,但都是空跑执行,即一调度到该任务便直接返回成功,没有真正的执行任务。

      暂停调度

      暂停调度选择后,即可暂停该任务的调度,会按照下面的调度周期时间配置调度,但是一旦调度到该任务会直接返回失败,不会执行。通常用于某个任务暂时不用执行,但后面还会继续使用的场景。

      调度周期

      调度周期可选择小时分钟

      • 调度,即调度任务每天自动运行一次。新建周期任务时,系统默认的时间周期为每天0点运行一次。您可以根据需要,单击图标,指定运行的时间点。

      • 调度,即调度任务每周的特定几天,在特定时间点自动运行一次。您可以根据需要,单击图标,指定运行的时间点。

        如果您没有指定日期,为保证下游实例正常运行,系统会生成实例后直接设置为运行成功,而不会真正执行任何逻辑,也不会占用资源。

      • 调度,即调度任务在每月的特定几天,在特定时间点自动运行一次。您可以根据需要,单击图标,指定运行的时间点。

        如果在没有被指定的日期时,为保证下游实例正常运行,系统会每天生成实例后直接设置为运行成功,而不会真正执行任何逻辑,也不会占用资源。

      • 小时调度,即每天指定的时间段内,调度任务按间隔时间数的时间间隔运行一次。或选择指定的时间点,调度系统会自动为任务生成实例并运行。您可以根据业务需求选中时间段时间点

        • 如果您选中了时间段,您可以单击开始结束后的图标,指定运行的开始和结束时间。同时您可以单击间隔后的test图标,在下拉列表中选择间隔时间。

        • 如果您选中了时间点,单击下拉列表框,在下拉列表中选择时间点。

        例如,每天00:00~23:59的时间段内,每隔1小时会自动调度一次,因此调度系统会自动为任务生成实例并运行。

      • 分钟调度,即每天指定的时间段内,调度任务按间隔时间数的时间间隔运行一次。 您可以单击开始结束后的图标,指定运行的开始和结束时间。同时您可以单击间隔后的test图标,在下拉列表中选择间隔时间。

      依赖上周期

      根据业务场景选择本周期节点的运行,是否需要依赖上一周期本节点或其他节点的运行结果。

      选择节点类型。系统支持选择自定义当前。适用场景说明如下:

      • 本周期节点是否运行取决于上一周期本节点是否正常产出数据,则需要选择当前。只有上一周期本节点运行成功,才会启动运行本节点。

      • 代码任务没有用到某个节点的产出表,但业务上需要依赖该节点的上一周期是否正常产出数据,则需要选择依赖自定义节点。

      优先级

      优先级定义了同一时间同一批待调度任务的优先级。优先级包括:

      • 最低优先级

      • 低优先级

      • 中等优先级

      • 高优先级

      • 最高优先级

      时间参数

      您可以对代码中所用参数的具体赋值。单击节点参数配置说明,查看Dataphin调度系统的配置规则及支持配置的时间参数。

    2. 单击确定

  4. (可选)

    配置该Flink SQL任务启动所需的相关参数。

    gagaga

步骤四:提交Flink SQL任务

按照下图操作指引,提交Flink SQL任务。

gagag
提交成功后,您可以按照下图操作指引,查看Flink SQL任务的版本信息。 gagaga同时在 版本信息面板您也可以进行不同版本的代码对比及查看代码:
  • 按照下图操作指引,进行版本对比。版本对比包括代码对比、资源配置对比和模板变量参数对比。gagaga

  • 按照下图操作指引,查看代码。gaga

如果项目的模式为Dev-Prod,则您需要发布Flink SQL任务至生产环境。具体操作,请参见发布任务

后续步骤

在运维中心查看并运维Flink SQL任务,保证任务的正常运行。具体操作,请参见实时任务实时实例

阿里云首页 智能数据建设与治理 Dataphin 相关技术圈