性能白皮书(Nexmark性能测试)

更新时间:
复制为 MD 格式

本文介绍如何使用 Nexmark 基准测试工具评测实时计算 Flink 版的流处理性能。

性能一览

Nexmark 是业界通用的流处理引擎性能基准,包含 19 条标准 Query,覆盖过滤、聚合、JOIN、窗口等典型场景。本文基于 Nexmark 测试工具,在 8 CU 计算资源配置下,以 1 亿条输入数据为基准,对实时计算 Flink 版进行全量 Query 性能评测。测试结果表明:

  • 简单查询(如 q0、q1、q2)的 RPS 可达 400 万~650 万条/秒。

  • 复杂聚合与窗口查询(如 q4、q5、q16)的 RPS 在 15 万~63 万条/秒之间。

整体上,实时计算 Flink 版的 Nexmark 性能是开源 Flink 的 3.24 倍。

测试工具

Nexmark 是一套针对流处理引擎的标准性能基准测试。测试模型如下:

  • Nexmark 源表:按照指定 TPS 生成测试数据(Person、Auction、Bid 三类事件)。

  • Transformations:19 条标准 Nexmark Query,覆盖过滤、转换、聚合、JOIN、窗口等典型场景。

  • Blackhole 结果表:数据写入 Blackhole,排除外部存储的性能干扰,专注评测 Flink 引擎自身的处理能力。

本文采用的 Nexmark 测试工具基于实时计算 Flink 版的 OpenAPI 实现,自动化完成作业创建、部署、运行监控及结果采集的全流程。无需在控制台手动编写 SQL 或创建作业。

测试环境

本次测试的 Flink 作业启用了以下优化配置:

配置项

参数值

说明

table.exec.mini-batch.enabled

true

开启 Mini-Batch 聚合

table.exec.mini-batch.allow-latency

2s

Mini-Batch 攒批间隔

table.optimizer.distinct-agg.split.enabled

true

开启 Distinct 聚合拆分优化

execution.checkpointing.interval

3min

Checkpoint 间隔

前提条件

  • 已安装 Java JDK 1.8.x 或更高版本。

  • 已创建工作空间,详情请参见开通实时计算Flink

  • 已获取阿里云账号的 AccessKey ID 和 AccessKey Secret。

测试步骤

步骤一:下载测试工具

下载 Nexmark 测试工具压缩包nexmark-flink.tar.gz并解压。

解压后的目录结构如下:

nexmark-flink/
├── run_nexmark.sh          # 测试入口脚本
├── nexmark_env.sh          # 环境变量配置文件(需编辑)
├── bin/                    # 运行脚本
├── conf/                   # Flink 作业配置
├── lib/                    # JAR 包(需上传至控制台)
└── queries-vvp/            # Nexmark Query SQL 文件

步骤二:上传 Nexmark JAR

  1. 登录实时计算控制台

  2. 单击进入目标项目空间,在左侧导航栏文件管理 > 上传资源

  3. 选择并上传 nexmark-flink-0.2-SNAPSHOT.jar 文件。该文件位于测试工具的 nexmark-flink/lib 目录下。

  4. 上传完成后,单击目标文件名称复制 OSS 地址。该地址在后续配置参数时使用。文件路径格式因存储类型而异:

    • OSS Bucket 存储oss://<OSS Bucket 名称>/artifacts/namespaces/<项目空间名称>/<文件名>

      例:oss://oss-test/artifacts/namespaces/flink-default/nexmark-flink-0.2-SNAPSHOT.jar
    • 全托管存储oss://flink-fullymanaged-<工作空间ID>/artifacts/namespaces/<项目空间名称>/<文件名>

      例:oss://flink-fullymanaged-e6a123456789/artifacts/namespaces/flink-default/nexmark-flink-0.2-SNAPSHOT.jar

    如需查看工作空间的存储类型,在实时计算管理控制台单击目标工作空间操作列下的详情查看。

步骤三:配置运行参数

编辑 nexmark-flink/nexmark_env.sh 文件,填写以下参数。

参数名

说明

示例

END_POINT

实时计算 Flink 版的服务接入点。根据地域选择对应的接入点,详情请参见服务接入点

ververica.cn-hangzhou.aliyuncs.com

AK

阿里云账号的 AccessKey ID。

-

SK

阿里云账号的 AccessKey Secret。

-

WORK_SPACE

项目工作空间的 Workspace ID。

