本文介绍如何使用 Nexmark 基准测试工具评测实时计算 Flink 版的流处理性能。
性能一览
Nexmark 是业界通用的流处理引擎性能基准,包含 19 条标准 Query,覆盖过滤、聚合、JOIN、窗口等典型场景。本文基于 Nexmark 测试工具,在 8 CU 计算资源配置下,以 1 亿条输入数据为基准,对实时计算 Flink 版进行全量 Query 性能评测。测试结果表明:
简单查询(如 q0、q1、q2)的 RPS 可达 400 万~650 万条/秒。
复杂聚合与窗口查询(如 q4、q5、q16)的 RPS 在 15 万~63 万条/秒之间。
整体上,实时计算 Flink 版的 Nexmark 性能是开源 Flink 的 3.24 倍。
测试工具
Nexmark 是一套针对流处理引擎的标准性能基准测试。测试模型如下:
Nexmark 源表:按照指定 TPS 生成测试数据(Person、Auction、Bid 三类事件)。
Transformations:19 条标准 Nexmark Query,覆盖过滤、转换、聚合、JOIN、窗口等典型场景。
Blackhole 结果表:数据写入 Blackhole,排除外部存储的性能干扰,专注评测 Flink 引擎自身的处理能力。
本文采用的 Nexmark 测试工具基于实时计算 Flink 版的 OpenAPI 实现,自动化完成作业创建、部署、运行监控及结果采集的全流程。无需在控制台手动编写 SQL 或创建作业。
测试环境
本次测试的 Flink 作业启用了以下优化配置:
配置项 | 参数值 | 说明 |
table.exec.mini-batch.enabled | true | 开启 Mini-Batch 聚合 |
table.exec.mini-batch.allow-latency | 2s | Mini-Batch 攒批间隔 |
table.optimizer.distinct-agg.split.enabled | true | 开启 Distinct 聚合拆分优化 |
execution.checkpointing.interval | 3min | Checkpoint 间隔 |
前提条件
已安装 Java JDK 1.8.x 或更高版本。
已创建工作空间,详情请参见开通实时计算Flink版。
已获取阿里云账号的 AccessKey ID 和 AccessKey Secret。
测试步骤
步骤一:下载测试工具
下载 Nexmark 测试工具压缩包nexmark-flink.tar.gz并解压。
解压后的目录结构如下:
nexmark-flink/
├── run_nexmark.sh # 测试入口脚本
├── nexmark_env.sh # 环境变量配置文件(需编辑)
├── bin/ # 运行脚本
├── conf/ # Flink 作业配置
├── lib/ # JAR 包(需上传至控制台)
└── queries-vvp/ # Nexmark Query SQL 文件步骤二:上传 Nexmark JAR
登录实时计算控制台。
单击进入目标项目空间,在左侧导航栏。
选择并上传
nexmark-flink-0.2-SNAPSHOT.jar文件。该文件位于测试工具的nexmark-flink/lib目录下。上传完成后,单击目标文件名称复制 OSS 地址。该地址在后续配置参数时使用。文件路径格式因存储类型而异:
OSS Bucket 存储:
oss://<OSS Bucket 名称>/artifacts/namespaces/<项目空间名称>/<文件名>例:
oss://oss-test/artifacts/namespaces/flink-default/nexmark-flink-0.2-SNAPSHOT.jar全托管存储:
oss://flink-fullymanaged-<工作空间ID>/artifacts/namespaces/<项目空间名称>/<文件名>例:oss://flink-fullymanaged-e6a123456789/artifacts/namespaces/flink-default/nexmark-flink-0.2-SNAPSHOT.jar
如需查看工作空间的存储类型,在实时计算管理控制台单击目标工作空间操作列下的详情查看。
步骤三:配置运行参数
编辑 nexmark-flink/nexmark_env.sh 文件,填写以下参数。
参数名 | 说明 | 示例 |
END_POINT | 实时计算 Flink 版的服务接入点。根据地域选择对应的接入点,详情请参见服务接入点。 | ververica.cn-hangzhou.aliyuncs.com |
AK | 阿里云账号的 AccessKey ID。 | - |
SK | 阿里云账号的 AccessKey Secret。 | - |
WORK_SPACE | 项目工作空间的 Workspace ID。 | e6a123456789 |
NAMESPACE | 项目工作空间的 Namespace。 | flink-default |
NEXMARK_JAR | 步骤二中上传的 JAR 文件的 OSS 地址。 | oss://flink-fullymanaged-e6a123456789/artifacts/namespaces/flink-default/nexmark-flink-0.2-SNAPSHOT.jar |
FLINK_VERSION | 目标测试的 Flink 引擎版本号。 | vvr-11.6-jdk11-flink-1.20 |
QUERIES | 指定运行的 Query。多个 Query 以逗号分隔,例如 | all |
运行全部 Query 耗时较长。每条 Query 需要经历作业创建、数据生成、计算执行等阶段。建议先运行单条 Query(例如将 QUERIES 设为 q0),验证环境配置和参数填写无误后,再执行全量测试。
步骤四:运行测试
在
nexmark-flink目录下执行以下命令。./run_nexmark.sh测试工具通过 OpenAPI 自动创建并运行 Nexmark 作业。
运行完成后,输出各 Query 的运行时长(毫秒)。示例如下:
INFO com.github.nexmark.flink.vvp.Nexmark - q0 13078 ============================================================================ ✓ Benchmark execution completed successfully ============================================================================
性能表现
以下为 8 CU 计算资源配置下,开源 Flink(1.20.4)与实时计算 Flink 版(vvr-11.5-jdk11-flink-1.20)的 Nexmark 性能对比。每条 Query 输入 1 亿条数据,RPS = 输入数据量 ÷ 用时。
以下测试数据基于特定硬件环境和引擎版本采集。随着底层硬件升级迭代和引擎版本更新,实际性能表现可能存在差异,测试结果仅供参考。
Query | 开源 Flink on ECS Version:1.20.4 | 实时计算 Flink 版 Version:vvr-11.5-jdk11-flink-1.20 | |||
用时(毫秒) | RPS | 用时(毫秒) | RPS | RPS 相比开源 Flink(倍) | |
q0 | 58848 | 1,699,293 | 23450 | 4,264,392 | 2.51 |
q1 | 57045 | 1,753,002 | 22824 | 4,381,353 | 2.50 |
q2 | 51890 | 1,927,154 | 15224 | 6,568,576 | 3.41 |
q3 | 84986 | 1,176,664 | 21558 | 4,638,649 | 3.94 |
q4 | 553426 | 180,693 | 157117 | 636,468 | 3.52 |
q5 | 365636 | 273,496 | 357547 | 279,684 | 1.02 |
q7 | 1257452 | 79,526 | 333837 | 299,547 | 3.77 |
q8 | 79788 | 1,253,321 | 29939 | 3,340,125 | 2.67 |
q9 | 2324518 | 43,020 | 266563 | 375,146 | 8.72 |
q10 | 189985 | 526,357 | 51202 | 1,953,049 | 3.71 |
q11 | 408384 | 244,868 | 145983 | 685,011 | 2.80 |
q12 | 121554 | 822,680 | 36991 | 2,703,360 | 3.29 |
q14 | 68903 | 1,451,316 | 20012 | 4,997,002 | 3.44 |
q15 | 183709 | 544,339 | 42734 | 2,340,057 | 4.30 |
q16 | 917597 | 108,980 | 337293 | 296,478 | 2.72 |
q17 | 102847 | 972,318 | 27076 | 3,693,308 | 3.80 |
q18 | 574949 | 173,928 | 96335 | 1,038,044 | 5.97 |
q19 | 586287 | 170,565 | 95121 | 1,051,293 | 6.16 |
q20 | 1340638 | 74,591 | 231482 | 431,999 | 5.79 |
q21 | 127089 | 786,850 | 39693 | 2,519,336 | 3.20 |
q22 | 94830 | 1,054,519 | 31228 | 3,202,254 | 3.04 |
总计 | 2383209 | 49,695,131 | 9550361 | 15,317,480 | 3.24 |
开源 Flink 测试流程
以下为开源 Flink on ECS 的 Nexmark 测试步骤,供复现验证。
环境准备
通过 EMR on ECS 创建 Flink 集群,集群配置如下:
EMR 版本:EMR-5.21.0
硬件规格:3 台 ecs.g6a.xlarge(4 vCPU / 16 GiB),1 台 Master + 2 台 Core
开启 Hadoop 和 HDFS 服务
配置好各个节点之间的免密登录。例如,将安全组的私钥 key.pem上传到 master 节点,然后在 master 节点的 ~/.ssh/config 写入如下配置,注意将 IP 和文件路径替换为实际值:
Host 192.168.0.0 HostName 192.168.0.0 User root IdentityFile /path/to/key.pem StrictHostKeyChecking no Host 192.168.0.1 HostName 192.168.0.1 User root IdentityFile /path/to/key.pem StrictHostKeyChecking no Host 192.168.0.2 HostName 192.168.0.2 User root IdentityFile /path/to/key.pem StrictHostKeyChecking no使用 ssh 验证各节点间免密登录是否正常。若报错 "bad permissions",执行
chmod 600 /path/to/key.pem修正权限。
软件准备
下载目标版本的 Flink 包(Apache Flink Downloads)和 Nexmark 测试包(nexmark-flink.tgz),上传至 Master 节点并解压。
tar -zxvf flink-1.20.4-bin-scala_2.12.tgz tar -zxvf nexmark-flink.tgz mv flink-1.20.4 flink mv nexmark-flink nexmark
将
nexmark/lib目录下的 JAR 文件复制到flink/lib,这些 JAR 包含 Nexmark 数据生成器。cp nexmark/lib/* flink/lib/
设置环境变量。编辑
~/.bashrc,加入以下配置后执行source ~/.bashrc使其生效。根据实际环境配置相关路径。
export JAVA_HOME=/etc/alternatives/java_sdk_11 export PATH=$JAVA_HOME/bin:$PATHexport FLINK_HOME=/mnt/disk1/flink export HADOOP_CLASSPATH=$(/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop classpath)
配置并启动 Flink 集群
配置 Flink Workers。本文使用 8 个 TaskManager,采用"Master 节点 2 个 + 每个 Core 节点各 3 个"的部署方式。
编辑
flink/conf/workers,注意将IP替换为实际值。192.168.0.0 192.168.0.0 192.168.0.1 192.168.0.1 192.168.0.1 192.168.0.2 192.168.0.2 192.168.0.2
用
nexmark/conf/config.yaml替换flink/conf/config.yaml,并更新以下配置项:jobmanager.rpc.address:Master 节点 IP,如192.168.0.0state.checkpoints.dir:HDFS 路径,如hdfs:///checkpointstaskmanager.memory.process.size:4G
编辑
nexmark/conf/nexmark.yaml,将nexmark.metric.reporter.host设为 Master 节点 IP。将
flink和nexmark目录及环境变量配置分发到各 Core 节点。注意将IP替换为实际值。
scp -r flink 192.168.0.1:/mnt/disk1/ scp -r flink 192.168.0.2:/mnt/disk1/ scp -r nexmark 192.168.0.1:/mnt/disk1/ scp -r nexmark 192.168.0.2:/mnt/disk1/ scp ~/.bashrc 192.168.0.1:~/ scp ~/.bashrc 192.168.0.2:~/分发完成后,在各 Core 节点执行
source ~/.bashrc使环境变量生效。
在 Master 节点启动 Flink 集群。
flink/bin/start-cluster.sh
初始化 Nexmark 测试环境。该脚本会在各节点上配置 Nexmark 运行所需的 Metric Reporter。
nexmark/bin/setup_cluster.sh
水位设置
Flink 集群启动后,需通过 cgroup 将各 TaskManager 进程的 CPU 水位限制在 75% 以内,避免因资源争抢导致 TaskManager 心跳超时失联。
在所有运行 TaskManager 的节点上(含 Master 节点)执行以下命令:
yum install -y libcgroup libcgroup-tools
cgcreate -t root:root -a root:root -g cpu,memory:mygroup
echo 100000 > /sys/fs/cgroup/cpu/mygroup/cpu.cfs_period_us
echo 300000 > /sys/fs/cgroup/cpu/mygroup/cpu.cfs_quota_us
echo $((12 * 1024 * 1024 * 1024)) > /sys/fs/cgroup/memory/mygroup/memory.limit_in_bytes
jps | grep TaskManagerRunner | awk '{print $1}' | xargs cgclassify -g cpu,memory:mygroup其中 cpu.cfs_quota_us / cpu.cfs_period_us = 300000 / 100000 = 3,表示该 cgroup 最多使用 3 个 CPU 核心(4 vCPU 的 75%)。
实时计算 Flink 版(全托管环境)无需手动控制水位,可全量使用所购买的计算资源。
运行 Nexmark
在 Master 节点执行以下命令,等待运行结束后查看结果:
nexmark/bin/run_query.sh q0,q1,q2,q3,q4,q5,q7,q8,q9,q10,q11,q12,q14,q15,q16,q17,q18,q19,q20,q21,q22