当前位置:网站首页>Spark ShuffleManager

Spark ShuffleManager

2022-06-10 18:45:00 InfoQ


SparkEnv中Shuffle的初始化
// Let the user specify short names for shuffle managers
// 让用户指定shuffle managers的简称
val shortShuffleMgrNames = Map(
      "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
      "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
    val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)
    val shuffleMgrClass =
      shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

ShuffleManager的几次变更

val shortShuffleMgrNames = Map( 
      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager", 
      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager") 


引入了tungsten-sort 
更快的基于排序的shuffle, 一个新的 shuffle 管理器,该管理器通过一种新的缓存友好排序算法来增强现有的基于排序的 shuffle,该算法直接对二进制数据进行操作。此补丁的目标是在 shuffle 期间降低内存使用和 Java 对象开销,并加快排序。它还为后续补丁奠定了基础,这些补丁将使序列化记录的端到端处理成为可能。
val shortShuffleMgrNames = Map( 
      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager", 
      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager", 
      "tungsten-sort" -> "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager") 


两个ShuffleManager的合并 
SortShuffleManager 和 UnsafeShuffleManager 之间有很多重复。鉴于这些现在提供相同的功能集,现在 UnsafeShuffleManager 支持大记录,我认为我们应该用 UnsafeShuffleManager 替换 SortShuffleManager 的序列化 shuffle 实现,并且应该将两个管理器合并在一起。 
val shortShuffleMgrNames = Map( 
      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager", 
      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager", 
      "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager") 


2016-04-19,现在是时候移除旧的HashShuffleManager了 
val shortShuffleMgrNames = Map( 
      "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, 
      "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName) 

注:tungsten-sort 
https://issues.apache.org/jira/browse/SPARK-7081

SparkEnv.scala
val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER) 

org.apache.spark.internal.config,
 默认 sort shuffle 
private[spark] val SHUFFLE_MANAGER =
    ConfigBuilder("spark.shuffle.manager")
      .version("1.1.0")
      .stringConf
      .createWithDefault("sort")

ShuffleManager 

ShuffleManager是Shuffle系统的可插拔接口。基于 spark.shuffle.manager 设置,在Driver和每一个Executor上的 SparkEnv 中创建。Driver使用它注册 shuffle,Executor(或在Executor中本地运行的task)可以通过他请求读取和写入数据。 

结构如下: 


  • registerShuffle
    :向管理器注册shuffle,并获取一个句柄以将其传递给任务。 
  • getWriter
    : 在给定的分区上获取writer,由executor的map task调用 
  • getReader
    :获取reduce分区范围(包括startPartition到endPartition-1)的读取器,以便从mapOutPut(包括startMapIndex到endMapIndex-1)读取数据。 
  • ShuffleHandle:shuffle的opaque handle(不透明句柄),所谓的opaque handle其角色类似于基类指针,隐藏实现细节,每个实现者需要提供自己的实现,由 ShuffleManager 用于将有关它的信息传递给tasks。三种实现 


关于句柄 
https://zhuanlan.zhihu.com/p/361975054


ShuffleManager的子类实现
org.apache.spark.shuffle.sort.SortShuffleManager 是
ShuffleManager唯一的子类实现。
registerShuffle()
上面已经说过,registerShuffle向管理器注册shuffle,并获取一个句柄以将其传递给任务。在具体实现类中,会根据不同条件来返回三种不同的ShuffleHandle,也就是对应着三种不同的Shuffle 

override def registerShuffle[K, V, C](
      shuffleId: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
      new BypassMergeSortShuffleHandle[K, V](
        shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
      new SerializedShuffleHandle[K, V](
        shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else {
      new BaseShuffleHandle(shuffleId, dependency)
    }
  }

第一个条件:SortShuffleWriter.shouldBypassMergeSort(conf, dependency) 
private[spark] object SortShuffleWriter {
  def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
    // We cannot bypass sorting if we need to do map-side aggregation.
    if (dep.mapSideCombine) {
      false
    } else {
      val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
&nbsp; &nbsp; &nbsp; dep.partitioner.numPartitions <= bypassMergeThreshold
&nbsp; &nbsp; }
&nbsp; }
}

从shouldBypassMergeSort方法中可以清晰的看出,满足BypassMergeSortShuffleHandle有两个要求: 
  • 少于spark.shuffle.sort.bypassMergeThreshold分区 
  • 不需要map-side预聚合(如groupByKey()算子)

返回BypassMergeSortShuffleHandle,并且采用BypassMergeSortShuffle,那么直接写入numPartitions文件,并在末尾将它们连接起来。这避免了为了合并溢出的文件而进行两次序列化和反序列化,这在正常的代码路径中会发生。缺点是一次打开多个文件,从而为缓冲区分配更多内存。
第二个条件:SortShuffleManager.
canUseSerializedShuffle
(dependency) 
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
&nbsp; &nbsp; val shufId = dependency.shuffleId
&nbsp; &nbsp; val numPartitions = dependency.partitioner.numPartitions
&nbsp; &nbsp; if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
&nbsp; &nbsp; &nbsp; log.debug(s&quot;because does not support object relocation&quot;)
&nbsp; &nbsp; &nbsp; false
&nbsp; &nbsp; } else if (dependency.mapSideCombine) {
&nbsp; &nbsp; &nbsp; log.debug(s&quot;because we need to do map-side aggregation&quot;)
&nbsp; &nbsp; &nbsp; false
&nbsp; &nbsp; } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
&nbsp; &nbsp; &nbsp; log.debug(s&quot;because it has more than $MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions&quot;)
&nbsp; &nbsp; &nbsp; false
&nbsp; &nbsp; } else {
&nbsp; &nbsp; &nbsp; log.debug(s&quot;Can use serialized shuffle for shuffle $shufId&quot;)
&nbsp; &nbsp; &nbsp; true
&nbsp; &nbsp; }
 }