e6a123456789

NAMESPACE

项目工作空间的 Namespace。

flink-default

NEXMARK_JAR

步骤二中上传的 JAR 文件的 OSS 地址。

oss://flink-fullymanaged-e6a123456789/artifacts/namespaces/flink-default/nexmark-flink-0.2-SNAPSHOT.jar

FLINK_VERSION

目标测试的 Flink 引擎版本号。

vvr-11.6-jdk11-flink-1.20

QUERIES

指定运行的 Query。多个 Query 以逗号分隔,例如 q0q1,q2,q3。运行全部 Query 设置为 all

all

说明

运行全部 Query 耗时较长。每条 Query 需要经历作业创建、数据生成、计算执行等阶段。建议先运行单条 Query(例如将 QUERIES 设为 q0),验证环境配置和参数填写无误后,再执行全量测试。

步骤四:运行测试

  1. nexmark-flink 目录下执行以下命令。

    ./run_nexmark.sh
  2. 测试工具通过 OpenAPI 自动创建并运行 Nexmark 作业。

  3. 运行完成后,输出各 Query 的运行时长(毫秒)。示例如下:

    INFO  com.github.nexmark.flink.vvp.Nexmark - q0 13078
    ============================================================================
    ✓ Benchmark execution completed successfully
    ============================================================================

性能表现

以下为 8 CU 计算资源配置下,开源 Flink(1.20.4)与实时计算 Flink 版(vvr-11.5-jdk11-flink-1.20)的 Nexmark 性能对比。每条 Query 输入 1 亿条数据,RPS = 输入数据量 ÷ 用时。

说明

以下测试数据基于特定硬件环境和引擎版本采集。随着底层硬件升级迭代和引擎版本更新,实际性能表现可能存在差异,测试结果仅供参考。

Query

开源 Flink on ECS

Version:1.20.4

实时计算 Flink 版

Version:vvr-11.5-jdk11-flink-1.20

用时(毫秒)

RPS

用时(毫秒)

RPS

RPS 相比开源 Flink(倍)

q0

58848

1,699,293

23450

4,264,392

2.51

q1

57045

1,753,002

22824

4,381,353

2.50

q2

51890

1,927,154

15224

6,568,576

3.41

q3

84986

1,176,664

21558

4,638,649

3.94

q4

553426

180,693

157117

636,468

3.52

q5

365636

273,496

357547

279,684

1.02

q7

1257452

79,526

333837

299,547

3.77

q8

79788

1,253,321

29939

3,340,125

2.67

q9

2324518

43,020

266563

375,146

8.72

q10

189985

526,357

51202

1,953,049

3.71

q11

408384

244,868

145983

685,011

2.80

q12

121554

822,680

36991

2,703,360

3.29

q14

68903

1,451,316

20012

4,997,002

3.44

q15

183709

544,339

42734

2,340,057

4.30

q16

917597

108,980

337293

296,478

2.72

q17

102847

972,318

27076

3,693,308

3.80

q18

574949

173,928

96335

1,038,044

5.97

q19

586287

170,565

95121

1,051,293

6.16

q20

1340638

74,591

231482

431,999

5.79

q21

127089

786,850

39693

2,519,336

3.20

q22

94830

1,054,519

31228

3,202,254

3.04

总计

2383209

49,695,131

9550361

15,317,480

3.24

开源 Flink 测试流程

以下为开源 Flink on ECS 的 Nexmark 测试步骤,供复现验证。

环境准备

通过 EMR on ECS 创建 Flink 集群,集群配置如下:

  • EMR 版本:EMR-5.21.0

  • 硬件规格:3 台 ecs.g6a.xlarge(4 vCPU / 16 GiB),1 台 Master + 2 台 Core

  • 开启 Hadoop 和 HDFS 服务

  • 配置好各个节点之间的免密登录。例如,将安全组的私钥 key.pem上传到 master 节点,然后在 master 节点的 ~/.ssh/config 写入如下配置,注意将 IP 和文件路径替换为实际值:

    Host 192.168.0.0
        HostName 192.168.0.0
        User root
        IdentityFile /path/to/key.pem
        StrictHostKeyChecking no
    Host 192.168.0.1
        HostName 192.168.0.1
        User root
        IdentityFile /path/to/key.pem
        StrictHostKeyChecking no
    Host 192.168.0.2
        HostName 192.168.0.2
        User root
        IdentityFile /path/to/key.pem
        StrictHostKeyChecking no
    使用 ssh 验证各节点间免密登录是否正常。若报错 "bad permissions",执行 chmod 600 /path/to/key.pem 修正权限。

