Dataphin支持创建Flink DataStream代码任务。本文为您介绍如何基于Ververica Flink或开源Flink实时引擎创建Flink DataStream任务。

背景信息

阿里云实时计算Flink版是一套基于Apache Flink构建的⼀站式实时大数据分析平台,提供端到端亚秒级实时数据分析能力,并通过标准SQL降低业务开发门槛,助力企业向实时化、智能化大数据计算升级转型。

实时引擎为Ververica Flink和开源Flink支持的功能略有不同,详情请参见实时引擎适配功能说明

前提条件

在开始执行操作前,请确认项目已开启实时引擎并已配置Ververica Flink或开源Flink计算源。具体操作,请参见创建项目gagaga

步骤一:新建Flink DataStream任务

  1. 登录Dataphin控制台
  2. 在Dataphin控制台页面,选择工作区地域后,单击进入Dataphin>>
  3. 在Dataphin首页,单击顶部菜单栏研发
  4. 按照下图指引,进入新建FLINK_DATASTREAM任务对话框。
    daga①区域选择编码研发的项目空间,如果您选择了Dev-Prod模式的项目空间,则需要在区域②选择为Dev
  5. 新建Flink_SQL任务对话框,配置参数后,单击确定
    图 1. 开源Flink
    开源Flink
    图 2. Ververica Flink
    Ververica Flink
    参数 描述
    任务名称 名称的命名规则如下:
    • 只能包含小写英文字母、数字、下划线(_)。
    • 名称的长度范围为3~62个字符。
    • 项目内的名称不支持重复。
    • 名称仅支持以英文字母开头。
    集群 选择Flink DataStream任务所在集群。
    引擎版本 选择引擎的版本。
    存储目录 默认选择为代码管理,同时您也可以在计算任务页面创建目标文件夹后,选择该目标文件夹为Flink DataStream任务的目录。gagaga
    选择资源 该Flink DataStream依赖的资源包。
    类名 使用资源的完整类名。
    描述 填写对Flink DataStream任务的简单描述。

步骤二:开发Flink DataStream任务的代码

  1. 在Flink DataStream任务代码页面,编写任务的代码。
    您可以单击页面右上方的格式化,系统自动调整SQL代码格式。
  2. 单击页面右上方的预编译,校验代码任务的语法及权限问题。

步骤三:配置Flink DataStream任务

  1. 如果实时引擎是Ververica Flink,则在实时模式页签下配置Flink SQL任务的运行资源。
    fagaga集群引擎版本默认为创建Flink DataStream任务时选择的配置,同时您也可以修改资源队列和引擎版本。Job Manager CPUs和Job Manager CPUs默认为1,您也可以修改。
  2. 如果实时引擎是开源Flink,在实时模式页签下配置Flink SQL任务的运行资源。
    fagaga
    参数 描述
    引擎版本 默认为创建任务时选择的集群。您也可以修改。
    并行度 配置任务运行的并行数。
    Task Manager数量 默认与并行度一致,可填任意大于0的整数。
    Job Manager Memory 默认为1Gi,建议使用Gi/Mi/单位;可填数字(单位Byte),或填入包含以下内存单位(Gi/Mi)的数字。
    Job Manager Memory 默认为1Gi,建议使用Gi/Mi/单位;可填数字(单位Byte),或填入包含以下内存单位(Gi/Mi)的数字。
  3. 可选:配置实时任务的时间参数。
    实时任务的时间参数为stat_date,用于方式实时计算任务的运行时间的偏移,例如,您需要计算当天某个指标聚合值,为了防止时间偏移,则您需要设置state_date大于当天零点,过滤掉偏移的时间点。

    为了规避在任务参数处经常漏掉配置stat_date,您只需要在实时任务配置的属性配置中新增stat_date的kv配置,其中Value是一个基于业务时间的表达式,同时您也可以配置多个时间参数,使用半角分号(;)分割。例如,stat_date=${yyyyMMdd-1},则任务运行过程中的开始执行时间为${yyyyMMdd-1}。

    fagag
  4. 配置Flink SQL任务的依赖关系。
    任务配置面板的依赖关系区域,配置依赖关系。fa'ga'gfagag
    参数 描述
    自动解析 当节点的任务类型为SQL时,您可以单击自动解析,系统会解析代码中的表,并查找到与该表名相同的输出名称。输出名称所在的节点作为当前节点的上游依赖。
    如果代码中引用项目变量或不指定项目,则系统默认解析为生产项目名,以保证生成调度的稳定性。例如,开发项目名称为onedata_dev
    • 如果代码里指定select * from s_order,则调度解析依赖为onedata.s_order
    • 如果代码里指定select * from ${onedata}.s_order,则调度解析依赖为onedata.s_order
    • 如果代码里指定select * from onedata.s_order,则调度解析依赖为onedata.s_order
    • 如果代码里指定select * from onedata_dev.s_order,则调度解析依赖为onedata_dev.s_order
    上游依赖 通过执行如下操作,添加该节点任务调度时依赖的上游节点:
    1. 单击手动添加上游
    2. 新建上游依赖对话框中,您可以通过以下两种方式搜索依赖节点:
      • 输入所依赖节点的输出名称的关键字进行搜索节点。
      • 输入virtual搜索虚拟节点(每个租户或企业在初始化时都会有一个根节点)。
      说明 节点的输出名称是全局唯一的,且不区分大小写。
    3. 单击确定新增
    同时您还可以单击操作列下的fagaga图标,删除已添加的依赖节点。
    当前节点 通过执行如下操作,设置当前节点的输出名称,根据需要您可以设置多个输出名称,供其他节点依赖使用:
    1. 单击手动添加输出
    2. 新增当前节点输出对话框中,填写输出名称。输出名称的命名规则请尽量统一,一般命名规则为生成项目名.表名且不区分大小写,以标识本节点产出的表,同时其他节点更好地选择调度依赖关系。

      例如,开发项目名称为onedata_dev,建议将输出名称设置为onedata.s_order。如果您将输出名称设置为onedata_dev.s_order,则仅限代码select * from onedata_dev.s_order能解析出上游依赖节点。

    3. 单击确定新增
    同时您还可以对当前节点已添加的输出名称执行如下操作:
    • 单击操作列下的fagaga图标,删除已添加的输出名称。
    • 如果该节点已提交或已发布,且被任务所依赖(任务已提交),则单击操作列下的图标,查看下游节点。
  5. 可选:配置Flink DataStream任务的参数。
    gagaga

步骤四:提交Flink DataStream任务

  1. 按照下图操作指引,提交Flink DataStream任务。
    gagga
    提交成功后,您可以按照下图操作指引,查看Flink DataStream任务的版本信息。gagaga同时在版本信息面板您也可以进行不同版本的代码对比及查看代码:
    • 按照下图操作指引,进行版本对比。版本对比包括代码对比、资源配置对比和模版变量参数对比。gagaga
    • 按照下图操作指引,查看代码。gagaga
  2. 如果项目的模式为Dev-Prod,则您需要发布Flink DataStream任务至生产环境。具体操作,请参见发布任务

后续步骤

在运维中心查看并运维Flink DataStream任务,保证任务的正常运行。具体操作,请参见实时任务实时实例