用于确定 shuffle 是否应使用优化的序列化 shuffle 路径或是否应回退到对反序列化对象进行操作的原始路径的辅助方法。
也就是说,如果同时满足以下三个条件: 
  • 使用的序列化器支持序列化对象的重定位(如KryoSerializer) 
  • shuffle中没有map-side预聚合 
  • 分区数不大于MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE的值(最大分区ID号+1,即2^24=16777216) 
综上会返回SerializedShuffleHandle,启用序列化sort shuffle机制(也就是tungsten-sort),尝试以序列化形式缓冲映射输出,因为这样更有效。 

第三个条件:BaseShuffleHandle 
在不满足bypass和tungsten-sort的情况下,则返回BaseShuffleHandle,采用基础的sort shuffle,以反序列化形式输出的缓冲区映射 

getWriter()
在给定分区上获取writer,由executor上的map task调用
override def getWriter[K, V](
&nbsp; &nbsp; &nbsp; handle: ShuffleHandle,
&nbsp; &nbsp; &nbsp; mapId: Long,
&nbsp; &nbsp; &nbsp; context: TaskContext,
&nbsp; &nbsp; &nbsp; metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
&nbsp; &nbsp; val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
&nbsp; &nbsp; &nbsp; handle.shuffleId, _ => new OpenHashSet[Long](16))
&nbsp; &nbsp; mapTaskIds.synchronized { mapTaskIds.add(context.taskAttemptId()) }
&nbsp; &nbsp; val env = SparkEnv.get
&nbsp; &nbsp; handle match {
&nbsp; &nbsp; &nbsp; case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
&nbsp; &nbsp; &nbsp; &nbsp; new UnsafeShuffleWriter(
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; env.blockManager,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; context.taskMemoryManager(),
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; unsafeShuffleHandle,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; mapId,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; context,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; env.conf,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; metrics,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; shuffleExecutorComponents)
&nbsp; &nbsp; &nbsp; case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
&nbsp; &nbsp; &nbsp; &nbsp; new BypassMergeSortShuffleWriter(
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; env.blockManager,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; bypassMergeSortHandle,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; mapId,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; env.conf,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; metrics,
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; shuffleExecutorComponents)
&nbsp; &nbsp; &nbsp; case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
&nbsp; &nbsp; &nbsp; &nbsp; new SortShuffleWriter(
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; shuffleBlockResolver, other, mapId, context, shuffleExecutorComponents)
&nbsp; &nbsp; }
}

根据传入handle的不同类型,返回不同的ShuffleWriter,而handle类型则正是上面registerShuffle返回的。 
  • tungsten-sort(SerializedShuffle)对应的是UnsafeShuffleWriter 
  • BypassMergeSortShuffle 对应的是 BypassMergeSortShuffleWriter 
  • BaseShuffleHandle 对应的是 SortShuffleWriter 

getReader()
由executor上reduce task来调用,方法返回ShuffleReader,首先在mapOutputTracker中获取要读取的数据地址(block地址),再通过BlockStoreShuffleReader来读取。 
&nbsp; override def getReader[K, C](
&nbsp; &nbsp; &nbsp; handle: ShuffleHandle,
&nbsp; &nbsp; &nbsp; startMapIndex: Int,
&nbsp; &nbsp; &nbsp; endMapIndex: Int,
&nbsp; &nbsp; &nbsp; startPartition: Int,
&nbsp; &nbsp; &nbsp; endPartition: Int,
&nbsp; &nbsp; &nbsp; context: TaskContext,
&nbsp; &nbsp; &nbsp; metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
&nbsp; &nbsp; val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
&nbsp; &nbsp; &nbsp; handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
&nbsp; &nbsp; new BlockStoreShuffleReader(
&nbsp; &nbsp; &nbsp; handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
&nbsp; &nbsp; &nbsp; shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
&nbsp; &nbsp; }

原网站

版权声明
本文为[InfoQ]所创,转载请带上原文链接,感谢
https://xie.infoq.cn/article/1537b12bcdc83053efa88c439