Flink DataStream计算任务是基于Java环境研发的计算任务,用于进行数据的实时处理。本文为您介绍如何基于JAR资源文件,构建Flink DataStream实时计算任务。

前提条件

在您开始执行操作前,请确认已上传已经完成开发的Datstream作业的JAR包至Dataphin平台,详情请参见新建资源

使用限制

Dataphin支持超级管理员、项目管理员和开发者创建Flink DataStream计算任务。

步骤一:新建Flink DataStream任务

  1. 登录Dataphin控制台
  2. 在Dataphin控制台页面,选择工作区地域后,单击进入Dataphin>>
  3. 在Dataphin首页,单击顶部菜单栏研发
  4. 按照下图指引,进入新建 Flink DataStream对话框。
    gagag
  5. 新建Flink_DataStream对话框,配置参数后,单击确定
    tset
    参数 描述
    名称 名称的命名规则如下:
    • 只能包含小写英文字母、数字、下划线(_)。
    • 名称的长度范围为3~62个字符。
    • 项目内的名称不支持重复。
    • 名称仅支持以英文字母开头。
    选择目录 实时计算任务所属的目录。
    选择资源 该实时任务依赖的资源包。
    类名 使用资源的完整类名。
    资源队列 该项目所绑定的实时计算源中的资源队列。
    引擎版本 当前资源队列所支持的版本。

步骤二:开发Flink DataStream任务的代码

  1. 在Flink DataStream任务代码页面,编写任务的代码。
    您可以单击页面右上方的格式化,系统自动调整SQL代码格式。
  2. 单击页面右上方的预编译,校验代码任务的语法及权限问题。
  3. 单击页面右上方的调试,代码任务采样数据并进行本地调试,保障代码任务的正确性。
    1. 调试配置对话框的选择采样模式页签中,选择调试的模式后,单击下一步
      fagag
    2. 采样调试数据页签中,为元数据进行采样调试。
      您可以通过自动抽样和上传数据的方式,为元数据进行采样调试。适用场景说明如下:
      • 自动抽样:自动抽样到的数据是随机的,所以适用于对采集到的数据没有限制的场景。选择自动抽样后,需要配置抽样条数
        注意 如果元数据表中没有数据,则自动抽样将采集不到数据。
      • 上传本地数据:自动抽样时元数据采集不到或数据抽样的逻辑比较严格,例如从100万条数据中抽取其中1条数据,这样采集效率就很低,可以选择手动上传本地数据。

        上传本地数据前需要先下载样例,根据下载的样例编辑需要上传的数据,单击上传后,数据自动填充至元数据采样区域。

        ggaga
      • 手动输入:适用于采集的数据比较少,或者需要修改已采集到的数据的场景。
    3. 完成所有数据表的元数据采样后,单击页面下方的确定
    4. Result页面,查看调试数据、中间结果和调试结果。
      gagag

步骤三:配置Flink DataStream任务的资源

  1. 在Flink DataStream任务代码开发页面,单击页面上方的任务配置
  2. 任务配置页面,确认在实时模式页签。
    实时模式页签,为您展示任务运行所使用的资源队列、引擎版本、资源配置类型及自动调优。gagag资源队列引擎版本默认为创建Flink DataStream任务时选择的配置。同时,您也可以修改资源队列和引擎版本。
  3. 选中自定义配置资源配置类型,并单击去配置
  4. 在资源配置页面,为您展示一张拓扑图,图中每个方框代表了一个计算任务,都可以进行独立配置。每个Group代表着Group内部的节点可以存放在一台机器进行计算,可以有效避免数据的网络传播,提升性能。图中当前的资源配置就是默认为您展示系统推荐的资源配置。
    gagag
    1. 单击需要配置资源的Group右上角的①后,在自定义配置Group执行参数对话框配置参数后,单击确定
      fdagag
      参数 描述
      core 通常,core配置成0.25。即一个CPU可以支持4个线程同时运算,因此core的取值不能超过1,0.25即可,0.125是最低下限了,同时支持8个线程。
      heap_memory 单位为MB,heap_memory是堆内存,供Java应用程序使用的内存。

      heap_memory及其内部各组成的大小可以通过JVM的一系列命令行参数来控制,在一般的blink程序中,都会需要一定的heap_memory开销,,例如申请一定的heap_memory作为程序的缓存等,因此您可以按程序的规模来设置其大小。

      parallel 表示同时并发的线程数量,用户可以选择合适的数量来运行自己的任务,并不是越大越好,越大代表你资源申请的越多,反而对性能有抑制。
      通常,一个简单计算节点每秒可以处理2000~4000条之间的数据。
      注意 如果源头是tt,tt的queue大小决定了parallel的上限,不能超过这个数字,否则程序将报错。
      direct_memory 单位为MB,direct_memory并不是虚拟机运行时数据区的一部分,也不是Java虚拟机规范中定义的内存区域。但是这部分内存也被频繁的使用,而且也可能导致OutOfMemoryError异常出现。如果您的程序有使用igraph或者swift,可以适当配置其大小,如16-32MB。

      在Java NIO(New Input/Output)类,引入了一种基于通道(Channel)与缓冲区(Buffer)的I/O方式,direct_memory可以使用Native函数库直接分配堆外内存,然后同一个存储在Java堆中的DirectByteBuffer对象作为这块内存的引用进行操作。这样能在一场场景中显著提高性能,因为避免了在Java堆和Native堆中来回复制数据。

      native_memory 单位为MB,native_memory没有相应的参数来控制大小,其大小依赖于操作系统进程的最大值(对于32位系统就是3~4G,各种系统的实现并不一样),以及生成的Java字节码大小、创建的线程数量、维持Java对象的状态信息大小(用于GC)以及一些第三方的包。
      native memory存放下面4种信息:
      • 管理Java heap的状态数据(用于GC)。
      • JNI调用,也就是Native Stack。
      • JIT(即使编译器)编译时使用native memory,并且JIT的输入(Java字节码)和输出(可执行代码)也都是保存在native memory。
      • NIO direct buffer。
    2. 按照下图指引,进入自定义配置Operator执行参数对话框并配置参数。完成参数配置后单击确定
      gagaga自定义配置Operator执行参数对话框中core、heap_memory、parallel、direct_memory和native_memory参数解释请参见上一步骤,下表仅对state_size、chain_strategy参数进行解释。
      参数 描述
      state_size 保持默认0即可。
      chain_strategy chain_strategy用于定义多个节点的链接策略,Dataphin支持的链接策略包括:
      • Always:默认参数,即和其他节点均部署在一台机器上,没有特殊要求。
      • Never:节点不会和其他节点放在一台机器上,即需要独立部署。
      • Head:可以接受和其他节点放在一台机器上,但是只能作为Group的头节点。
      注意 目前,Head和Never极少出现,默认Always即可。
    3. 配置完成后,单击页面右上方的保存
      注意 如果您想继续使用系统推荐的资源配置,则单击页面右上方的重置为系统初始
  5. 可选:保存资源配置记录。
    注意 仅实时模式支持保存资源配置记录。

    如果资源配置类型自定义配置,则单击资源信息记录后的保存当前配置为新纪录。在保存资源记录对话框中,输入资源记录名称后,单击确定

    对已有的资源信息记录,您可以执行以下操作。
    操作 描述
    查看版本信息 单击某个记录操作列下的tesga图标,查看版本信息。
    启用记录
    1. 单击某个记录的操作列下的taga图标。
    2. 提示对话框中,单击确定
    删除记录
    1. 单击某个记录操作列下的teag图标。
    2. 提示对话框中,单击确定
  6. 可选:打开自动调优开关,配置最大CU数期望最大内存,开启自动调优。
    注意 仅实时模式支持自动调优功能。

