Analysis of the Spark shuffle writer source code
2022-06-22 16:47:00 【ZH519080】
The most fundamental optimization, and the most pressing problem, for Spark shuffle is reducing the number of files generated by the ShuffleWriter on the mapper side. Fewer small mapper-side files bring several benefits:
- Memory consumption on the mapper side is reduced
- Data can be processed without performance bottlenecks being reached as easily
- The reducer side fetches fewer files, which improves efficiency and reduces network overhead
In SparkEnv you can see the shuffle configuration properties, and Spark's pluggable ShuffleManager framework already provides concrete ShuffleManager implementations for three shuffle modes: hash, sort, and tungsten-sort. The shuffle-related part of the SparkEnv source code:
// Use short-format names to specify the ShuffleManager to use
val shortShuffleMgrNames = Map(
  "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
  "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
  "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
// Configuration property that selects the ShuffleManager; defaults to sort mode, implemented by SortShuffleManager
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
Among them, the advantage of tungsten-sort is that it turns Spark from a platform that could only handle small and medium-sized data into one that can handle arbitrarily large-scale data, which is reflected in Spark's computing power.
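For reference, here is a minimal sketch (not part of the original article) of how an application selects the shuffle manager through these properties; it assumes a Spark version of the same era as this analysis (1.x/2.x), where spark.shuffle.manager is still a user-facing setting:
import org.apache.spark.{SparkConf, SparkContext}

object ShuffleManagerConfigExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("shuffle-manager-demo")
      .setMaster("local[2]")
      // Short names map to the classes shown above; "sort" and "tungsten-sort" both resolve to SortShuffleManager
      .set("spark.shuffle.manager", "sort")
      // Threshold consulted later by shouldBypassMergeSort (default 200)
      .set("spark.shuffle.sort.bypassMergeThreshold", "200")

    val sc = new SparkContext(conf)
    // reduceByKey introduces a ShuffleDependency, so a ShuffleWriter runs on the map side
    val counts = sc.parallelize(1 to 1000).map(i => (i % 10, 1)).reduceByKey(_ + _)
    counts.collect().foreach(println)
    sc.stop()
  }
}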
Both sort and tungsten-sort map to "org.apache.spark.shuffle.sort.SortShuffleManager"; the difference between the two mechanisms is that they use different shuffle data writers.
The sort-based shuffle provides BypassMergeSortShuffleWriter and SortShuffleWriter; the tungsten-sort-based shuffle provides UnsafeShuffleWriter (serialized sorting).
When SparkEnv is instantiated on the Driver and on each Executor, a ShuffleManager is created to manage block data. It provides read/write access to block data across the cluster (getWriter, getReader), covering both local reads/writes and fetching block data from remote nodes.
ShuffleManager is the entry point for understanding the shuffle framework. It ties together the components used throughout the shuffle process: how a shuffle is registered with the ShuffleManager to obtain a read/write handle (ShuffleHandle); how a ShuffleHandle is used to obtain the concrete data read/write interfaces, ShuffleReader and ShuffleWriter; and how to obtain ShuffleBlockResolver, the interface that resolves block data information. The following is a source-code analysis of these key components.
ShuffleManager is a pluggable interface provided by Spark shuffle; any concrete implementation subclass, including a custom one, must override the abstract methods of the trait ShuffleManager.
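For orientation, the following is a simplified outline of the trait as it appears around Spark 2.x (abridged; exact signatures vary between Spark versions):
private[spark] trait ShuffleManager {
  // Register a shuffle with the manager and obtain a handle used later by readers and writers
  def registerShuffle[K, V, C](shuffleId: Int, numMaps: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle
  // Obtain a writer for a given map partition; called on executors by map tasks
  def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V]
  // Obtain a reader for a range of reduce partitions; called on executors by reduce tasks
  def getReader[K, C](handle: ShuffleHandle, startPartition: Int, endPartition: Int, context: TaskContext): ShuffleReader[K, C]
  // Remove a shuffle's metadata; returns true if it was removed successfully
  def unregisterShuffle(shuffleId: Int): Boolean
  // Resolver that maps logical shuffle blocks to physical file segments
  def shuffleBlockResolver: ShuffleBlockResolver
  def stop(): Unit
}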
An Executor executes a stage by ultimately calling the Task.run method. Task is an abstract class whose abstract methods are implemented by the subclasses ShuffleMapTask and ResultTask. ShuffleMapTask splits the elements of an RDD into multiple buckets according to the partitioner specified in the ShuffleDependency, obtaining the mapping between data and buckets through the ShuffleManager's getWriter interface. ResultTask, by contrast, is the Task that sends its output back to the application driver. During Task execution, the RDD's compute method is eventually called to compute the internal data; for an RDD with a ShuffleDependency, compute obtains the shuffle output of the previous stage as this Task's input through the ShuffleManager's getReader interface.
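As an illustration of the reader side described above, this is roughly how a shuffled RDD's compute method obtains its input through getReader (paraphrased from ShuffledRDD in the Spark 2.x sources; details vary by version):
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  // The first (and only) dependency of a ShuffledRDD is its ShuffleDependency
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  // Fetch the previous stage's shuffle output for exactly this reduce partition
  SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}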
The main source code of ShuffleMapTask's runTask method:
override def runTask(context: TaskContext): MapStatus = {
  // First obtain the ShuffleManager from SparkEnv, then obtain the shuffleHandle that was registered with the
  // ShuffleManager from the ShuffleDependency; from the shuffleHandle and the partition of the RDD handled by
  // the current Task, obtain a ShuffleWriter; finally call its write interface to write this partition's data
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    // Compute the RDD and persist the result through the obtained ShuffleWriter's write method
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch { ......
  }
}
In ShuffleMapTask's runTask method, getWriter is an abstract method of the trait ShuffleManager, which SortShuffleManager and HashShuffleManager both implement. Taking SortShuffleManager's getWriter method as an example: before getWriter can be called, the shuffle must first be registered with the ShuffleManager through SortShuffleManager's registerShuffle method, which returns the shuffle handle. The source code of SortShuffleManager's registerShuffle method:
override def registerShuffle[K, V, C](shuffleId: Int, numMaps: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) {
    // If the number of partitions is less than the threshold set by spark.shuffle.sort.bypassMergeThreshold and
    // no map-side aggregation is needed, write numPartitions files directly and concatenate them into one output
    // file; this avoids serializing and deserializing twice to merge spill files, but needs more buffer memory
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // When the map-side output can be written in serialized form
    new SerializedShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Otherwise, buffer map outputs in a deserialized form:
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }
}
The BypassMergeSortShuffleHandle, SerializedShuffleHandle, and BaseShuffleHandle returned by SortShuffleManager's registerShuffle method correspond, respectively, to the BypassMergeSortShuffleWriter, UnsafeShuffleWriter, and SortShuffleWriter classes returned by SortShuffleManager's getWriter method:
| Handle returned by SortShuffleManager.registerShuffle | Writer returned by SortShuffleManager.getWriter |
| --- | --- |
| BypassMergeSortShuffleHandle | BypassMergeSortShuffleWriter |
| SerializedShuffleHandle | UnsafeShuffleWriter |
| BaseShuffleHandle | SortShuffleWriter |
Therefore, a shuffle writer handle (handle) is obtained first, and the type of handle determines which writer class is used.
The source code of SortShuffleManager's getWriter method:
override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V] = {
  numMapsForShuffle.putIfAbsent(
    handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
  val env = SparkEnv.get
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(env.blockManager, shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        context.taskMemoryManager(), unsafeShuffleHandle, mapId, context, env.conf)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(env.blockManager, shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        bypassMergeSortHandle, mapId, context, env.conf)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
  }
}
BypassMergeSortShuffleWriter write data source code analysis
This class implements the sort-based shuffle in a hash style: it opens one output file per reduce-side task, writes each input record into its corresponding file, and finally concatenates these per-partition files into a single output file.
Conditions for using the BypassMergeSortShuffleWriter (a sketch of the corresponding check follows the list):
- No ordering is specified. If an ordering were specified, the data within each partition would have to be sorted; BypassMergeSortShuffleWriter therefore avoids the sorting overhead.
- No aggregator is specified.
- The number of partitions is less than the value of the spark.shuffle.sort.bypassMergeThreshold configuration property.
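A paraphrased sketch of the check performed by SortShuffleWriter.shouldBypassMergeSort (based on the Spark 2.x sources; exact code may differ by version):
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  if (dep.mapSideCombine) {
    // Map-side aggregation requires sorting/merging, so the bypass path cannot be used
    false
  } else {
    // No aggregation: bypass only when the partition count is at or below the threshold (default 200)
    val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}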
The source code of BypassMergeSortShuffleWriter's write method:
@Override
public void write(Iterator<Product2<K, V>> records) throws IOException {
  assert (partitionWriters == null);
  if (!records.hasNext()) {
    // No records: commit an index file with all-zero partition lengths and return
    partitionLengths = new long[numPartitions];
    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
    return;
  }
  final SerializerInstance serInstance = serializer.newInstance();
  // Configure one disk writer (DiskBlockObjectWriter) for each partition
  partitionWriters = new DiskBlockObjectWriter[numPartitions];
  // In this write mode, numPartitions DiskBlockObjectWriters are open at the same time, so the number of
  // partitions should not be too large, to avoid excessive memory overhead; each writer buffer is 32 KB
  // (defined by fileBufferSize)
  for (int i = 0; i < numPartitions; i++) {
    // createTempShuffleBlock returns the intermediate temporary file for this partition and its corresponding BlockId
    final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
      blockManager.diskBlockManager().createTempShuffleBlock();
    final File file = tempShuffleBlockIdPlusFile._2();
    final BlockId blockId = tempShuffleBlockIdPlusFile._1();
    partitionWriters[i] =
      blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open();
  }
  // Creating the files and the disk writers both touch the disk and can take a long time when many files are
  // opened at once, so this open time is counted as part of the shuffle write time
  writeMetrics.incShuffleWriteTime(System.nanoTime() - openStartTime);
  while (records.hasNext()) {
    final Product2<K, V> record = records.next();
    final K key = record._1();
    partitionWriters[partitioner.getPartition(key)].write(key, record._2());
  }
  for (DiskBlockObjectWriter writer : partitionWriters) {
    writer.commitAndClose();
  }
  // The single output file contains the data for all reduce-side partitions
  File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
  File tmp = Utils.tempFileWith(output);
  partitionLengths = writePartitionedFile(tmp);
  shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
  mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
Therefore, as the write method of BypassMergeSortShuffleWriter shows, a map-side task ultimately produces two files: a data file and an index file. When writing records through DiskBlockObjectWriter, records pass through a 32 KB buffer, so memory usage stays low; however, because no aggregator (Aggregator) can be specified, records are written out as-is, which adds to the subsequent network I/O cost.
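To make the data-file/index-file pair concrete, here is an illustrative sketch (not Spark source) of how the per-partition lengths become the cumulative offsets stored by writeIndexFileAndCommit; a reducer then reads partition i from the byte range [offsets(i), offsets(i + 1)) of the single data file:
// Hypothetical lengths for 3 reduce partitions written by one map task
val partitionLengths = Array[Long](100L, 0L, 250L)
// The index file stores numPartitions + 1 cumulative offsets, starting at 0
val offsets = partitionLengths.scanLeft(0L)(_ + _)  // Array(0, 100, 100, 350)
println(offsets.mkString(", "))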
SortShuffleWriter write data source code analysis
BypassMergeSortShuffleWriter is an optimization offered when the number of reduce-side partitions is small. When the data set is very large, writing data with BypassMergeSortShuffleWriter is inappropriate, and SortShuffleWriter must be used instead.
Like the other concrete ShuffleWriter subclasses, SortShuffleWriter implements data writing in its write method. Its source code:
override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    // Map-side aggregation is required: pass the aggregator (Aggregator) and the key ordering into the
    // external sorter (ExternalSorter)
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // No map-side aggregation: pass None for both the aggregator and the key ordering. The reduce side will
    // aggregate the partition data according to its own aggregator when reading, and will sort the partition
    // data only if an ordering is set there.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records) // Put all the records into the external sorter
  // As in BypassMergeSortShuffleWriter, obtain the output file name and the BlockId
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(output)
  val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
  val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
  shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}
The external sorter (ExternalSorter) extends Spillable, so when memory usage reaches a certain threshold it spills to disk, which reduces memory overhead. ExternalSorter's insertAll method processes each record (for both the aggregated and non-aggregated cases) and checks whether a spill is needed. There are many details inside; the concrete check is implemented in the maybeSpill method of the Spillable class, reached via ExternalSorter.insertAll -> ExternalSorter.maybeSpillCollection -> Spillable.maybeSpill. The source code of Spillable's maybeSpill method:
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  var shouldSpill = false
  // Spill in small batches: only every 32 records, check whether the current memory exceeds the threshold
  if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    // Try to acquire enough memory from the shuffle memory pool to double the current usage
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    // First request memory, then check again, and only then decide whether to spill
    val granted =
      taskMemoryManager.acquireExecutionMemory(amountToRequest, MemoryMode.ON_HEAP, null)
    myMemoryThreshold += granted
    // If too little memory was granted and usage still exceeds the threshold, spill
    shouldSpill = currentMemory >= myMemoryThreshold
  }
  shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
  // Actually spill
  if (shouldSpill) {
    _spillCount += 1
    logSpillage(currentMemory)
    spill(collection) // ExternalSorter's spill method overrides Spillable's spill method
    _elementsRead = 0
    _memoryBytesSpilled += currentMemory
    releaseMemory()
  }
  shouldSpill
}
Tungsten-sort: UnsafeShuffleWriter analysis
The tungsten-sort shuffle mechanism corresponds to the serialized sorting mode (SerializedShuffleHandle). The serializing UnsafeShuffleWriter class is used only when all of the following hold (a sketch of the corresponding check follows the list):
- The shuffle involves no aggregation and no output ordering
- The shuffle serializer supports relocation of serialized values
- The number of shuffle output partitions is less than 16777216
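A paraphrased sketch of the corresponding check, SortShuffleManager.canUseSerializedShuffle (based on the Spark 2.x sources; exact code may differ by version):
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
  val numPartitions = dependency.partitioner.numPartitions
  if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
    false // the serializer must allow reordering serialized records without deserializing them
  } else if (dependency.aggregator.isDefined) {
    false // serialized mode cannot perform map-side aggregation
  } else if (numPartitions > 16777216) {
    false // 2^24, the limit imposed by the PackedRecordPointer partition-id encoding
  } else {
    true
  }
}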
As seen in SortShuffleManager's getWriter method, the UnsafeShuffleWriter data writer uses the shuffleBlockResolver variable to resolve the mapping between logical and physical data blocks; this variable uses IndexShuffleBlockResolver, a resolver class different from the one used by the hash-based shuffle mechanism. The source code of UnsafeShuffleWriter's write method:
@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {......
  while (records.hasNext()) { // For the input record set, insert each record into the external sorter
    insertRecordIntoSorter(records.next());
  }
  // Two outputs are produced: each map-side task generates one data file and one corresponding index file
  closeAndWriteOutput();
  success = true;
} finally {
  if (sorter != null) {
    try {
      sorter.cleanupResources(); // Release the resources of the external sorter
      ......}
The source code of UnsafeShuffleWriter's insertRecordIntoSorter method:
void insertRecordIntoSorter(Product2<K, V> record) throws IOException {......
  final K key = record._1();
  final int partitionId = partitioner.getPartition(key);
  serBuffer.reset(); // Reset the record buffer; records are held in a ByteArrayOutputStream with a 1 MB capacity
  // Write the record into the buffer through the serialized output stream built on top of serBuffer
  serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
  serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
  serOutputStream.flush();
  final int serializedRecordSize = serBuffer.size();
  assert (serializedRecordSize > 0);
  // Insert the record into the external sorter; serBuffer is a byte array whose data starts at offset
  // Platform.BYTE_ARRAY_OFFSET
  sorter.insertRecord(
    serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
}
Continuing with UnsafeShuffleWriter's write method, the source code of the closeAndWriteOutput method:
void closeAndWriteOutput() throws IOException {
  assert(sorter != null);
  updatePeakMemoryUsed();
  serBuffer = null; // Set to null so the buffers can be garbage collected
  serOutputStream = null;
  final SpillInfo[] spills = sorter.closeAndGetSpills(); // Close the external sorter and get all spill information
  sorter = null;
  final long[] partitionLengths;
  // getDataFile obtains the output file through the block resolver
  final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
  final File tmp = Utils.tempFileWith(output); // writeIndexFileAndCommit later uses the block resolver again for the final file
  try {
    partitionLengths = mergeSpills(spills, tmp);
  } finally {......
  }
  // Write the per-partition lengths obtained from merging the spills into the index file,
  // and rename the temporary file to the final data file name
  shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
  mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
UnsafeShuffleWriter's closeAndWriteOutput method has three main steps:
- Close the external sorter and obtain the spill information;
- Merge the intermediate spill files into a single data file and return the data length of each partition;
- Generate the index file for the data file from the per-partition data information.
closeAndWriteOutput calls the mergeSpills method; the source code of UnsafeShuffleWriter's mergeSpills method:
private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {
  final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true); // compression configuration
  final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
  final boolean fastMergeEnabled = // whether unsafe fast merging is enabled
    sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);
  final boolean fastMergeIsSupported = !compressionEnabled || // fast merge when uncompressed, or when the codec supports concatenation
    CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec);
  try {
    if (spills.length == 0) {
      new FileOutputStream(outputFile).close(); // no spill files: create an empty output file
      return new long[partitioner.numPartitions()];
    } else if (spills.length == 1) {
      // A single spill file has already updated its metrics, so there is nothing more to update;
      // simply rename the intermediate temporary spill file to the target output data file
      Files.move(spills[0].file, outputFile);
      return spills[0].partitionLengths;
    } else {
      final long[] partitionLengths;
      // Multiple spill files: choose a merge strategy depending on the configuration
      if (fastMergeEnabled && fastMergeIsSupported) {
        if (transferToEnabled) {
          // Merge the spill data via NIO transferTo; this is only safe when the I/O compression codec
          // and the serializer support concatenation of serialized streams
          partitionLengths = mergeSpillsWithTransferTo(spills, outputFile);
        } else {
          // Merge using Java FileStreams
          partitionLengths = mergeSpillsWithFileStream(spills, outputFile, null);
        }
      } else {
        partitionLengths = mergeSpillsWithFileStream(spills, outputFile, compressionCodec);
      }
      // Update the shuffle write metrics
      writeMetrics.decShuffleBytesWritten(spills[spills.length - 1].file.length());
      writeMetrics.incShuffleBytesWritten(outputFile.length());
      return partitionLengths;
    }
  } ......
}