本文介绍 Spark 代码中参数说明及 Smart Shuffle 优化配置参数说明。

Spark 代码中可使用如下参数配置:

属性名 默认值 说明
spark.hadoop.fs.oss.accessKeyId 访问 OSS 所需的 AccessKey ID(可选)
spark.hadoop.fs.oss.accessKeySecret 访问 OSS 所需的 AccessKey Secret(可选)
spark.hadoop.fs.oss.securityToken 访问 OSS 所需的 STS token(可选)
spark.hadoop.fs.oss.endpoint 访问 OSS 的 endpoint(可选)
spark.hadoop.fs.oss.multipart.thread.number 5 并发进行 OSS 的 upload part copy 的并发度
spark.hadoop.fs.oss.copy.simple.max.byte 134217728 使用普通接口进行 OSS 内部 copy 的文件大小上限
spark.hadoop.fs.oss.multipart.split.max.byte 67108864 使用普通接口进行 OSS 内部 copy 的文件分片大小上限
spark.hadoop.fs.oss.multipart.split.number 5 使用普通接口进行 OSS 内部拷贝的文件分片数目,默认和拷贝并发数目保持一致
spark.hadoop.fs.oss.impl com.aliyun.fs.oss.nat.NativeOssFileSystem OSS 文件系统实现类
spark.hadoop.fs.oss.buffer.dirs /mnt/disk1,/mnt/disk2,… OSS 本地临时文件目录,默认使用集群的数据盘
spark.hadoop.fs.oss.buffer.dirs.exists false 是否确保 OSS 临时目录已经存在
spark.hadoop.fs.oss.client.connection.timeout 50000 OSS Client 端的连接超时时间(单位毫秒)
spark.hadoop.fs.oss.client.socket.timeout 50000 OSS Client 端的 socket 超时时间(单位毫秒)
spark.hadoop.fs.oss.client.connection.ttl -1 连接存活时间
spark.hadoop.fs.oss.connection.max 1024 最大连接数目
spark.hadoop.job.runlocal false 当数据源是 OSS 时,如果需要本地调试运行 Spark 代码,需要设置此项为“true”,否则为“false”
spark.logservice.fetch.interval.millis 200 Receiver 向 LogHub 取数据的时间间隔
spark.logservice.fetch.inOrder true 是否有序消费分裂后的 Shard 数据
spark.logservice.heartbeat.interval.millis 30000 消费进程的心跳保持间隔
spark.mns.batchMsg.size 16 批量拉取 MNS 消息条数,最大不能超过 16
spark.mns.pollingWait.seconds 30 MNS 队列为空时的拉取等待间隔
spark.hadoop.io.compression.codec.snappy.native false 标识 Snappy 文件是否为标准 Snappy 文件,Hadoop 默认识别的是 Hadoop 修改过的 Snappy 格式文件

Smart Shuffle优化配置

Smart Shuffle 是 EMR 3.16.0 针对 Spark SQL 提供的一种 Shuffle 实现,适合处理 Shuffle 数据量比较大的查询,通过提前将 Shuffle 数据经网络传输到远程节点,提高 Spark SQL 的执行效率。启用 Smart Shuffle 后,Shuffle output 数据不会存储在本地磁盘,而是提前通过网络传输到远端节点,所有属于相同 Partition 的数据会被传输到同一个节点,并存储在同一个文件中。目前使用 Smart Shuffle 有以下限制:
  • Smart Shuffle 目前并不保证 task 执行的原子性,当 task 失败时,需要重启整个 Spark job。
  • 目前没有提供 External Smart Shuffle 实现,不支持 Dynamic Resource Allocation。
  • 不支持 Speculative Task 特性。
  • 与 Adaptive Execution 不兼容。
Smart Shuffle相关配置如下,可通过 Spark 配置文件或者 spark-submit 参数启用:
参数名 默认值 说明
spark.shuffle.manager sort 通过 spark.shuffle.manager=org.apache.spark.shuffle.sort.SmartShuffleManagerspark.shuffle.manager=smart 配置 Spark Session 的Shuffle,启用 smart shuffle 功能。
spark.shuffle.smart.spill.memorySizeForceSpillThreshold 128m 启用 Smart Shuffle时,每个 task 可用作写 Shuffle 数据的内存大小,超过阈值后,Shuffle数据会根据 Sartition 通过网络传输到对应远程节点。
spark.shuffle.smart.transfer.blockSize 1m 启用 Smart Shuffle时,用作网络传输的缓存大小。