Analysis of the Spark Shuffle Writer Source Code

2022-06-22 16:47:00 ZH519080

The most fundamental optimization in Spark shuffle, and the most pressing problem it has to solve, is reducing the number of files that ShuffleWriter generates on the mapper side. Producing fewer small files on the mapper side has the following benefits:

  1. Memory consumption on the mapper side is reduced.
  2. Small-scale data can be processed without easily hitting a performance bottleneck.
  3. The reducer side fetches data fewer times, which improves efficiency and reduces network overhead.

SparkEnv shows the shuffle configuration properties and the concrete ShuffleManager implementations that Spark's pluggable ShuffleManager framework already provides. There are three shuffle modes: hash, sort and tungsten-sort. The shuffle-related part of the SparkEnv source code:

// Short names that can be used to specify the ShuffleManager
val shortShuffleMgrNames = Map(
  "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
  "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
  "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
// Configuration property that selects the ShuffleManager; the default is sort mode, implemented by SortShuffleManager
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
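
The lookup above can be mirrored in a few lines of Java. This is only a sketch: the class ShuffleManagerNames and its resolve method are hypothetical names, not part of Spark. It shows that the configured value is matched case-insensitively against the short names, and that any other value is passed through unchanged as a fully qualified class name.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper, not part of Spark: mirrors the short-name lookup shown above.
class ShuffleManagerNames {
    static final Map<String, String> SHORT_NAMES = new HashMap<>();

    static {
        SHORT_NAMES.put("hash", "org.apache.spark.shuffle.hash.HashShuffleManager");
        SHORT_NAMES.put("sort", "org.apache.spark.shuffle.sort.SortShuffleManager");
        SHORT_NAMES.put("tungsten-sort", "org.apache.spark.shuffle.sort.SortShuffleManager");
    }

    // Unknown values are treated as a fully qualified class name,
    // which is what getOrElse does in the excerpt above.
    static String resolve(String configured) {
        return SHORT_NAMES.getOrDefault(configured.toLowerCase(Locale.ROOT), configured);
    }
}
```

Because "sort" and "tungsten-sort" resolve to the same class, the choice between the two only matters later, when the writer is selected.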

The advantage of tungsten-sort is that it helps turn Spark from a platform suited only to small and medium-sized data into one that can handle very large data, which directly reflects Spark's computing power.

Both sort and tungsten-sort are implemented by org.apache.spark.shuffle.sort.SortShuffleManager. The two mechanisms differ in the shuffle data writer they use.

The sort-based shuffle (sorted based shuffle) uses BypassMergeSortShuffleWriter and SortShuffleWriter; the tungsten-sort shuffle uses UnsafeShuffleWriter (serialized sort).

When SparkEnv is instantiated on the Driver and on each Executor, a ShuffleManager is created. It manages block data and provides read and write access to block data across the cluster (getWriter, getReader), covering both local reads and writes and reads of block data from remote nodes.

ShuffleManager is the entry point for analyzing the shuffle framework. It exposes the components used throughout the shuffle: how a shuffle is registered with ShuffleManager to obtain a ShuffleHandle for reading and writing, how the ShuffleHandle yields the concrete read and write interfaces ShuffleReader and ShuffleWriter, and how to obtain ShuffleBlockResolver, the interface that resolves block data information. The following sections analyze the source code of several important components.

ShuffleManager is a pluggable interface provided by Spark shuffle. Any concrete implementation, built-in or custom, must implement the abstract methods of the ShuffleManager trait.

When an Executor runs a stage, it ultimately calls Task.run. Task is an abstract class whose abstract methods are implemented by the subclasses ShuffleMapTask and ResultTask. ShuffleMapTask splits the elements of an RDD into multiple buckets according to the partitioner specified in the ShuffleDependency, and obtains the mapping between data and buckets through ShuffleManager's getWriter interface. ResultTask is the task that returns its output to the application's Driver. While a task executes, it eventually calls the RDD's compute method to compute the data; for an RDD with a ShuffleDependency, compute uses ShuffleManager's getReader interface to fetch the shuffle output of the previous stage as the input data of this task.

The main source code of ShuffleMapTask's runTask method:

override def runTask(context: TaskContext): MapStatus = {
  // First get the ShuffleManager from SparkEnv, then take the shuffleHandle that the
  // ShuffleDependency obtained when it was registered with the ShuffleManager. Use the
  // shuffleHandle and the RDD partition of the current Task to obtain a ShuffleWriter,
  // and finally call its write interface to write the data of the current partition.
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    // Compute the RDD and persist the result through the ShuffleWriter's write method (for example HashShuffleWriter)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch { ......
  }
}

In ShuffleMapTask's runTask method, getWriter is an abstract method of the ShuffleManager trait, which SortShuffleManager and HashShuffleManager both implement. Taking SortShuffleManager's getWriter as an example: before getWriter can be called, the shuffle must be registered with the ShuffleManager through SortShuffleManager's registerShuffle method, which returns the handle for that shuffle. The source code of SortShuffleManager's registerShuffle method:

override def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) {
    // If the number of partitions is below the threshold specified by
    // spark.shuffle.sort.bypassMergeThreshold and no map-side aggregation is needed,
    // write numPartitions files directly and concatenate them into one output file.
    // This avoids serializing and deserializing twice to merge spill files, but
    // more memory has to be allocated to buffers.
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Otherwise, try to buffer the map output in serialized form
    new SerializedShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Otherwise, buffer map outputs in a deserialized form:
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }
}

The BypassMergeSortShuffleHandle, SerializedShuffleHandle and BaseShuffleHandle returned by SortShuffleManager's registerShuffle method correspond, respectively, to the BypassMergeSortShuffleWriter, UnsafeShuffleWriter and SortShuffleWriter classes created by SortShuffleManager's getWriter method:

SortShuffleManager.registerShuffle      SortShuffleManager.getWriter
BypassMergeSortShuffleHandle            BypassMergeSortShuffleWriter
SerializedShuffleHandle                 UnsafeShuffleWriter
BaseShuffleHandle                       SortShuffleWriter

In other words, the shuffle handle is obtained first, and the type of the handle then determines which writer class is used.

The source code of SortShuffleManager's getWriter method:

override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V] = {
  numMapsForShuffle.putIfAbsent(
    handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
  val env = SparkEnv.get
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager, shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        context.taskMemoryManager(), unsafeShuffleHandle, mapId, context, env.conf)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager, shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        bypassMergeSortHandle, mapId, context, env.conf)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
  }
}
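
The two methods together amount to a small decision table. The following Java sketch models it under stated assumptions: WriterSelection, chooseHandle and writerFor are hypothetical names, and the two boolean inputs stand in for the results of SortShuffleWriter.shouldBypassMergeSort and SortShuffleManager.canUseSerializedShuffle.

```java
// Hypothetical model of the decision order in registerShuffle and getWriter.
class WriterSelection {
    enum Handle { BYPASS_MERGE_SORT, SERIALIZED, BASE }

    // Same order of checks as registerShuffle: bypass first, then serialized, then base.
    static Handle chooseHandle(boolean shouldBypassMergeSort, boolean canUseSerializedShuffle) {
        if (shouldBypassMergeSort) {
            return Handle.BYPASS_MERGE_SORT;
        }
        if (canUseSerializedShuffle) {
            return Handle.SERIALIZED;
        }
        return Handle.BASE;
    }

    // Maps each handle type to the writer class that getWriter instantiates for it.
    static String writerFor(Handle handle) {
        switch (handle) {
            case BYPASS_MERGE_SORT:
                return "BypassMergeSortShuffleWriter";
            case SERIALIZED:
                return "UnsafeShuffleWriter";
            default:
                return "SortShuffleWriter";
        }
    }
}
```

Note that the bypass check wins when both conditions hold, because registerShuffle tests it first.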

Source code analysis of BypassMergeSortShuffleWriter data writing

This class implements a hash-style variant of the sort-based shuffle mechanism. It builds one output file for each reduce-side task, writes each input record to its corresponding file, and finally merges these per-partition files into a single output file.

Conditions for using the BypassMergeSortShuffleWriter writer:

  1. No ordering may be specified. If an ordering is specified, the data inside each partition is sorted, so BypassMergeSortShuffleWriter avoids the sorting overhead.
  2. No aggregator may be specified.
  3. The number of partitions is less than the value specified by the spark.shuffle.sort.bypassMergeThreshold configuration property.
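
The three conditions can be written as a single predicate. The Java sketch below follows the list exactly as stated; BypassCheck is a hypothetical class, and the precise check in SortShuffleWriter.shouldBypassMergeSort (including how the boundary value is treated) may differ between Spark releases.

```java
// Hypothetical stand-in for the bypass check, following the three listed conditions.
class BypassCheck {
    static boolean shouldBypassMergeSort(
            boolean hasOrdering,
            boolean hasAggregator,
            int numPartitions,
            int bypassMergeThreshold) {
        // All three conditions must hold: no ordering, no aggregator,
        // and a partition count below the configured threshold.
        return !hasOrdering && !hasAggregator && numPartitions < bypassMergeThreshold;
    }
}
```

For instance, with a threshold of 200 (an example value chosen here for illustration), a shuffle with 50 partitions and neither ordering nor aggregator qualifies, while the same shuffle with an aggregator does not.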

The source code of BypassMergeSortShuffleWriter's write method:

@Override
public void write(Iterator<Product2<K, V>> records) throws IOException {
  assert (partitionWriters == null);
  if (!records.hasNext()) {
    // No records: initialize the index file with zero-length partitions
    partitionLengths = new long[numPartitions];
    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
    return;
  }
  final SerializerInstance serInstance = serializer.newInstance();
  final long openStartTime = System.nanoTime();  // start timing the creation of the writers
  // Create one disk writer (DiskBlockObjectWriter) per partition
  partitionWriters = new DiskBlockObjectWriter[numPartitions];
  // This mode keeps numPartitions DiskBlockObjectWriter instances open at the same time,
  // so the number of partitions should not be too large, to avoid excessive memory overhead.
  // Each writer's buffer is 32 KB (sized by fileBufferSize).
  for (int i = 0; i < numPartitions; i++) {
    // createTempShuffleBlock returns the BlockId of the intermediate temporary file
    // generated for each partition, together with the file itself
    final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
      blockManager.diskBlockManager().createTempShuffleBlock();
    final File file = tempShuffleBlockIdPlusFile._2();
    final BlockId blockId = tempShuffleBlockIdPlusFile._1();
    partitionWriters[i] =
      blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open();
  }
  // Creating the files and the disk writers both touch the disk, which can take a long time
  // when many files are opened, so this time is counted as shuffle write time
  writeMetrics.incShuffleWriteTime(System.nanoTime() - openStartTime);
  while (records.hasNext()) {
    final Product2<K, V> record = records.next();
    final K key = record._1();
    partitionWriters[partitioner.getPartition(key)].write(key, record._2());
  }
  for (DiskBlockObjectWriter writer : partitionWriters) {
    writer.commitAndClose();
  }
  // This single file holds the output data for all reduce-side partitions
  File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
  File tmp = Utils.tempFileWith(output);
  partitionLengths = writePartitionedFile(tmp);
  shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
  mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}

To summarize: BypassMergeSortShuffleWriter's write method shows that a map-side task ultimately generates two files, a data file and an index file. DiskBlockObjectWriter writes records in batches through a 32 KB buffer, so it does not take up much memory. However, because no aggregator (Aggregator) can be specified, records are written out directly without map-side combining, and the subsequent network I/O cost is correspondingly higher.
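
To make the relationship between the data file and the index file concrete, here is a minimal Java sketch. It assumes that the index file stores numPartitions + 1 cumulative byte offsets as 8-byte values, so that partition i occupies the byte range from offsets[i] to offsets[i + 1] in the data file. IndexFileSketch and its methods are hypothetical names and are not Spark's IndexShuffleBlockResolver.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of how partitionLengths could be turned into index file contents.
class IndexFileSketch {
    // Converts per-partition lengths into numPartitions + 1 cumulative offsets.
    static long[] toOffsets(long[] partitionLengths) {
        long[] offsets = new long[partitionLengths.length + 1];
        for (int i = 0; i < partitionLengths.length; i++) {
            offsets[i + 1] = offsets[i] + partitionLengths[i];
        }
        return offsets;
    }

    // Encodes the offsets as consecutive 8-byte big-endian longs (assumed layout).
    static byte[] encodeIndex(long[] partitionLengths) {
        long[] offsets = toOffsets(partitionLengths);
        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * offsets.length);
        for (long offset : offsets) {
            buffer.putLong(offset);
        }
        return buffer.array();
    }
}
```

With this layout a reducer only needs two adjacent offsets to locate its partition, which is why a single data file per map task is enough.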

Source code analysis of SortShuffleWriter data writing

Writing data with BypassMergeSortShuffleWriter is an optimization for the case where the number of reduce-side partitions is small. When the data set is very large, writing with BypassMergeSortShuffleWriter is not appropriate and SortShuffleWriter should be used instead.

As with the other concrete ShuffleWriter subclasses, SortShuffleWriter implements data writing in its write method. Its source code:

override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    // When map-side aggregation is needed, pass the Aggregator and the key Ordering
    // to the external sorter (ExternalSorter)
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // When map-side aggregation is not specified, both the Aggregator and the key Ordering
    // passed to ExternalSorter are None. The reduce side then aggregates the partition data
    // with the aggregator when it reads the data, and sorts the partition data only if an
    // Ordering is set.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records)  // Put all records to be written into the external sorter
  // As in BypassMergeSortShuffleWriter, get the output file and the BlockId
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(output)
  val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
  val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
  shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}

The external sorter (ExternalSorter) extends Spillable, so when memory usage reaches a certain threshold it spills to disk, which reduces memory overhead. After ExternalSorter's insertAll method has processed each record (with or without aggregation), it checks whether a spill is needed. There are many internal details; the concrete logic lives in the maybeSpill method of the Spillable class. The call chain is ExternalSorter.insertAll -> ExternalSorter.maybeSpillCollection -> Spillable.maybeSpill. The source code of Spillable's maybeSpill method:

protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  var shouldSpill = false
  // Check only when the number of records read is a multiple of 32, so that spill decisions
  // are made in small batches, and only when current memory has reached the threshold
  if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    // Try to claim up to twice the current memory from the shuffle memory pool
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    // Request the memory first, then re-check, and only then decide whether to spill
    val granted =
      taskMemoryManager.acquireExecutionMemory(amountToRequest, MemoryMode.ON_HEAP, null)
    myMemoryThreshold += granted
    // Spill if too little memory was granted to grow further, that is, if current memory
    // still reaches the (possibly raised) threshold
    shouldSpill = currentMemory >= myMemoryThreshold
  }
  shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
  // Actually spill
  if (shouldSpill) {
    _spillCount += 1
    logSpillage(currentMemory)
    spill(collection)   // ExternalSorter's spill method overrides Spillable's spill method
    _elementsRead = 0
    _memoryBytesSpilled += currentMemory
    releaseMemory()
  }
  shouldSpill
}
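
The request-then-decide logic is easier to follow with concrete numbers. The Java sketch below is a simplified model, not Spark code: SpillSketch and MemoryPool are hypothetical, the pool simply grants whatever is left, and the actual spilling and memory release are omitted.

```java
// Simplified, hypothetical model of the decision in Spillable.maybeSpill.
class SpillSketch {
    // Stand-in for the task memory manager: grants at most what is left in the pool.
    static class MemoryPool {
        private long free;

        MemoryPool(long free) {
            this.free = free;
        }

        long acquire(long requested) {
            long granted = Math.max(0L, Math.min(requested, free));
            free -= granted;
            return granted;
        }
    }

    long memoryThreshold;
    long elementsRead;
    final long forceSpillThreshold;
    final MemoryPool pool;

    SpillSketch(long initialThreshold, long forceSpillThreshold, MemoryPool pool) {
        this.memoryThreshold = initialThreshold;
        this.forceSpillThreshold = forceSpillThreshold;
        this.pool = pool;
    }

    // Returns true when the caller should spill its in-memory collection.
    boolean maybeSpill(long currentMemory) {
        boolean shouldSpill = false;
        if (elementsRead % 32 == 0 && currentMemory >= memoryThreshold) {
            // Ask for enough memory to raise the threshold to twice the current usage.
            long amountToRequest = 2 * currentMemory - memoryThreshold;
            memoryThreshold += pool.acquire(amountToRequest);
            // Spill only if the grant was too small to lift the threshold above current usage.
            shouldSpill = currentMemory >= memoryThreshold;
        }
        shouldSpill = shouldSpill || elementsRead > forceSpillThreshold;
        if (shouldSpill) {
            elementsRead = 0; // the method above also calls spill() and releases memory here
        }
        return shouldSpill;
    }
}
```

With a threshold of 100 and current usage of 150, the sketch requests 200. If the pool grants all of it, the threshold becomes 300 and no spill happens; if the pool grants nothing, the threshold stays at 100 and the collection is spilled.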

Tungsten-sort: UnsafeShuffleWriter analysis

The Tungsten sort-based shuffle mechanism corresponds to the serialized sorting mode (SerializedShuffleHandle). The conditions for using the serialized UnsafeShuffleWriter class are:

  1. The shuffle specifies no aggregation or output ordering.
  2. The shuffle serializer supports relocation of serialized values.
  3. The shuffle produces fewer than 16777216 output partitions.
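
These conditions can also be expressed as a predicate. The Java sketch below is hypothetical (SerializedShuffleCheck is not a Spark class) and follows the wording of the list; the treatment of the boundary value in a given Spark release may differ. It also makes visible that 16777216 is 2 to the power of 24.

```java
// Hypothetical stand-in for the serialized-shuffle eligibility check.
class SerializedShuffleCheck {
    // 1 << 24 == 16777216, the partition limit quoted in the list above.
    static final int MAX_PARTITIONS = 1 << 24;

    static boolean canUseSerializedShuffle(
            boolean serializerSupportsRelocation,
            boolean hasAggregator,
            int numPartitions) {
        return serializerSupportsRelocation
            && !hasAggregator
            && numPartitions < MAX_PARTITIONS; // strict comparison follows the text above
    }
}
```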

As SortShuffleManager's getWriter method shows, the data writer class UnsafeShuffleWriter uses the variable shuffleBlockResolver to resolve the mapping between logical data blocks and physical data blocks. This variable is an IndexShuffleBlockResolver, a different resolver class from the one used by the hash-based shuffle mechanism. The source code of UnsafeShuffleWriter's write method:

@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {......
    // Insert each record of the input record set into the external sorter
    while (records.hasNext()) {
      insertRecordIntoSorter(records.next());
    }
    // Produce the output: each map task generates one data file and one corresponding index file
    closeAndWriteOutput(); 
    success = true;
  } finally {
    if (sorter != null) {
      try {
        sorter.cleanupResources();  // Release the resources of the external sorter 
......}

The source code of UnsafeShuffleWriter's insertRecordIntoSorter method:

void insertRecordIntoSorter(Product2<K, V> record) throws IOException {......
  final K key = record._1();
  final int partitionId = partitioner.getPartition(key);
  // Reset the buffer that holds the record; it is a ByteArrayOutputStream with a capacity of 1 MB
  serBuffer.reset();
  // The serializer builds a serialization output stream on top of serBuffer; write the record into the buffer
  serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
  serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
  serOutputStream.flush();
  final int serializedRecordSize = serBuffer.size();
  assert (serializedRecordSize > 0);
  // Insert the record into the external sorter. serBuffer is backed by a byte array
  // whose data starts at offset Platform.BYTE_ARRAY_OFFSET
  sorter.insertRecord(
    serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
}

Next, the closeAndWriteOutput method called from UnsafeShuffleWriter's write method. Its source code:

void closeAndWriteOutput() throws IOException {
  assert(sorter != null);
  updatePeakMemoryUsed();
  serBuffer = null;  // Set to null so that it can be garbage collected
  serOutputStream = null;
  // Close the external sorter and get the information about all spills
  final SpillInfo[] spills = sorter.closeAndGetSpills();
  sorter = null;
  final long[] partitionLengths;
  // Get the output data file through the block resolver
  final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
  // writeIndexFileAndCommit asks the block resolver for the file name again
  final File tmp = Utils.tempFileWith(output);
  try {
    partitionLengths = mergeSpills(spills, tmp);
  } finally {......
  }
  // Write the partition lengths obtained from merging the spills to the index file,
  // and rename the temporary file to the real data file name
  shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
  mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}

UnsafeShuffleWriter's closeAndWriteOutput method has three main steps:

  1. Close the external sorter and obtain the spill information.
  2. Merge the intermediate spill files into one data file and return the length of the data for each partition.
  3. Generate the index file for the data file from the per-partition lengths.

closeAndWriteOutput calls the mergeSpills method. The source code of UnsafeShuffleWriter's mergeSpills method:

private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {
  // Whether shuffle output compression is configured
  final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);
  final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
  // Whether the unsafe fast merge is enabled
  final boolean fastMergeEnabled =
    sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);
  // Fast merge is supported when compression is off or the codec supports concatenating serialized streams
  final boolean fastMergeIsSupported = !compressionEnabled ||
    CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec);
  try {
    if (spills.length == 0) {
      new FileOutputStream(outputFile).close(); // No intermediate spill files: create an empty file
      return new long[partitioner.numPartitions()];
    } else if (spills.length == 1) {
      // The metrics were already updated when this last spill was written, so they need not
      // be updated again; simply rename the spill's temporary file to the target output data file
      Files.move(spills[0].file, outputFile);
      return spills[0].partitionLengths;
    } else {
      final long[] partitionLengths;
      // Multiple intermediate spill files: choose a merge strategy according to the conditions
      if (fastMergeEnabled && fastMergeIsSupported) {
        if (transferToEnabled) {
          // Merge the spill data through NIO. This is only safe when the I/O compression codec
          // and the serializer support concatenation of serialized streams
          partitionLengths = mergeSpillsWithTransferTo(spills, outputFile);
        } else {
          // Merge with Java FileStreams
          partitionLengths = mergeSpillsWithFileStream(spills, outputFile, null);
        }
      } else {
        partitionLengths = mergeSpillsWithFileStream(spills, outputFile, compressionCodec);
      }
      // Update the shuffle write metrics
      writeMetrics.decShuffleBytesWritten(spills[spills.length - 1].file.length());
      writeMetrics.incShuffleBytesWritten(outputFile.length());
      return partitionLengths;
    }
  } ......}
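
The branching in mergeSpills reduces to a choice among five outcomes. The Java sketch below restates that choice as a pure function so the conditions can be read at a glance; MergeStrategySketch and its Strategy enum are hypothetical names, and the boolean parameters stand in for the configuration values and codec capability read in the method above.

```java
// Hypothetical restatement of the strategy selection inside mergeSpills.
class MergeStrategySketch {
    enum Strategy {
        EMPTY_FILE,             // no spills: create an empty output file
        MOVE_SINGLE_SPILL,      // one spill: rename it to the output file
        TRANSFER_TO,            // fast merge through NIO transferTo
        FILE_STREAM_NO_CODEC,   // fast merge through file streams, no codec
        FILE_STREAM_WITH_CODEC  // slow merge through file streams with the compression codec
    }

    static Strategy choose(
            int spillCount,
            boolean compressionEnabled,
            boolean codecSupportsConcatenation,
            boolean fastMergeEnabled,
            boolean transferToEnabled) {
        if (spillCount == 0) {
            return Strategy.EMPTY_FILE;
        }
        if (spillCount == 1) {
            return Strategy.MOVE_SINGLE_SPILL;
        }
        boolean fastMergeIsSupported = !compressionEnabled || codecSupportsConcatenation;
        if (fastMergeEnabled && fastMergeIsSupported) {
            return transferToEnabled ? Strategy.TRANSFER_TO : Strategy.FILE_STREAM_NO_CODEC;
        }
        return Strategy.FILE_STREAM_WITH_CODEC;
    }
}
```

The slow path is only taken when fast merge is disabled, or when compression is on and the codec cannot concatenate serialized streams.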

 

Copyright notice
This article was written by [ZH519080]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/173/202206221523255340.html