本文介绍Spark代码中参数及Smart Shuffle优化配置参数。

Spark代码中可以使用如下参数配置。
属性名 默认值 说明
spark.hadoop.fs.jfs.cache.oss-accessKeyId 访问OSS所需的AccessKey ID(可选)。
spark.hadoop.fs.jfs.cache.oss-accessKeySecret 访问OSS所需的AccessKey Secret(可选)。
spark.hadoop.fs.jfs.cache.oss-securityToken 访问OSS所需的STS token(可选)。
spark.hadoop.fs.jfs.cache.oss-endpoint 访问OSS的Endpoint(可选)。
spark.hadoop.fs.oss.copy.simple.max.byte 134217728 使用普通接口进行OSS内部拷贝的文件大小上限。
spark.hadoop.fs.oss.multipart.split.max.byte 67108864 使用普通接口进行OSS内部拷贝的文件分片大小上限。
spark.hadoop.fs.oss.impl com.aliyun.emr.fs.oss.JindoOssFileSystem OSS文件系统实现类。
spark.hadoop.job.runlocal false 当数据源是OSS时,是否需要本地调试运行Spark代码,需要时设置此项为true,否则为false。
spark.logservice.fetch.interval.millis 200 Receiver向LogHub取数据的时间间隔。
spark.logservice.fetch.inOrder true 是否有序消费分裂后的Shard数据。
spark.logservice.heartbeat.interval.millis 30000 消费进程的心跳保持间隔。
spark.mns.batchMsg.size 16 批量拉取MNS消息条数,最大值为16。
spark.mns.pollingWait.seconds 30 MNS队列为空时的拉取等待间隔。
spark.hadoop.io.compression.codec.snappy.native false 标识Snappy文件是否为标准Snappy文件。Hadoop默认识别的是Hadoop修改过的Snappy格式文件。

Smart Shuffle优化配置

Smart Shuffle是EMR-3.16.0针对Spark SQL提供的一种Shuffle实现,适合处理Shuffle数据量较大的查询。通过提前将Shuffle数据经过网络传输到远程节点,提高Spark SQL的执行效率。

启用Smart Shuffle后,Shuffle Output数据不会存储在本地磁盘,而是提前通过网络传输到远端节点,所有属于相同Partition的数据会被传输到同一个节点,并存储在同一个文件中。使用Smart Shuffle有以下限制:
  • Smart Shuffle不保证Task执行的原子性,当Task失败时,需要重启整个Spark job。
  • 因为没有提供External Smart Shuffle实现,所以不支持Dynamic Resource Allocation。
  • 不支持Speculative Task特性。
  • 与Adaptive Execution不兼容。
您可以通过Spark配置文件或者spark-submit参数启用Smart Shuffle,相关配置如下。
参数名 默认值 说明
spark.shuffle.manager sort 通过spark.shuffle.manager=org.apache.spark.shuffle.sort.SmartShuffleManagerspark.shuffle.manager=smart配置Spark Session的Shuffle,启用Smart shuffle功能。
spark.shuffle.smart.spill.memorySizeForceSpillThreshold 128m 设置Shuffle数据占用内存的大小,超过阈值后,Shuffle数据会根据Sartition通过网络传输到对应远程节点。
spark.shuffle.smart.transfer.blockSize 1m 设置Shuffle在网络传输数据块的大小。