步骤四:配置实时任务的时间参数

faga

实时任务的时间参数为stat_date,用于方式实时计算任务的运行时间的偏移,例如,您需要计算当天某个指标聚合值,为了防止时间偏移,则您需要设置state_date大于当天零点,过滤掉偏移的时间点。

为了规避在任务参数处经常漏掉配置stat_date,您只需要在实时任务配置的属性配置中新增stat_date的kv配置,其中Value是一个基于业务时间的表达式,同时您也可以配置多个时间参数,使用半角分号(;)分割。例如,stat_date=${yyyyMMdd-1},则任务运行过程中的开始执行时间为${yyyyMMdd-1}。

fagag

步骤五:配置依赖关系

  1. 在Flink DataStream任务代码开发页面,单击页面上方的任务配置
  2. 任务配置面板的依赖关系区域,配置依赖关系。
    fa'ga'gfagag
    参数 描述
    上游依赖 您可以通过自动解析和手动添加两种方式,为Flink DataStream任务节点添加上游依赖的节点:
    • 单击自动解析,Dataphin会解析Flink DataStream任务代码中的表,并查找到与该表名相同的输出名称。输出名称所在的节点作为当前节点的上游依赖。
      如果代码中引用项目变量或不指定项目,则系统默认解析为生产项目名,以保证生成调度的稳定性。例如,开发项目名称为onedata_dev
      • 如果代码里指定select * from s_order,则调度解析依赖为onedata.s_order
      • 如果代码里指定select * from ${onedata}.s_order,则调度解析依赖为onedata.s_order
      • 如果代码里指定select * from onedata.s_order,则调度解析依赖为onedata.s_order
      • 如果代码里指定select * from onedata_dev.s_order,则调度解析依赖为onedata_dev.s_order
    • 如果需要添加其他节点作为当前标签的上游节点,则需要手动添加上游依赖的物理节点。
      注意 Dataphin不支持手动添加逻辑表节点。

      单击新增上游依赖,在新建上游依赖对话框中,输入所依赖节点的输出名称的关键字进行搜索节点,搜索到后单击确定新增

    当前节点 通过执行如下操作,设置当前节点的输出名称,根据需要您可以设置多个输出名称,供其他节点依赖使用:
    1. 单击手动添加输出
    2. 新增当前节点输出对话框中,填写输出名称。输出名称的命名规则请尽量统一,一般命名规则为生成项目名.表名且不区分大小写,以标识本节点产出的表,同时其他节点更好地选择调度依赖关系。

      例如,开发项目名称为onedata_dev,建议将输出名称设置为onedata.s_order。如果您将输出名称设置为onedata_dev.s_order,则仅限代码select * from onedata_dev.s_order能解析出上游依赖节点。

    3. 单击确定新增
    同时您还可以对当前节点已添加的输出名称执行如下操作:
    • 单击操作列下的fagaga图标,删除已添加的输出名称。
    • 如果该节点已提交或发布。且被任务所依赖(任务已提交),则单击操作列下的图标,查看下游节点。

步骤六:配置任务参数

Flink DataStream任务代码中的通用参数,您可以通过任务参数进行批量配置。

  1. 在Flink DataStream任务代码开发页面,单击页面上方的任务配置
  2. 任务配置面板,任务参数配置区域,配置参数。

步骤七:提交或发布Flink DataStream任务

  1. 按照下图指引,提交Flink DataStream任务至生产环境。
    gagaga
  2. 如果项目空间的开发模式为Dev-Prod,则需要发布Flink DataStream任务至生产环境。如何发布Flink DataStream任务至生产环境,请参见发布任务

后续步骤

在运维中心查看并运维Flink DataStream任务,保证任务的正常运行。具体操作,请参见实时任务实时实例