Flink DataStream计算任务是基于Java环境研发的计算任务,用于进行数据的实时处理。本文为您介绍如何基于JAR资源文件,构建Flink DataStream实时计算任务。
前提条件
使用限制
Dataphin支持超级管理员、项目管理员和开发者创建Flink DataStream计算任务。
步骤一:新建Flink DataStream任务
- 登录Dataphin控制台。
- 在Dataphin控制台页面,选择工作区地域后,单击进入Dataphin>>。
- 在Dataphin首页,单击顶部菜单栏研发。
- 按照下图指引,进入新建 Flink DataStream对话框。
- 在新建Flink_DataStream对话框,配置参数后,单击确定。
参数 描述 名称 名称的命名规则如下: - 只能包含小写英文字母、数字、下划线(_)。
- 名称的长度范围为3~62个字符。
- 项目内的名称不支持重复。
- 名称仅支持以英文字母开头。
选择目录 实时计算任务所属的目录。 选择资源 该实时任务依赖的资源包。 类名 使用资源的完整类名。 资源队列 该项目所绑定的实时计算源中的资源队列。 引擎版本 当前资源队列所支持的版本。
步骤二:开发Flink DataStream任务的代码
- 在Flink DataStream任务代码页面,编写任务的代码。您可以单击页面右上方的格式化,系统自动调整SQL代码格式。
- 单击页面右上方的预编译,校验代码任务的语法及权限问题。
- 单击页面右上方的调试,代码任务采样数据并进行本地调试,保障代码任务的正确性。
- 在调试配置对话框的选择采样模式页签中,选择调试的模式后,单击下一步。
- 在采样调试数据页签中,为元数据进行采样调试。您可以通过自动抽样和上传数据的方式,为元数据进行采样调试。适用场景说明如下:
- 自动抽样:自动抽样到的数据是随机的,所以适用于对采集到的数据没有限制的场景。选择自动抽样后,需要配置抽样条数。
注意 如果元数据表中没有数据,则自动抽样将采集不到数据。
- 上传本地数据:自动抽样时元数据采集不到或数据抽样的逻辑比较严格,例如从100万条数据中抽取其中1条数据,这样采集效率就很低,可以选择手动上传本地数据。
上传本地数据前需要先下载样例,根据下载的样例编辑需要上传的数据,单击上传后,数据自动填充至元数据采样区域。
- 手动输入:适用于采集的数据比较少,或者需要修改已采集到的数据的场景。
- 自动抽样:自动抽样到的数据是随机的,所以适用于对采集到的数据没有限制的场景。选择自动抽样后,需要配置抽样条数。
- 完成所有数据表的元数据采样后,单击页面下方的确定。
- 在Result页面,查看调试数据、中间结果和调试结果。
- 在调试配置对话框的选择采样模式页签中,选择调试的模式后,单击下一步。
步骤三:配置Flink DataStream任务的资源
- 在Flink DataStream任务代码开发页面,单击页面上方的任务配置。
- 在任务配置页面,确认在实时模式页签。在实时模式页签,为您展示任务运行所使用的资源队列、引擎版本、资源配置类型及自动调优。
资源队列和引擎版本默认为创建Flink DataStream任务时选择的配置。同时,您也可以修改资源队列和引擎版本。
- 选中自定义配置为资源配置类型,并单击去配置。
- 在资源配置页面,为您展示一张拓扑图,图中每个方框代表了一个计算任务,都可以进行独立配置。每个Group代表着Group内部的节点可以存放在一台机器进行计算,可以有效避免数据的网络传播,提升性能。图中当前的资源配置就是默认为您展示系统推荐的资源配置。
- 单击需要配置资源的Group右上角的①后,在自定义配置Group执行参数对话框配置参数后,单击确定。
参数 描述 core 通常,core配置成0.25。即一个CPU可以支持4个线程同时运算,因此core的取值不能超过1,0.25即可,0.125是最低下限了,同时支持8个线程。 heap_memory 单位为MB,heap_memory是堆内存,供Java应用程序使用的内存。 heap_memory及其内部各组成的大小可以通过JVM的一系列命令行参数来控制,在一般的blink程序中,都会需要一定的heap_memory开销,,例如申请一定的heap_memory作为程序的缓存等,因此您可以按程序的规模来设置其大小。
parallel 表示同时并发的线程数量,用户可以选择合适的数量来运行自己的任务,并不是越大越好,越大代表你资源申请的越多,反而对性能有抑制。 通常,一个简单计算节点每秒可以处理2000~4000条之间的数据。注意 如果源头是tt,tt的queue大小决定了parallel的上限,不能超过这个数字,否则程序将报错。direct_memory 单位为MB,direct_memory并不是虚拟机运行时数据区的一部分,也不是Java虚拟机规范中定义的内存区域。但是这部分内存也被频繁的使用,而且也可能导致OutOfMemoryError异常出现。如果您的程序有使用igraph或者swift,可以适当配置其大小,如16-32MB。 在Java NIO(New Input/Output)类,引入了一种基于通道(Channel)与缓冲区(Buffer)的I/O方式,direct_memory可以使用Native函数库直接分配堆外内存,然后同一个存储在Java堆中的DirectByteBuffer对象作为这块内存的引用进行操作。这样能在一场场景中显著提高性能,因为避免了在Java堆和Native堆中来回复制数据。
native_memory 单位为MB,native_memory没有相应的参数来控制大小,其大小依赖于操作系统进程的最大值(对于32位系统就是3~4G,各种系统的实现并不一样),以及生成的Java字节码大小、创建的线程数量、维持Java对象的状态信息大小(用于GC)以及一些第三方的包。 native memory存放下面4种信息:- 管理Java heap的状态数据(用于GC)。
- JNI调用,也就是Native Stack。
- JIT(即使编译器)编译时使用native memory,并且JIT的输入(Java字节码)和输出(可执行代码)也都是保存在native memory。
- NIO direct buffer。
- 按照下图指引,进入自定义配置Operator执行参数对话框并配置参数。完成参数配置后单击确定。
自定义配置Operator执行参数对话框中core、heap_memory、parallel、direct_memory和native_memory参数解释请参见上一步骤,下表仅对state_size、chain_strategy参数进行解释。
参数 描述 state_size 保持默认0即可。 chain_strategy chain_strategy用于定义多个节点的链接策略,Dataphin支持的链接策略包括: - Always:默认参数,即和其他节点均部署在一台机器上,没有特殊要求。
- Never:节点不会和其他节点放在一台机器上,即需要独立部署。
- Head:可以接受和其他节点放在一台机器上,但是只能作为Group的头节点。
注意 目前,Head和Never极少出现,默认Always即可。 - 配置完成后,单击页面右上方的保存。注意 如果您想继续使用系统推荐的资源配置,则单击页面右上方的重置为系统初始。
- 单击需要配置资源的Group右上角的①后,在自定义配置Group执行参数对话框配置参数后,单击确定。
- 可选:保存资源配置记录。注意 仅实时模式支持保存资源配置记录。
如果资源配置类型为自定义配置,则单击资源信息记录后的保存当前配置为新纪录。在保存资源记录对话框中,输入资源记录名称后,单击确定。
对已有的资源信息记录,您可以执行以下操作。操作 描述 查看版本信息 单击某个记录操作列下的 图标,查看版本信息。
启用记录 - 单击某个记录的操作列下的
图标。
- 在提示对话框中,单击确定。
删除记录 - 单击某个记录操作列下的
图标。
- 在提示对话框中,单击确定
- 单击某个记录的操作列下的
- 可选:打开自动调优开关,配置最大CU数和期望最大内存,开启自动调优。注意 仅实时模式支持自动调优功能。
步骤四:配置实时任务的时间参数

