Spark ShuffleManager
2022-06-10 18:45:00 【InfoQ】
Both built-in short names, "sort" and "tungsten-sort", resolve to SortShuffleManager; SparkEnv instantiates whichever class spark.shuffle.manager names:

```scala
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
  "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
  "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)
val shuffleMgrClass =
  shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
```
The corresponding configuration entry defaults to "sort":

```scala
private[spark] val SHUFFLE_MANAGER =
  ConfigBuilder("spark.shuffle.manager")
    .version("1.1.0")
    .stringConf
    .createWithDefault("sort")
```
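For illustration, a minimal sketch of setting spark.shuffle.manager when building a session (the app name is arbitrary); since both short names resolve to SortShuffleManager, this is equivalent to the default:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("shuffle-manager-demo")
  .master("local[*]")
  // "sort" is the default; "tungsten-sort" resolves to the same class, and a
  // fully qualified class name of a custom ShuffleManager could be given instead.
  .config("spark.shuffle.manager", "sort")
  .getOrCreate()
```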

ShuffleManager

- registerShuffle: registers a shuffle with the manager and returns a handle that is passed to tasks.
- getWriter: gets a writer for a given map partition; called on executors by map tasks.
- getReader: gets a reader for a range of reduce partitions (startPartition to endPartition - 1) that reads data from a range of map outputs (startMapIndex to endMapIndex - 1).
- ShuffleHandle: an opaque handle for a shuffle. Much like a base-class pointer, it hides implementation details: each implementation provides its own concrete handle, and the ShuffleManager uses it to pass information about the shuffle to tasks. There are three implementations, which registerShuffle() chooses among: BypassMergeSortShuffleHandle, SerializedShuffleHandle, and BaseShuffleHandle. An abridged view of the trait follows this list.
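For reference, an abridged sketch of the trait showing just these three methods (other members, such as shuffleBlockResolver and unregisterShuffle, are omitted here):

```scala
private[spark] trait ShuffleManager {

  // Register a shuffle with the manager and obtain a handle to pass to tasks.
  def registerShuffle[K, V, C](
      shuffleId: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle

  // Get a writer for a given partition. Called on executors by map tasks.
  def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Long,
      context: TaskContext,
      metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]

  // Get a reader for a range of reduce partitions (startPartition to endPartition - 1),
  // reading from a range of map outputs (startMapIndex to endMapIndex - 1).
  def getReader[K, C](
      handle: ShuffleHandle,
      startMapIndex: Int,
      endMapIndex: Int,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext,
      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
}
```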

registerShuffle()
```scala
override def registerShuffle[K, V, C](
    shuffleId: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    new SerializedShuffleHandle[K, V](
      shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    new BaseShuffleHandle(shuffleId, dependency)
  }
}
```

The bypass decision comes from SortShuffleWriter.shouldBypassMergeSort():

```scala
private[spark] object SortShuffleWriter {
  def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
    // We cannot bypass sorting if we need to do map-side aggregation.
    if (dep.mapSideCombine) {
      false
    } else {
      val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
      dep.partitioner.numPartitions <= bypassMergeThreshold
    }
  }
}
```

The bypass path is chosen when both conditions hold (see the sketch after this list):

- The number of reduce partitions is at most spark.shuffle.sort.bypassMergeThreshold.
- No map-side aggregation is needed (e.g., operators such as groupByKey()).
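A minimal sketch of a job that takes the bypass path, assuming an existing SparkContext named sc (the variable name is illustrative):

```scala
// groupByKey() performs no map-side combine, and 10 reduce partitions is below
// spark.shuffle.sort.bypassMergeThreshold (200 by default), so registerShuffle()
// returns a BypassMergeSortShuffleHandle for this shuffle.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val grouped = pairs.groupByKey(10)
grouped.collect()
```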
SortShuffleManager.canUseSerializedShuffle() checks the preconditions for the serialized (tungsten-sort) path:

```scala
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
  val shufId = dependency.shuffleId
  val numPartitions = dependency.partitioner.numPartitions
  if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
    log.debug(s"because does not support object relocation")
    false
  } else if (dependency.mapSideCombine) {
    log.debug(s"because we need to do map-side aggregation")
    false
  } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
    log.debug(s"because it has more than $MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
    false
  } else {
    log.debug(s"Can use serialized shuffle for shuffle $shufId")
    true
  }
}
```

All three conditions must hold (a configuration sketch follows this list):

- The serializer in use supports relocation of serialized objects (e.g., KryoSerializer).
- The shuffle has no map-side aggregation.
- The number of partitions is not greater than MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE (the maximum partition ID plus 1, i.e., 2^24 = 16777216).
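A minimal sketch of meeting the serializer condition by switching to Kryo (the app name is illustrative; whether the serialized path is actually used still depends on the other two conditions):

```scala
import org.apache.spark.SparkConf

// KryoSerializer supports relocation of serialized objects, unlike the default
// JavaSerializer, so shuffles that also avoid map-side aggregation and stay
// under the partition limit can use the serialized (tungsten-sort) path.
val conf = new SparkConf()
  .setAppName("serialized-shuffle-demo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
```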
getWriter()
```scala
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Long,
    context: TaskContext,
    metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
  val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
    handle.shuffleId, _ => new OpenHashSet[Long](16))
  mapTaskIds.synchronized { mapTaskIds.add(context.taskAttemptId()) }
  val env = SparkEnv.get
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf,
        metrics,
        shuffleExecutorComponents)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        bypassMergeSortHandle,
        mapId,
        env.conf,
        metrics,
        shuffleExecutorComponents)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(
        shuffleBlockResolver, other, mapId, context, shuffleExecutorComponents)
  }
}
```

Each handle type maps to one writer (see the sketch after this list):

- SerializedShuffleHandle (tungsten-sort) maps to UnsafeShuffleWriter.
- BypassMergeSortShuffleHandle maps to BypassMergeSortShuffleWriter.
- BaseShuffleHandle maps to SortShuffleWriter.
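A minimal sketch, assuming a SparkContext named sc and a relocation-capable serializer such as Kryo configured (names are illustrative), of how common operators land on each writer:

```scala
val pairs = sc.parallelize(1 to 100000).map(i => (i % 1000, i))

// reduceByKey uses map-side combine, so neither the bypass nor the serialized
// path applies: BaseShuffleHandle => SortShuffleWriter.
val summed = pairs.reduceByKey(_ + _)

// No map-side combine and 100 <= bypassMergeThreshold (200 by default):
// BypassMergeSortShuffleHandle => BypassMergeSortShuffleWriter.
val grouped = pairs.groupByKey(100)

// No map-side combine, 1000 partitions exceeds the bypass threshold, and with a
// relocation-capable serializer the partition count is far below 2^24:
// SerializedShuffleHandle => UnsafeShuffleWriter.
val groupedWide = pairs.groupByKey(1000)
```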
getReader()
```scala
override def getReader[K, C](
    handle: ShuffleHandle,
    startMapIndex: Int,
    endMapIndex: Int,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext,
    metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
  val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
    handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
    shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
}
```
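To see the reader side in context, an abridged sketch modeled on ShuffledRDD.compute from the Spark source: each reduce task asks the ShuffleManager for a reader covering exactly its own partition (a convenience getReader overload without the map-index range delegates to the method above over all map outputs):

```scala
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  val metrics = context.taskMetrics().createTempShuffleReadMetrics()
  // Read reduce partitions [split.index, split.index + 1), i.e. just this task's partition.
  SparkEnv.get.shuffleManager.getReader(
    dep.shuffleHandle, split.index, split.index + 1, context, metrics)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}
```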