DataWorks新版数据开发中的Flink SQL Streaming节点支持使用标准SQL语句定义实时任务处理逻辑。Flink SQL Streaming具有易用性、丰富的SQL支持、强大的状态管理及容错能力,兼容事件时间和处理时间,并可灵活扩展。该节点易于与Kafka、HDFS等系统集成,提供详尽的日志和性能监控工具。您只需在DataWorks项目中添加Flink SQL Streaming任务并编写SQL语句即可开始实时数据处理。本文将介绍如何在DataWorks开发Flink SQL Streaming节点任务,并通过DataWorks完成Flink实时数据处理。
前提条件
已在管理中心绑定实时计算Flink版计算资源,详情请参见绑定计算资源。
已创建Flink SQL Streaming节点,详情请参见创建调度工作流的节点。
已为DataWorks调用实时计算Flink版OpenAPI使用的RAM用户或RAM角色新增授权以下OpenAPI权限。该授权用于将节点任务提交并部署到Flink集群。
{ "Version": "1", "Statement": [ { "Effect": "Allow", "Action": ["stream:CreateDeployment", "stream:UpdateDeployment", "stream:GetDeployment", "stream:DeleteDeployment"], "Resource": ["*"] } ] }
使用限制
该节点不支持在工作流中使用,仅支持作为独立节点进行开发和运行。
仅支持使用Serverless资源组,不支持旧版独享调度资源组。
步骤一:开发Flink SQL Streaming节点
在Flink SQL Streaming节点编辑页面,执行如下开发操作,完成节点任务开发。
开发SQL代码
在SQL编辑区域开发任务代码,您可在代码中使用${变量名}的方式定义变量,并在节点编辑页面右侧实时配置的脚本参数中为该变量赋值。实现调度场景下代码的动态传参,调度参数使用详情,示例如下。
--创建源表datagen_source。
CREATE TEMPORARY TABLE datagen_source(
name VARCHAR
) WITH (
'connector' = 'datagen'
);
--创建结果表blackhole_sink。
CREATE TEMPORARY TABLE blackhole_sink(
name VARCHAR
) WITH (
'connector' = 'blackhole'
);
--将源表数据插入到结果表。
INSERT INTO blackhole_sink
SELECT
name
FROM datagen_source WHERE LENGTH(name) > ${name_length};该示例参数name_length对应的参数值为5,通过设置该参数可以实现对人名长度小于5的数据进行过滤。
步骤二:配置Flink SQL Streaming节点
您可根据业务情况,参照下面的参数描述信息配置Flink SQL Streaming节点任务。
配置Flink资源
您可在编辑页面右侧实时配置框的Flink 资源信息中根据资源模式配置如下参数信息,详情请参见配置作业资源。
参数 | 参数描述 |
Flink 集群 | 在管理中心绑定的全托管Flink计算资源名称。 |
Flink 引擎版本 | 您可根据实际情况选择引擎版本。 |
资源组 | 选择与Flink网络连通的Serverless资源组。 |
资源模式支持以下两种模式,详情请参见配置作业资源。
请根据您选择的资源模式,参考下文配置相关参数。通过深入理解Flink架构,您可以更有效地进行参数配置,Flink架构详情请参见Flink Architecture | Apache Flink。 | |
基础模式 | |
Job Manager CPU | 根据Flink的最佳实践,JobManager至少需要0.5核CPU和2GiB内存来确保稳定运行,建议配置为1核CPU和4 GiB内存,最大不超过16核CPU。具体配置应根据集群规模和作业复杂度调整。 |
Job Manager Memory | JobManager的内存配置影响其处理调度和管理任务的能力,推荐配置范围是2 GiB到64 GiB,以确保稳定高效的运行。具体大小应根据集群规模和作业需求调整。 |
Task Manager CPU | TaskManager的CPU资源配置影响其任务处理能力。根据Flink的最佳实践,建议配置至少0.5核CPU和2 GiB内存,推荐1核CPU和4 GiB内存,最大不超过16核CPU。具体配置应依据实际需求调整。 |
Task Manager Memory | TaskManager的内存配置决定了其处理任务的数据量和性能。为了确保任务稳定执行和高效处理,内存大小至少应为2 GiB,最大可设置为64 GiB。 |
并发度 | 决定了Flink作业中任务的并行执行数量,较高的并发度可以提高处理速度和资源利用率,您需要根据集群资源和作业特性进行合理设置。 |
每个 TaskManager Slot 数 | 每个TaskManager的Slot数决定了它可以并行执行的任务数量,您可通过调整Slot配置优化资源利用和作业的并行处理能力。 |
专家模式 | |
Job Manager CPU | 根据Flink的最佳实践,JobManager至少需要0.25核CPU和1 GiB内存来确保稳定运行,最大不超过16核CPU。具体配置应根据集群规模和作业复杂度调整。 |
Job Manager Memory | JobManager的内存配置影响其处理调度和管理任务的能力,推荐配置范围是1 GiB到64 GiB,以确保稳定高效的运行。具体大小应根据集群规模和作业需求调整。 |
每个 TaskManager Slot 数 | 每个TaskManager的Slot数决定了它可以并行执行的任务数量,您可通过调整Slot配置优化资源利用和作业的并行处理能力。 |
多 SSG 模式 | 默认情况下,所有算子都放在一个Slot共享组内,因此您无法为每个算子单独修改资源配置。如果您需要对单独的算子设置资源,需要开启多 SSG 模式后让每个算子有自己独立的Slot,这样就可以直接在对应的Slot上设置算子的资源。 |
(可选)配置脚本参数
您可在右侧导航栏的实时配置框的脚本参数中单击添加参数,并编辑相应的参数名、参数值信息,方便在代码中动态使用。
(可选)配置Flink运行参数
您可在右侧导航栏的实时配置框的Flink 运行参数中配置如下参数信息,详情请参见配置作业部署信息。
参数 | 描述 |
系统检查点间隔 | 配置该参数决定了Flink作业定时执行系统检查点的时间间隔,较短的间隔可以减少故障恢复时间但会增加系统开销。如果不填写,将会关闭系统检查点。 |
两次系统检查点之间的最短时间间隔 | 配置该参数定义了Flink在连续检查点之间必须等待的最小时间,以防止过于频繁的检查点对系统性能造成影响。这确保了当系统检查点的最大并行度为1时,两次检查点之间存在一个最短的时间间隔。 |
State数据过期时间 | 配置该参数决定了Flink作业中状态数据在没有被访问或更新的情况下可以保留的最长时间,默认值为36小时,即作业状态信息将在36小时后自动过期并清除。以优化状态存储和资源使用。 重要 此处默认值根据云上最佳实践的经验值设置,与开源的默认值不同(开源默认值为0,表示状态信息永不过期)。 |
其它配置 | 支持Flink的其它运行参数配置,您可在此配置Flink的其它运行参数,例如: 说明 更多参数配置详情请参见配置作业部署信息。 |
完成任务配置后,单击保存节点任务。
步骤三:(可选)调试Flink SQL Streaming节点
在节点发布到生产环境前,您可使用调试功能基于上传的Mock数据对节点代码进行试运行,提前验证SQL逻辑与上下游数据,无需将任务发布至运维中心即可完成校验。
调试功能采用白名单方式开放,如需使用,请提交工单申请开通。
配置Flink资源信息
在节点编辑页面右侧运行配置面板的Flink 资源信息区域,按下表说明完成参数配置。
参数 | 描述 |
Flink 调试集群 | 用于运行调试任务的Flink会话集群(Session Cluster),必填。下拉列表会展示当前计算资源下已存在的会话集群及其运行状态,仅状态为运行中的集群可以选用。 如果列表中无可用集群,可单击新建集群跳转至实时计算Flink版控制台新建会话集群。 |
Flink 引擎版本 | 所选会话集群对应的Flink引擎版本,由系统根据集群自动展示,无需手动填写。 |
超时时间 | 单次调试任务的最长运行时间,单位为分钟,默认30分钟。超过该时长后调试任务会被自动停止。 |
切换当前节点的计算资源后,已选择的Flink 调试集群与已上传的调试数据将被清空,需要重新选择集群并重新上传数据。
准备调试数据
在运行配置面板的调试数据区域,为代码中引用的源表准备Mock数据。
单击生成模板,系统会解析当前SQL中引用的源表,并在下方列表中生成对应的表名记录。已上传的数据不会被清空。
在表名记录的操作列单击下载模板,下载与该源表结构匹配的CSV模板。
在本地按模板的字段顺序填写调试数据并保存为CSV文件。
在表名记录的操作列单击上传,选择填写好的CSV文件完成上传。上传成功后,状态列会显示为已启用。
(可选)上传成功后,可单击预览在底部面板查看数据内容;如需修改数据,可重新上传CSV文件覆盖。
如临时不希望某张源表的Mock数据参与本次调试,可单击禁用,状态变为已禁用;需要重新启用时单击启用。仅已启用状态的数据会被本次调试使用。
上传调试数据前需先选择Flink 调试集群,否则会提示请先选择计算资源。
调试数据仅支持CSV格式,单文件大小不超过1 MB;CSV文件首行需为字段名,编码建议使用UTF-8。
运行调试
调试数据准备完成后,单击编辑器顶部工具栏的运行按钮(或按下F8),系统会将代码、Mock数据与Flink资源信息一并提交至所选会话集群运行。
如代码中使用了${变量名}形式的参数,请确保已在脚本参数区域为变量赋值,调试时系统会按该取值替换代码中的同名占位符后再提交。
查看调试结果
调试任务运行后,节点底部结果区会提供以下信息,便于您快速定位问题:
代码:本次提交到Flink引擎的SQL代码内容(已完成变量替换)。
日志:调试任务的运行日志与异常信息。
查询结果:调试任务的输出数据。
步骤四:启动Flink SQL Streaming节点
发布Flink SQL Streaming节点。
任务需要发布至运维中心后才可执行,请参考界面引导对需要运行的Flink SQL Streaming节点执行发布操作,详情请参见节点/工作流发布。
说明发布操作会同步将任务发布至Flink vvp空间,您可在Flink vvp运维中心>作业运维中看到通过DataWorks发布的任务。
启动Flink SQL Streaming节点。
任务发布后,您可以单击发布到生产环境下方的去运维,在运维中心的中找到需要启动的任务,单击任务操作列的启动按钮,启动并查看实时计算任务的运行情况。