实时任务的时间参数为stat_date,用于方式实时计算任务的运行时间的偏移,例如,您需要计算当天某个指标聚合值,为了防止时间偏移,则您需要设置state_date大于当天零点,过滤掉偏移的时间点。
为了规避在任务参数处经常漏掉配置stat_date,您只需要在实时任务配置的属性配置中新增stat_date的kv配置,其中Value是一个基于业务时间的表达式,同时您也可以配置多个时间参数,使用半角分号(;)分割。例如,stat_date=${yyyyMMdd-1},则任务运行过程中的开始执行时间为${yyyyMMdd-1}。

步骤五:配置依赖关系
- 在Flink DataStream任务代码开发页面,单击页面上方的任务配置。
- 在任务配置面板的依赖关系区域,配置依赖关系。
参数 描述 上游依赖 您可以通过自动解析和手动添加两种方式,为Flink DataStream任务节点添加上游依赖的节点: - 单击自动解析,Dataphin会解析Flink DataStream任务代码中的表,并查找到与该表名相同的输出名称。输出名称所在的节点作为当前节点的上游依赖。
如果代码中引用项目变量或不指定项目,则系统默认解析为生产项目名,以保证生成调度的稳定性。例如,开发项目名称为
onedata_dev
:- 如果代码里指定
select * from s_order
,则调度解析依赖为onedata.s_order
。 - 如果代码里指定
select * from ${onedata}.s_order
,则调度解析依赖为onedata.s_order
。 - 如果代码里指定
select * from onedata.s_order
,则调度解析依赖为onedata.s_order
。 - 如果代码里指定
select * from onedata_dev.s_order
,则调度解析依赖为onedata_dev.s_order
。
- 如果代码里指定
- 如果需要添加其他节点作为当前标签的上游节点,则需要手动添加上游依赖的物理节点。
注意 Dataphin不支持手动添加逻辑表节点。
单击新增上游依赖,在新建上游依赖对话框中,输入所依赖节点的输出名称的关键字进行搜索节点,搜索到后单击确定新增。
当前节点 通过执行如下操作,设置当前节点的输出名称,根据需要您可以设置多个输出名称,供其他节点依赖使用: - 单击手动添加输出。
- 在新增当前节点输出对话框中,填写输出名称。输出名称的命名规则请尽量统一,一般命名规则为
生成项目名.表名
且不区分大小写,以标识本节点产出的表,同时其他节点更好地选择调度依赖关系。例如,开发项目名称为
onedata_dev
,建议将输出名称设置为onedata.s_order
。如果您将输出名称设置为onedata_dev.s_order
,则仅限代码select * from onedata_dev.s_order
能解析出上游依赖节点。 - 单击确定新增。
- 单击操作列下的
图标,删除已添加的输出名称。
- 如果该节点已提交或发布。且被任务所依赖(任务已提交),则单击操作列下的
图标,查看下游节点。
- 单击自动解析,Dataphin会解析Flink DataStream任务代码中的表,并查找到与该表名相同的输出名称。输出名称所在的节点作为当前节点的上游依赖。
步骤六:配置任务参数
Flink DataStream任务代码中的通用参数,您可以通过任务参数进行批量配置。
- 在Flink DataStream任务代码开发页面,单击页面上方的任务配置。
- 在任务配置面板,任务参数配置区域,配置参数。
步骤七:提交或发布Flink DataStream任务
- 按照下图指引,提交Flink DataStream任务至生产环境。
- 如果项目空间的开发模式为Dev-Prod,则需要发布Flink DataStream任务至生产环境。如何发布Flink DataStream任务至生产环境,请参见发布任务。