软件准备

  1. 下载目标版本的 Flink 包(Apache Flink Downloads)和 Nexmark 测试包(nexmark-flink.tgz),上传至 Master 节点并解压。

    tar -zxvf flink-1.20.4-bin-scala_2.12.tgz
    tar -zxvf nexmark-flink.tgz
    mv flink-1.20.4 flink
    mv nexmark-flink nexmark
  1. 将 nexmark/lib 目录下的 JAR 文件复制到 flink/lib,这些 JAR 包含 Nexmark 数据生成器。

    cp nexmark/lib/* flink/lib/
  1. 设置环境变量。编辑 ~/.bashrc,加入以下配置后执行 source ~/.bashrc 使其生效。

    根据实际环境配置相关路径。
    export JAVA_HOME=/etc/alternatives/java_sdk_11
    export PATH=$JAVA_HOME/bin:$PATHexport FLINK_HOME=/mnt/disk1/flink
    export HADOOP_CLASSPATH=$(/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop classpath)
  1. 配置 Flink Workers。本文使用 8 个 TaskManager,采用"Master 节点 2 个 + 每个 Core 节点各 3 个"的部署方式。

    编辑 flink/conf/workers,注意将IP替换为实际值。

    192.168.0.0
    192.168.0.0
    192.168.0.1
    192.168.0.1
    192.168.0.1
    192.168.0.2
    192.168.0.2
    192.168.0.2
  1. 用 nexmark/conf/config.yaml 替换 flink/conf/config.yaml,并更新以下配置项:

    • jobmanager.rpc.address:Master 节点 IP,如 192.168.0.0

    • state.checkpoints.dir:HDFS 路径,如 hdfs:///checkpoints

    • taskmanager.memory.process.size4G

  1. 编辑 nexmark/conf/nexmark.yaml,将 nexmark.metric.reporter.host 设为 Master 节点 IP。

  2. 将 flink 和 nexmark 目录及环境变量配置分发到各 Core 节点。

    注意将IP替换为实际值。
    scp -r flink 192.168.0.1:/mnt/disk1/
    scp -r flink 192.168.0.2:/mnt/disk1/
    scp -r nexmark 192.168.0.1:/mnt/disk1/
    scp -r nexmark 192.168.0.2:/mnt/disk1/
    scp ~/.bashrc 192.168.0.1:~/
    scp ~/.bashrc 192.168.0.2:~/

    分发完成后,在各 Core 节点执行 source ~/.bashrc 使环境变量生效。

  1. 在 Master 节点启动 Flink 集群。

    flink/bin/start-cluster.sh
  1. 初始化 Nexmark 测试环境。该脚本会在各节点上配置 Nexmark 运行所需的 Metric Reporter。

    nexmark/bin/setup_cluster.sh

水位设置

Flink 集群启动后,需通过 cgroup 将各 TaskManager 进程的 CPU 水位限制在 75% 以内,避免因资源争抢导致 TaskManager 心跳超时失联。

在所有运行 TaskManager 的节点上(含 Master 节点)执行以下命令:

yum install -y libcgroup libcgroup-tools
cgcreate -t root:root -a root:root -g cpu,memory:mygroup
echo 100000 > /sys/fs/cgroup/cpu/mygroup/cpu.cfs_period_us
echo 300000 > /sys/fs/cgroup/cpu/mygroup/cpu.cfs_quota_us
echo $((12 * 1024 * 1024 * 1024)) > /sys/fs/cgroup/memory/mygroup/memory.limit_in_bytes
jps | grep TaskManagerRunner | awk '{print $1}' | xargs cgclassify -g cpu,memory:mygroup

其中 cpu.cfs_quota_us / cpu.cfs_period_us = 300000 / 100000 = 3,表示该 cgroup 最多使用 3 个 CPU 核心(4 vCPU 的 75%)。

说明

实时计算 Flink 版(全托管环境)无需手动控制水位,可全量使用所购买的计算资源。

运行 Nexmark

在 Master 节点执行以下命令,等待运行结束后查看结果:

nexmark/bin/run_query.sh q0,q1,q2,q3,q4,q5,q7,q8,q9,q10,q11,q12,q14,q15,q16,q17,q18,q19,q20,q21,q22