Celeborn是一个处理中间数据的服务,能够提升大数据引擎的稳定性、灵活性和性能。本文为您介绍如何使用Celeborn服务。

背景信息

目前Shuffle方案的缺点如下:

  • Shuffle Write在大数据量场景下会溢出,导致写放大。

  • Shuffle Read过程中存在大量的网络小包导致的Connection reset问题。

  • Shuffle Read过程中存在大量小数据量的IO请求和随机读,对磁盘和CPU造成高负载。

  • 对于M*N次的连接数,在M和N数千的规模下,作业基本无法完成。

  • NodeManager和Spark Shuffle Service是同一进程,当Shuffle的数据量特别大时,通常会导致NodeManager重启,从而影响YARN调度的稳定性。

Celeborn服务可以优化目前Shuffle方案的问题。Celeborn优势如下:

  • 使用Push-Style Shuffle代替Pull-Style,减少Mapper的内存压力。

  • 支持IO聚合,Shuffle Read的连接数从M*N降到N,同时将随机读更改为顺序读。

  • 支持两副本机制,降低Fetch Fail概率。

  • 支持计算与存储分离架构,可以部署Shuffle Service至特殊硬件环境中,与计算集群分离。

  • 解决Spark on Kubernetes时对本地磁盘的依赖。

Celeborn设计架构图如下。Celeborn

前提条件

已创建E-MapReduce的DataLake集群或自定义集群,并选择Celeborn服务。创建集群详情请参见创建集群

使用限制

此文档仅适用于以下版本的集群。

集群

版本

DataLake集群

EMR-3.45.0及后续版本,EMR-5.11.0及后续版本。

自定义集群

EMR-3.45.0及后续版本,EMR-5.11.0及后续版本。

操作步骤

Spark配置

参数

描述

spark.shuffle.manager

  • 0.4.x及之后版本:固定值为org.apache.spark.shuffle.celeborn.SparkShuffleManager

  • 0.3.x及之前版本:固定值为org.apache.spark.shuffle.celeborn.RssShuffleManager

spark.serializer

固定值为org.apache.spark.serializer.KryoSerializer

spark.celeborn.push.replicate.enabled

是否开启两副本。取值如下:

  • true(默认值):开启两副本。

  • false:不开启两副本。

spark.shuffle.service.enabled

需修改为false,才会使用Celeborn。

使用Celeborn时需要关闭原有的External Shuffle Service。在使用Celeborn的情况下,是可以正常使用Spark的Dynamic Allocation的。

说明
  • 设置spark.shuffle.service.enabled为true,则不使用Celeborn。

  • 阿里云Spark已完成Celeborn适配,开源Spark 3.5版本已完成Celeborn适配。

spark.celeborn.shuffle.writer

Celeborn的wirter支持的模式:

  • hash(默认值):在Partition并发度过大的情况下会使用较多的内存。

  • sort:使用固定大小内存,在Partition并发度很大的情况下,能够稳定工作。

spark.celeborn.master.endpoints

填写格式<celeborn-master-ip>:<celeborn-master-port>

涉及参数如下:

  • <celeborn-master-ip>:Master节点的公网IP地址。

  • <celeborn-master-port>:固定值9097。

高可用集群时配置所有Master节点的IP地址。

spark.sql.adaptive.enabled

Celeborn支持Adaptive Execution,关闭Local Shuffle Reader可以获得最佳的Shuffle性能。

各参数值需要修改为true、false和true。

spark.sql.adaptive.localShuffleReader.enabled

spark.sql.adaptive.skewJoin.enabled

Spark服务支持一键配置使用Celeborn服务。

  • EMR-5.11.1及之后版本,EMR-3.45.1及之后版本

    可以在Spark服务状态页面的服务概述区域,打开或关闭enableCeleborn开关。

  • EMR-5.11.0版本,EMR-3.45.0版本

    可以在Spark服务状态页面的组件列表区域,选择SparkThriftServer操作列的more > enableCelebornmore > disableCeleborn。选择后会自动修改上文表格中的Spark配置项并重启SparkThriftServer,同时会修改spark-defaults.confspark-thriftserver.conf两个配置文件。

    • 选择more > enableCeleborn,所有的Spark任务都使用Celeborn服务。

    • 选择more > disableCeleborn,所有的Spark任务都不使用Celeborn服务。

Celeborn配置

您可以在Celeborn服务配置页面,修改或查看Celeborn所有的配置项。

重要

针对不同的节点组(例如CORE或TASK)各配置项的值是不同的。

参数

描述

默认值

celeborn.worker.flusher.threads

磁盘(HDD或者SSD)的刷盘线程数。

  • HDD磁盘默认是1。

  • SSD磁盘默认是8。

CELEBORN_WORKER_OFFHEAP_MEMORY

Worker堆外内存大小。

根据集群配置自动计算。

celeborn.application.heartbeat.timeout

Application心跳超时时间,超时会清理Application相关资源。

120s

celeborn.worker.flusher.buffer.size

Flush buffer大小,超过最大值会触发刷盘。

256K

celeborn.metrics.enabled

是否打开监控。取值如下:

  • true:打开监控。

  • false:不打开监控。

true

CELEBORN_WORKER_MEMORY

Worker堆内内存大小。

1g

CELEBORN_MASTER_MEMORY

Master堆内内存大小。

2g

重启Celeborn组件

  1. 在Celeborn服务的状态页面,选择CelebornMaster组件操作列的more > restart_clean_meta

    说明

    如果是非高可用集群,您也可以单击CelebornMaster组件操作列的重启

  2. 在弹出的对话框中,关闭滚动执行开关,输入执行原因,单击确定

  3. 在弹出的对话框中,单击确定