Dataphin支持创建Flink DataStream代码任务。本文为您介绍如何基于Ververica Flink或开源Flink实时引擎创建Flink DataStream任务。
背景信息
阿里云实时计算Flink版是一套基于Apache Flink构建的⼀站式实时大数据分析平台,提供端到端亚秒级实时数据分析能力,并通过标准SQL降低业务开发门槛,助力企业向实时化、智能化大数据计算升级转型。
实时引擎为Ververica Flink和开源Flink支持的功能略有不同,详情请参见实时引擎适配功能说明。
前提条件
在开始执行操作前,请确认项目已开启实时引擎并已配置Ververica Flink或开源Flink计算源。具体操作,请参见创建项目。

步骤一:新建Flink DataStream任务
- 登录Dataphin控制台。
- 在Dataphin控制台页面,选择工作区地域后,单击进入Dataphin>>。
- 在Dataphin首页,单击顶部菜单栏研发。
- 按照下图指引,进入新建FLINK_DATASTREAM任务对话框。
①区域选择编码研发的项目空间,如果您选择了Dev-Prod模式的项目空间,则需要在区域②选择为Dev。
- 在新建Flink_SQL任务对话框,配置参数后,单击确定。
图 1. 开源Flink 图 2. Ververica Flink 参数 描述 任务名称 名称的命名规则如下: - 只能包含小写英文字母、数字、下划线(_)。
- 名称的长度范围为3~62个字符。
- 项目内的名称不支持重复。
- 名称仅支持以英文字母开头。
集群 选择Flink DataStream任务所在集群。 引擎版本 选择引擎的版本。 存储目录 默认选择为代码管理,同时您也可以在计算任务页面创建目标文件夹后,选择该目标文件夹为Flink DataStream任务的目录。 选择资源 该Flink DataStream依赖的资源包。 类名 使用资源的完整类名。 描述 填写对Flink DataStream任务的简单描述。
步骤二:开发Flink DataStream任务的代码
- 在Flink DataStream任务代码页面,编写任务的代码。您可以单击页面右上方的格式化,系统自动调整SQL代码格式。
- 单击页面右上方的预编译,校验代码任务的语法及权限问题。
步骤三:配置Flink DataStream任务
- 如果实时引擎是Ververica Flink,则在实时模式页签下配置Flink SQL任务的运行资源。
集群和引擎版本默认为创建Flink DataStream任务时选择的配置,同时您也可以修改资源队列和引擎版本。Job Manager CPUs和Job Manager CPUs默认为1,您也可以修改。
- 如果实时引擎是开源Flink,在实时模式页签下配置Flink SQL任务的运行资源。
参数 描述 引擎版本 默认为创建任务时选择的集群。您也可以修改。 并行度 配置任务运行的并行数。 Task Manager数量 默认与并行度一致,可填任意大于0的整数。 Job Manager Memory 默认为1Gi,建议使用Gi/Mi/单位;可填数字(单位Byte),或填入包含以下内存单位(Gi/Mi)的数字。 Job Manager Memory 默认为1Gi,建议使用Gi/Mi/单位;可填数字(单位Byte),或填入包含以下内存单位(Gi/Mi)的数字。 - 可选:配置实时任务的时间参数。实时任务的时间参数为stat_date,用于方式实时计算任务的运行时间的偏移,例如,您需要计算当天某个指标聚合值,为了防止时间偏移,则您需要设置state_date大于当天零点,过滤掉偏移的时间点。
为了规避在任务参数处经常漏掉配置stat_date,您只需要在实时任务配置的属性配置中新增stat_date的kv配置,其中Value是一个基于业务时间的表达式,同时您也可以配置多个时间参数,使用半角分号(;)分割。例如,stat_date=${yyyyMMdd-1},则任务运行过程中的开始执行时间为${yyyyMMdd-1}。
- 配置Flink SQL任务的依赖关系。在任务配置面板的依赖关系区域,配置依赖关系。
参数 描述 自动解析 当节点的任务类型为SQL时,您可以单击自动解析,系统会解析代码中的表,并查找到与该表名相同的输出名称。输出名称所在的节点作为当前节点的上游依赖。 如果代码中引用项目变量或不指定项目,则系统默认解析为生产项目名,以保证生成调度的稳定性。例如,开发项目名称为onedata_dev
:- 如果代码里指定
select * from s_order
,则调度解析依赖为onedata.s_order
。 - 如果代码里指定
select * from ${onedata}.s_order
,则调度解析依赖为onedata.s_order
。 - 如果代码里指定
select * from onedata.s_order
,则调度解析依赖为onedata.s_order
。 - 如果代码里指定
select * from onedata_dev.s_order
,则调度解析依赖为onedata_dev.s_order
。
上游依赖 通过执行如下操作,添加该节点任务调度时依赖的上游节点: - 单击手动添加上游。
- 在新建上游依赖对话框中,您可以通过以下两种方式搜索依赖节点:
- 输入所依赖节点的输出名称的关键字进行搜索节点。
- 输入virtual搜索虚拟节点(每个租户或企业在初始化时都会有一个根节点)。
说明 节点的输出名称是全局唯一的,且不区分大小写。 - 单击确定新增。
图标,删除已添加的依赖节点。
当前节点 通过执行如下操作,设置当前节点的输出名称,根据需要您可以设置多个输出名称,供其他节点依赖使用: - 单击手动添加输出。
- 在新增当前节点输出对话框中,填写输出名称。输出名称的命名规则请尽量统一,一般命名规则为
生成项目名.表名
且不区分大小写,以标识本节点产出的表,同时其他节点更好地选择调度依赖关系。例如,开发项目名称为
onedata_dev
,建议将输出名称设置为onedata.s_order
。如果您将输出名称设置为onedata_dev.s_order
,则仅限代码select * from onedata_dev.s_order
能解析出上游依赖节点。 - 单击确定新增。
- 单击操作列下的
图标,删除已添加的输出名称。
- 如果该节点已提交或已发布,且被任务所依赖(任务已提交),则单击操作列下的
图标,查看下游节点。
- 如果代码里指定
- 可选:配置Flink DataStream任务的参数。
步骤四:提交Flink DataStream任务
- 按照下图操作指引,提交Flink DataStream任务。提交成功后,您可以按照下图操作指引,查看Flink DataStream任务的版本信息。
同时在版本信息面板您也可以进行不同版本的代码对比及查看代码:
- 按照下图操作指引,进行版本对比。版本对比包括代码对比、资源配置对比和模版变量参数对比。
- 按照下图操作指引,查看代码。
- 按照下图操作指引,进行版本对比。版本对比包括代码对比、资源配置对比和模版变量参数对比。
- 如果项目的模式为Dev-Prod,则您需要发布Flink DataStream任务至生产环境。具体操作,请参见发布任务。