Spark ShuffleManager
2022-06-10 18:45:00 【InfoQ】
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)
val shuffleMgrClass =
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
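The lookup above can be tried outside Spark. The following is a minimal sketch, not Spark's own code: the object name `ShuffleManagerNames`, the method `resolve`, and the class name `com.example.MyShuffleManager` are hypothetical, and the class names are written as plain strings so that the sketch runs without Spark on the classpath.

```scala
import java.util.Locale

object ShuffleManagerNames {
  // Stand-in for the short-name map; both short names point at the same class.
  val shortNames: Map[String, String] = Map(
    "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
    "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")

  // Short names are matched case-insensitively. Any other value is treated as
  // a fully qualified class name and returned unchanged.
  def resolve(configured: String): String =
    shortNames.getOrElse(configured.toLowerCase(Locale.ROOT), configured)

  def main(args: Array[String]): Unit = {
    // "com.example.MyShuffleManager" is a made-up custom implementation.
    Seq("sort", "Tungsten-Sort", "com.example.MyShuffleManager")
      .foreach(name => println(s"$name -> ${resolve(name)}"))
  }
}
```

Because both short names resolve to SortShuffleManager, choosing "tungsten-sort" does not select a different manager; which writer is used is decided later, per shuffle, in registerShuffle().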
private[spark] val SHUFFLE_MANAGER =
ConfigBuilder("spark.shuffle.manager")
.version("1.1.0")
.stringConf
.createWithDefault("sort")

ShuffleManager

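- registerShuffle: registers a shuffle with the manager and returns a handle that is passed on to tasks.
- getWriter: gets a writer for a given partition; called on executors by map tasks.
- getReader: gets a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) that reads from a range of map outputs (startMapIndex to endMapIndex-1, inclusive).
- ShuffleHandle: an opaque handle to a shuffle. It plays a role similar to a base-class pointer: it hides implementation details, and each implementer supplies its own version. The ShuffleManager uses it to pass information about the shuffle to tasks. There are three implementations: BaseShuffleHandle, BypassMergeSortShuffleHandle, and SerializedShuffleHandle.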
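The bounds passed to getReader are half-open: the start value is included and the end value is excluded. A small sketch of that convention follows; `ReaderRanges`, `Covered`, and `covered` are hypothetical names used only for illustration and are not part of Spark.

```scala
object ReaderRanges {
  // The map indices and reduce partitions that a set of bounds covers.
  final case class Covered(mapIndices: List[Int], partitions: List[Int])

  // `until` builds a half-open range, so endMapIndex and endPartition
  // themselves are not read.
  def covered(
      startMapIndex: Int,
      endMapIndex: Int,
      startPartition: Int,
      endPartition: Int): Covered =
    Covered(
      (startMapIndex until endMapIndex).toList,
      (startPartition until endPartition).toList)

  def main(args: Array[String]): Unit = {
    // Map outputs 0, 1, 2 and reduce partitions 2, 3.
    println(covered(0, 3, 2, 4))
  }
}
```

With this convention a single reduce partition p is requested as startPartition = p and endPartition = p + 1.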

registerShuffle()
override def registerShuffle[K, V, C](
shuffleId: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
new BypassMergeSortShuffleHandle[K, V](
shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
new SerializedShuffleHandle[K, V](
shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else {
new BaseShuffleHandle(shuffleId, dependency)
}
}

private[spark] object SortShuffleWriter {
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
// We cannot bypass sorting if we need to do map-side aggregation.
if (dep.mapSideCombine) {
false
} else {
val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
dep.partitioner.numPartitions <= bypassMergeThreshold
}
}
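}

BypassMergeSortShuffleHandle is chosen when both conditions hold:
- the number of partitions does not exceed spark.shuffle.sort.bypassMergeThreshold
- no map-side pre-aggregation is needed (for example, the groupByKey() operator)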
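The boundary of the partition check is easy to get wrong, because the comparison is `<=` rather than `<`. The sketch below models only the two fields the check reads. `BypassCheck` and `Dep` are hypothetical stand-ins for the real Spark types, and the threshold of 200 is assumed to be the default value of spark.shuffle.sort.bypassMergeThreshold.

```scala
object BypassCheck {
  // Simplified stand-in for the ShuffleDependency fields used by the check.
  final case class Dep(mapSideCombine: Boolean, numPartitions: Int)

  // Assumption: default of spark.shuffle.sort.bypassMergeThreshold.
  val DefaultThreshold: Int = 200

  // Same logic as the if/else in the article, written as one expression.
  def shouldBypassMergeSort(dep: Dep, threshold: Int = DefaultThreshold): Boolean =
    !dep.mapSideCombine && dep.numPartitions <= threshold

  def main(args: Array[String]): Unit = {
    println(shouldBypassMergeSort(Dep(mapSideCombine = false, numPartitions = 200)))
    println(shouldBypassMergeSort(Dep(mapSideCombine = false, numPartitions = 201)))
    println(shouldBypassMergeSort(Dep(mapSideCombine = true, numPartitions = 10)))
  }
}
```

A shuffle with exactly 200 partitions and no map-side combine still takes the bypass path; one with 201 partitions does not.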
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
val shufId = dependency.shuffleId
val numPartitions = dependency.partitioner.numPartitions
if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
log.debug(s"because does not support object relocation")
false
} else if (dependency.mapSideCombine) {
log.debug(s"because we need to do map-side aggregation")
false
} else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
log.debug(s"because it has more than $MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
false
} else {
log.debug(s"Can use serialized shuffle for shuffle $shufId")
true
}
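}

SerializedShuffleHandle is used when the bypass check fails and all three conditions hold:
- the serializer supports relocation of serialized objects (for example, KryoSerializer)
- the shuffle has no map-side pre-aggregation
- the number of partitions is no greater than MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE (the largest partition ID plus one, that is 2^24 = 16777216)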
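Putting the two checks together, the order in registerShuffle() matters: the bypass check runs first, so a small shuffle with a relocatable serializer still gets the bypass handle. The following is a toy model of that decision, not Spark's implementation. `HandleSelection`, `Dep`, and the `Handle` objects are hypothetical names, and the bypass threshold of 200 is an assumed default.

```scala
object HandleSelection {
  // Simplified stand-in for the ShuffleDependency properties that are inspected.
  final case class Dep(
      mapSideCombine: Boolean,
      numPartitions: Int,
      serializerSupportsRelocation: Boolean)

  sealed trait Handle
  case object BypassMergeSort extends Handle
  case object Serialized extends Handle
  case object Base extends Handle

  val BypassThreshold: Int = 200               // assumed default threshold
  val MaxSerializedPartitions: Int = 1 << 24   // 2^24 = 16777216

  // Mirrors the if / else-if / else chain of registerShuffle().
  def select(dep: Dep): Handle =
    if (!dep.mapSideCombine && dep.numPartitions <= BypassThreshold) {
      BypassMergeSort
    } else if (dep.serializerSupportsRelocation &&
        !dep.mapSideCombine &&
        dep.numPartitions <= MaxSerializedPartitions) {
      Serialized
    } else {
      Base
    }

  def main(args: Array[String]): Unit = {
    println(select(Dep(mapSideCombine = false, 100, serializerSupportsRelocation = true)))
    println(select(Dep(mapSideCombine = false, 1000, serializerSupportsRelocation = true)))
    println(select(Dep(mapSideCombine = true, 100, serializerSupportsRelocation = true)))
  }
}
```

In this model, any dependency with map-side combine falls through to the base handle, regardless of partition count or serializer.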
getWriter()
override def getWriter[K, V](
handle: ShuffleHandle,
mapId: Long,
context: TaskContext,
metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
handle.shuffleId, _ => new OpenHashSet[Long](16))
mapTaskIds.synchronized { mapTaskIds.add(context.taskAttemptId()) }
val env = SparkEnv.get
handle match {
case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
new UnsafeShuffleWriter(
env.blockManager,
context.taskMemoryManager(),
unsafeShuffleHandle,
mapId,
context,
env.conf,
metrics,
shuffleExecutorComponents)
case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
new BypassMergeSortShuffleWriter(
env.blockManager,
bypassMergeSortHandle,
mapId,
env.conf,
metrics,
shuffleExecutorComponents)
case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
new SortShuffleWriter(
shuffleBlockResolver, other, mapId, context, shuffleExecutorComponents)
}
}

- tungsten-sort (SerializedShuffleHandle) maps to UnsafeShuffleWriter
- BypassMergeSortShuffleHandle maps to BypassMergeSortShuffleWriter
- BaseShuffleHandle maps to SortShuffleWriter
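The order of the cases in the match is significant. In the Spark source, as far as I am aware, SerializedShuffleHandle and BypassMergeSortShuffleHandle both extend BaseShuffleHandle; the article does not show this, so treat it as an assumption. Under that assumption the base case has to come last, or it would capture every handle. The toy hierarchy below illustrates this; all class names in it are hypothetical stand-ins and the writers are represented by their names only.

```scala
object WriterDispatch {
  // Toy hierarchy: both specialised handles are subclasses of the base handle.
  class BaseHandle(val shuffleId: Int)
  class SerializedHandle(shuffleId: Int) extends BaseHandle(shuffleId)
  class BypassHandle(shuffleId: Int) extends BaseHandle(shuffleId)

  // The most specific types are tested first; the base type is the fallback.
  def writerFor(handle: BaseHandle): String = handle match {
    case _: SerializedHandle => "UnsafeShuffleWriter"
    case _: BypassHandle => "BypassMergeSortShuffleWriter"
    case _: BaseHandle => "SortShuffleWriter"
  }

  def main(args: Array[String]): Unit = {
    println(writerFor(new SerializedHandle(0)))
    println(writerFor(new BypassHandle(1)))
    println(writerFor(new BaseHandle(2)))
  }
}
```

Moving the `BaseHandle` case to the top of the match would make every handle resolve to SortShuffleWriter in this model.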
getReader()
override def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
new BlockStoreShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
}