
Spark operators: the wholeTextFiles operator

2022-08-05 06:10:00 zdaiqing

1. Overview

About wholeTextFiles
	SparkContext.wholeTextFiles can read many files in a single call;
	It takes the directory that contains the files as its argument and builds an RDD from them;
	Each element of that RDD is a (filename, content) pair, where filename is the file path and content is the file's contents;
	It is typically used to build an RDD from many small files;
	The minimum number of partitions of the returned RDD can be specified via a parameter;
wholeTextFiles vs. textFile
	Similarity
		Both can build an RDD from multiple files;
	Differences
		wholeTextFiles takes the path of the directory that holds the small files, while textFile, when reading several files, takes a comma-separated string of file paths;
		wholeTextFiles is usually used to build an RDD from many small files, while textFile is usually used to build an RDD from a single file;
		wholeTextFiles returns each file as a (filename, content) pair, while textFile returns one record per line of each file;
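
A minimal usage sketch of the two operators (the directory and file paths are hypothetical; any local or HDFS paths work the same way):

import org.apache.spark.sql.SparkSession

object WholeTextFilesDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("wholeTextFilesDemo").master("local[2]").getOrCreate()
    val sc = spark.sparkContext

    // wholeTextFiles: one (path, wholeContent) pair per file under the directory
    val filesRdd = sc.wholeTextFiles("/data/small-files", minPartitions = 2)
    filesRdd.take(2).foreach { case (path, content) =>
      println(s"$path -> ${content.length} chars")
    }

    // textFile: one record per line, multiple files given as a comma-separated string
    val linesRdd = sc.textFile("/data/small-files/a.txt,/data/small-files/b.txt")
    println(s"line count: ${linesRdd.count()}")

    spark.stop()
  }
}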

2. Reading the source

2.1. The wholeTextFiles source

For the defaultMinPartitions logic, see the earlier article on the textFile operator.

Parameters
	path: the path of the directory that contains the small files
	minPartitions:
		The minimum number of partitions; it can be passed explicitly when calling the wholeTextFiles operator, or left out;
		When it is left out, the default minimum number of partitions is defaultMinPartitions:
			If spark.default.parallelism is set to some value p:
				when p > 2, defaultMinPartitions = 2, i.e. the operator defaults to a minimum of 2 partitions;
				when p <= 2, defaultMinPartitions = p, i.e. the operator defaults to a minimum of p partitions;
			If spark.default.parallelism is not set:
				in cluster mode, defaultMinPartitions = 2;
				in local mode, defaultMinPartitions = min(total CPU cores, 2);
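
In other words, the default reduces to the following one-liner (a paraphrase of SparkContext, shown only as a sketch; defaultParallelism is spark.default.parallelism when set, otherwise the scheduler's default):

// simplified sketch of the behaviour described above
def defaultMinPartitions(defaultParallelism: Int): Int =
  math.min(defaultParallelism, 2)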
What the code does
	Specifies the minimum number of RDD partitions
	Sets the input directory path
	Builds a WholeTextFileRDD
	Maps the RDD: file path as key, file content as value
	Sets the RDD name
def wholeTextFiles(
      path: String,
      //This parameter specifies the minimum number of partitions
      minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
    
    assertNotStopped()
    val job = NewHadoopJob.getInstance(hadoopConfiguration)
    
    //Specifies the read folder path
    NewFileInputFormat.setInputPaths(job, path)
    val updateConf = job.getConfiguration
    //build the WholeTextFileRDD
    new WholeTextFileRDD(
      this,
      classOf[WholeTextFileInputFormat],
      classOf[Text],
      classOf[Text],
      updateConf,
      minPartitions
      ).map(
      	//map each record: file path as key, file content as value
      	record => (record._1.toString, record._2.toString)
      ).setName(path)
  }
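
A quick way to see the effect of minPartitions (assuming an existing SparkContext sc as in the sketch in section 1; the directory is hypothetical):

// the resulting partition count is driven by the file splits computed below,
// so it is not necessarily exactly the requested minimum
val rdd = sc.wholeTextFiles("/data/small-files", minPartitions = 4)
println(rdd.getNumPartitions)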

2.2. Partition computation in WholeTextFileRDD

Summary
	The number of file splits determines the number of RDD partitions
Partitioning steps
	From the minimum number of partitions, derive maxSplitSize (maximum split size), minSplitSizeRack (minimum split size per rack) and minSplitSizeNode (minimum split size per node)
	Split the files
	Build one partition per file split
override def getPartitions: Array[Partition] = {
    
    val conf = getConf
    
    conf.setIfUnset(FileInputFormat.LIST_STATUS_NUM_THREADS,
      Runtime.getRuntime.availableProcessors().toString)
    val inputFormat = inputFormatClass.newInstance
    inputFormat match {
    
      case configurable: Configurable =>
        configurable.setConf(conf)
      case _ =>
    }
    val jobContext = new JobContextImpl(conf, jobId)
    //set maxSplitSize (maximum split size), minSplitSizeRack (minimum split size per rack) and minSplitSizeNode (minimum split size per node)
    inputFormat.setMinPartitions(jobContext, minPartitions)
    //compute the file splits
    val rawSplits = inputFormat.getSplits(jobContext).toArray
    //the number of file splits becomes the number of RDD partitions
    val result = new Array[Partition](rawSplits.size)
    //build one RDD partition per file split
    for (i <- 0 until rawSplits.size) {
    
      result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
    }
    result
  }

2.3. setMinPartitions: setting the split sizes

maxSplitSize (maximum bytes per split):
	the smallest integer not less than totalBytes / minPartitions
The per-node and per-rack minimum split sizes are capped so that they never exceed maxSplitSize
def setMinPartitions(context: JobContext, minPartitions: Int) {
    
    val files = listStatus(context).asScala
    //total bytes across all the small files
    val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
    //maximum bytes per split: the smallest integer not less than totalBytes / minPartitions
    val maxSplitSize = Math.ceil(totalLen * 1.0 /
      (if (minPartitions == 0) 1 else minPartitions)).toLong

    val config = context.getConfiguration
    //per-node minimum split size from the configuration
    val minSplitSizePerNode = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L)
    //per-rack minimum split size from the configuration
    val minSplitSizePerRack = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L)

	//make sure the per-node / per-rack minimum split sizes never exceed maxSplitSize
    if (maxSplitSize < minSplitSizePerNode) {
    
      super.setMinSplitSizeNode(maxSplitSize)
    }
    if (maxSplitSize < minSplitSizePerRack) {
    
      super.setMinSplitSizeRack(maxSplitSize)
    }
    super.setMaxSplitSize(maxSplitSize)
  }
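
A worked example with hypothetical numbers, just to make the formula concrete:

// 100 small files totalling ~1000 MB, read with minPartitions = 4
val totalLen      = 1000L * 1024 * 1024
val minPartitions = 4
val maxSplitSize  = math.ceil(totalLen * 1.0 / minPartitions).toLong
// maxSplitSize == 262144000 bytes (250 MB); the splitter below packs blocks
// into splits of at most roughly this size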

3. CombineFileInputFormat.getSplits(): computing the file splits

What it does
	Reads the split constraints: minSizeNode, minSizeRack, maxSize
	Collects the file information: count, locations, etc.
	Calls the method that actually builds the splits
public List<InputSplit> getSplits(JobContext job) throws IOException {
    
        long minSizeNode = 0L;
        long minSizeRack = 0L;
        long maxSize = 0L;
        Configuration conf = job.getConfiguration();
        //read minSizeNode, minSizeRack and maxSize; they were set in setMinPartitions when the split size was computed; any value not set there is read from the configuration, defaulting to 0;
        if (this.minSplitSizeNode != 0L) {
    
            minSizeNode = this.minSplitSizeNode;
        } else {
    
            minSizeNode = conf.getLong("mapreduce.input.fileinputformat.split.minsize.per.node", 0L);
        }

        if (this.minSplitSizeRack != 0L) {
    
            minSizeRack = this.minSplitSizeRack;
        } else {
    
            minSizeRack = conf.getLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 0L);
        }

        if (this.maxSplitSize != 0L) {
    
            maxSize = this.maxSplitSize;
        } else {
    
            maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0L);
        }

		//make sure minSizeNode and minSizeRack are not larger than maxSize, and minSizeNode is not larger than minSizeRack
        if (minSizeNode != 0L && maxSize != 0L && minSizeNode > maxSize) {
    
            throw new IOException("Minimum split size pernode " + minSizeNode + " cannot be larger than maximum split size " + maxSize);
        } else if (minSizeRack != 0L && maxSize != 0L && minSizeRack > maxSize) {
    
            throw new IOException("Minimum split size per rack " + minSizeRack + " cannot be larger than maximum split size " + maxSize);
        } else if (minSizeRack != 0L && minSizeNode > minSizeRack) {
    
            throw new IOException("Minimum split size per node " + minSizeNode + " cannot be larger than minimum split " + "size per rack " + minSizeRack);
        } else {
    
        	//list the input files
            List<FileStatus> stats = this.listStatus(job);
            //container that will hold the resulting splits
            List<InputSplit> splits = new ArrayList();
            //an empty directory returns an empty split list
            if (stats.size() == 0) {
    
                return splits;
            } else {
    
                Iterator var11 = this.pools.iterator();

                while(var11.hasNext()) {
    
                    CombineFileInputFormat.MultiPathFilter onepool = (CombineFileInputFormat.MultiPathFilter)var11.next();
                    ArrayList<FileStatus> myPaths = new ArrayList();
                    Iterator iter = stats.iterator();

                    while(iter.hasNext()) {
    
                        FileStatus p = (FileStatus)iter.next();
                        if (onepool.accept(p.getPath())) {
    
                            myPaths.add(p);
                            iter.remove();
                        }
                    }

                    this.getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
                }

				//build the splits from the remaining files
                this.getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);
                this.rackToNodes.clear();
                return splits;
            }
        }
    }

3.1. getMoreSplits: how the splits are built

Splitting steps
	Break each individual file into blocks
	Build the splits from those blocks and the split-size constraints
private void getMoreSplits(JobContext job, List<FileStatus> stats, long maxSize, long minSizeNode, long minSizeRack, List<InputSplit> splits) throws IOException {
    
        Configuration conf = job.getConfiguration();
        //mappings that describe how blocks relate to racks and nodes
        //rack -> blocks
        HashMap<String, List<CombineFileInputFormat.OneBlockInfo>> rackToBlocks = new HashMap();
        //block -> nodes
        HashMap<CombineFileInputFormat.OneBlockInfo, String[]> blockToNodes = new HashMap();
        //node -> blocks
        HashMap<String, Set<CombineFileInputFormat.OneBlockInfo>> nodeToBlocks = new HashMap();
        //per-file information array
        CombineFileInputFormat.OneFileInfo[] files = new CombineFileInputFormat.OneFileInfo[stats.size()];
        if (stats.size() != 0) {
    
            long totLength = 0L;
            int i = 0;

			//iterate over the files and accumulate the total number of bytes
            for(Iterator var18 = stats.iterator(); var18.hasNext(); totLength += files[i].getLength()) {
                FileStatus stat = (FileStatus)var18.next();
                //break this single file into blocks (the constructor also fills in the rack/node maps)
                files[i] = new CombineFileInputFormat.OneFileInfo(stat, conf, this.isSplitable(job, stat.getPath()), rackToBlocks, blockToNodes, nodeToBlocks, this.rackToNodes, maxSize);
            }

			//build the splits from the blocks and the split-size constraints
            this.createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength, maxSize, minSizeNode, minSizeRack, splits);
        }
    }

3.1.1. Block chunking for a single file

Rules for the chunk size
	If the chunk size parameter (maxSize) is not set
		the file size is the chunk size
	If the chunk size parameter is set
		file size < chunk size parameter: the file size is the chunk size
		chunk size parameter < file size < 2 * chunk size parameter: half of the file size is the chunk size
		file size > 2 * chunk size parameter: the chunk size parameter is the chunk size
Summary
	This code builds the OneFileInfo object for a single file: it cuts the file into blocks, builds a block object for each chunk, and fills in the fileSize and blocks fields of OneFileInfo;
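
A small worked example of these rules (the sizes are made up; the loop simply mirrors the do-while in the constructor below):

// hypothetical: one 10 MB block location, maxSize = 4 MB
val maxSize = 4L
var left    = 10L                    // MB still to be chunked
var chunks  = List.empty[Long]
while (left > 0L) {
  val myLength =
    if (maxSize == 0L) left                                   // no limit: take everything
    else if (left > maxSize && left < 2L * maxSize) left / 2L // between 1x and 2x maxSize: take half
    else math.min(maxSize, left)                              // otherwise: at most maxSize
  chunks = chunks :+ myLength
  left -= myLength
}
// chunks == List(4, 3, 3)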
OneFileInfo(FileStatus stat, Configuration conf, boolean isSplitable, HashMap<String, List<CombineFileInputFormat.OneBlockInfo>> rackToBlocks, HashMap<CombineFileInputFormat.OneBlockInfo, String[]> blockToNodes, HashMap<String, Set<CombineFileInputFormat.OneBlockInfo>> nodeToBlocks, HashMap<String, Set<String>> rackToNodes, long maxSize) throws IOException {
    
			//get the file's block locations
            BlockLocation[] locations;
            if (stat instanceof LocatedFileStatus) {
                //the status already carries its block locations
                locations = ((LocatedFileStatus)stat).getBlockLocations();
            } else {
    
                FileSystem fs = stat.getPath().getFileSystem(conf);
                locations = fs.getFileBlockLocations(stat, 0L, stat.getLen());
            }

			//no block locations: return an empty block array
            if (locations == null) {
    
                this.blocks = new CombineFileInputFormat.OneBlockInfo[0];
            } else {
    
                if (locations.length == 0 && !stat.isDirectory()) {
                    //zero-length file: fake a single empty block location
                    locations = new BlockLocation[]{ new BlockLocation() };
                }
				
				//the file is not splittable: the whole file becomes a single block
                if (!isSplitable) {
    
                    this.blocks = new CombineFileInputFormat.OneBlockInfo[1];
                    this.fileSize = stat.getLen();
                    this.blocks[0] = new CombineFileInputFormat.OneBlockInfo(stat.getPath(), 0L, this.fileSize, locations[0].getHosts(), locations[0].getTopologyPaths());
                } else {
    
                	//container for this file's blocks
                    ArrayList<CombineFileInputFormat.OneBlockInfo> blocksList = new ArrayList(locations.length);
                    int i = 0;

                    while(true) {
    
                    	//all block locations processed: convert the block list to an array
                        if (i >= locations.length) {
    
                            this.blocks = (CombineFileInputFormat.OneBlockInfo[])blocksList.toArray(new CombineFileInputFormat.OneBlockInfo[blocksList.size()]);
                            break;
                        }

						//accumulate the file size
                        this.fileSize += locations[i].getLength();
                        //bytes of this block location still to be chunked
                        long left = locations[i].getLength();
                        //starting offset of the data to be chunked
                        long myOffset = locations[i].getOffset();
                        //size of the chunk about to be created
                        long myLength = 0L;

                        do {
    
                        	//decide the chunk size
                        	//maxSize not set: everything that is left becomes the chunk
                            if (maxSize == 0L) {
                                myLength = left;
                            }
                            //remaining data between maxSize and 2 * maxSize: the chunk is half of it
                            else if (left > maxSize && left < 2L * maxSize) {
                                myLength = left / 2L;
                            }
                            //remaining data >= 2 * maxSize: the chunk is exactly maxSize
                            //remaining data <= maxSize: the chunk is everything that is left
                            else {
                                myLength = Math.min(maxSize, left);
                            }

							//cut a chunk of myLength bytes and record it as a block
                            CombineFileInputFormat.OneBlockInfo oneblock = new CombineFileInputFormat.OneBlockInfo(stat.getPath(), myOffset, myLength, locations[i].getHosts(), locations[i].getTopologyPaths());
                            //remove the chunk just cut from the data still to be chunked
                            left -= myLength;
                            //advance the offset
                            myOffset += myLength;
                            //add the block to the container
                            blocksList.add(oneblock);
                        } while(left > 0L);

                        ++i;
                    }
                }

				//populate the mappings between blocks, nodes and racks
                populateBlockInfo(this.blocks, rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes);
            }

        }

3.1.1.1. OneFileInfo: per-file information class

fileSize: the file's size in bytes
blocks: the array of blocks the file was cut into
static class OneFileInfo {
    
        private long fileSize = 0L;
        private CombineFileInputFormat.OneBlockInfo[] blocks;
    }

3.1.1.2. OneBlockInfo: per-block information class

static class OneBlockInfo {
    
        Path onepath;   //path of the file the block belongs to
        long offset;    //offset of the block within the file
        long length;    //length of the block in bytes
        String[] hosts; //nodes that hold the block
        String[] racks; //racks that hold the block
	}

3.1.1.3. Populating the block/node/rack mappings

static void populateBlockInfo(CombineFileInputFormat.OneBlockInfo[] blocks, Map<String, List<CombineFileInputFormat.OneBlockInfo>> rackToBlocks, Map<CombineFileInputFormat.OneBlockInfo, String[]> blockToNodes, Map<String, Set<CombineFileInputFormat.OneBlockInfo>> nodeToBlocks, Map<String, Set<String>> rackToNodes) {
    
            CombineFileInputFormat.OneBlockInfo[] var5 = blocks;
            int var6 = blocks.length;
			
			//iterate over the blocks array
            for(int var7 = 0; var7 < var6; ++var7) {
    
                CombineFileInputFormat.OneBlockInfo oneblock = var5[var7];
				
				//block -> nodes
                blockToNodes.put(oneblock, oneblock.hosts);
                String[] racks = null;
                if (oneblock.hosts.length == 0) {
                    racks = new String[]{ "/default-rack" };
                } else {
    
                    racks = oneblock.racks;
                }

                int j;
                String node;
                Object blklist;
                //populate rack -> blocks
                for(j = 0; j < racks.length; ++j) {
    
                    node = racks[j];
                    blklist = (List)rackToBlocks.get(node);
                    if (blklist == null) {
    
                        blklist = new ArrayList();
                        rackToBlocks.put(node, blklist);
                    }

                    ((List)blklist).add(oneblock);
                    if (!racks[j].equals("/default-rack")) {
    
                        CombineFileInputFormat.addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
                    }
                }

				//populate node -> blocks
                for(j = 0; j < oneblock.hosts.length; ++j) {
    
                    node = oneblock.hosts[j];
                    blklist = (Set)nodeToBlocks.get(node);
                    if (blklist == null) {
    
                        blklist = new LinkedHashSet();
                        nodeToBlocks.put(node, blklist);
                    }

                    ((Set)blklist).add(oneblock);
                }
            }

        }

3.1.2. Building the splits across all files

Split logic
	1. Iterate over the nodes and build splits from each node's block list
		A1. When the total bytes of the node's pending blocks >= maxSize (the split size parameter), cut a split
		B1. When A1 does not apply, cut a split if minSizeNode <= total bytes of the node's pending blocks < maxSize
		C1. When neither A1 nor B1 applies, mark the pending blocks as unprocessed and leave them for later
	2. If blocks are still unprocessed after step 1, iterate over the racks and build splits from each rack's unprocessed blocks
		A2. When the total bytes of the rack's unprocessed blocks >= maxSize, cut a split
		B2. When A2 does not apply, cut a split if minSizeRack <= total bytes of the rack's unprocessed blocks < maxSize
		C2. When neither A2 nor B2 applies, put the unprocessed blocks into an overflow container for later
	3. Blocks still unprocessed after steps 1 and 2 sit in the overflow container; build splits from that container
		A3. Iterate over the remaining blocks; whenever the accumulated block bytes >= maxSize, cut a split
		B3. When A3 no longer applies, the remaining blocks form one final split
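
All three passes share the same accumulate-and-cut core. A minimal sketch of that pattern (names and types are simplified; this is not the actual Hadoop code):

// pack block lengths into splits of roughly maxSize bytes each
def packIntoSplits(blockSizes: Seq[Long], maxSize: Long): Seq[Seq[Long]] = {
  val splits  = scala.collection.mutable.ListBuffer.empty[Seq[Long]]
  var current = scala.collection.mutable.ListBuffer.empty[Long]
  var curSize = 0L
  for (len <- blockSizes) {
    current += len
    curSize += len
    if (maxSize != 0L && curSize >= maxSize) { // enough bytes accumulated: cut a split
      splits += current.toList
      current = scala.collection.mutable.ListBuffer.empty[Long]
      curSize = 0L
    }
  }
  if (current.nonEmpty) splits += current.toList // leftovers form one last split
  splits.toList
}
// packIntoSplits(Seq(4, 3, 3, 2), maxSize = 6) == List(List(4, 3), List(3, 2))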
void createSplits(Map<String, Set<CombineFileInputFormat.OneBlockInfo>> nodeToBlocks, Map<CombineFileInputFormat.OneBlockInfo, String[]> blockToNodes, Map<String, List<CombineFileInputFormat.OneBlockInfo>> rackToBlocks, long totLength, long maxSize, long minSizeNode, long minSizeRack, List<InputSplit> splits) {
    
		//blocks accumulated for the split currently being built
        ArrayList<CombineFileInputFormat.OneBlockInfo> validBlocks = new ArrayList();
        //total bytes of the accumulated blocks
        long curSplitSize = 0L;
        //number of nodes
        int totalNodes = nodeToBlocks.size();
        //total bytes still to be placed into splits
        long totalLength = totLength;
        //counts how many splits have been created on each node
        Multiset<String> splitsPerNode = HashMultiset.create();
        //nodes that have been fully processed
        HashSet completedNodes = new HashSet();

        label170:
        //pass 1: iterate over the nodes
        do {
        	//iterator over the node -> blocks map
            Iterator iter = nodeToBlocks.entrySet().iterator();

            while(true) {
    
                while(true) {
    
                    Entry one;
                    String node;
                    do {
    
                    	//no more entries: go back to the outer do-while
                        if (!iter.hasNext()) {
                            continue label170;
                        }
						//the node -> blocks entry handled in this iteration
                        one = (Entry)iter.next();
                        //the node handled in this iteration
                        node = (String)one.getKey();
                    } while(completedNodes.contains(node));
					//the blocks located on the current node
                    Set<CombineFileInputFormat.OneBlockInfo> blocksInCurrentNode = (Set)one.getValue();
                    //iterator over those blocks
                    Iterator oneBlockIter = blocksInCurrentNode.iterator();

					//iterate over all the blocks of the current node
                    while(oneBlockIter.hasNext()) {
                    	//a single block
                        CombineFileInputFormat.OneBlockInfo oneblock = (CombineFileInputFormat.OneBlockInfo)oneBlockIter.next();
                        if (!blockToNodes.containsKey(oneblock)) {
    
                        	//the block has already been placed in a split: drop it
                            oneBlockIter.remove();
                        } else {
                        	//add the current block to the pending list
                            validBlocks.add(oneblock);
                            //mark the block as taken
                            blockToNodes.remove(oneblock);
                            //accumulate the pending bytes
                            curSplitSize += oneblock.length;
                            //when the pending bytes of the current node reach maxSize (the split size parameter), cut a split
                            if (maxSize != 0L && curSplitSize >= maxSize) {
    
                            	//create a split
                                this.addCreatedSplit(splits, Collections.singleton(node), validBlocks);
                                //update the bytes still to be placed
                                totalLength -= curSplitSize;
                                //reset the pending byte counter
                                curSplitSize = 0L;
                                splitsPerNode.add(node);
                                //remove the placed blocks from the node's pending set
                                blocksInCurrentNode.removeAll(validBlocks);
                                //clear the pending block list
                                validBlocks.clear();
                                break;
                            }
                        }
                    }
					
					//leftover blocks on this node whose total size did not reach maxSize
                    if (validBlocks.size() != 0) {
                    	//pending bytes >= minSizeNode and no split has been cut on this node yet: cut a split
                        if (minSizeNode != 0L && curSplitSize >= minSizeNode && splitsPerNode.count(node) == 0) {
                        	//create a split
                            this.addCreatedSplit(splits, Collections.singleton(node), validBlocks);
                            totalLength -= curSplitSize;
                            splitsPerNode.add(node);
                            blocksInCurrentNode.removeAll(validBlocks);
                        } else {
                            //pending bytes < minSizeNode: put the blocks back as unprocessed
                            Iterator var36 = validBlocks.iterator();

							//put the node's pending blocks back into blockToNodes so that later passes can pick them up
                            while(var36.hasNext()) {
    
                                CombineFileInputFormat.OneBlockInfo oneblock = (CombineFileInputFormat.OneBlockInfo)var36.next();
                                blockToNodes.put(oneblock, oneblock.hosts);
                            }
                        }

                        validBlocks.clear();
                        curSplitSize = 0L;
                        completedNodes.add(node);
                    } else if (blocksInCurrentNode.size() == 0) {
                        //every block of this node has been placed into a split
                        completedNodes.add(node);
                    }
                }
            }
        } while(completedNodes.size() != totalNodes && totalLength != 0L);

        LOG.info("DEBUG: Terminated node allocation with : CompletedNodes: " + completedNodes.size() + ", size left: " + totalLength);
        ArrayList overflowBlocks = new ArrayList();
        HashSet racks = new HashSet();

        Iterator iter;
        label130:
        //pass 2: blocks left over after the node pass are split per rack
        while(blockToNodes.size() > 0) {
    
            iter = rackToBlocks.entrySet().iterator();
			//iterate over the racks
            while(true) {
    
                while(true) {
    
                    if (!iter.hasNext()) {
    
                        continue label130;
                    }

                    Entry<String, List<CombineFileInputFormat.OneBlockInfo>> one = (Entry)iter.next();
                    racks.add(one.getKey());
                    //the list of all blocks on the current rack
                    List<CombineFileInputFormat.OneBlockInfo> blocks = (List)one.getValue();
                    boolean createdSplit = false;
                    Iterator var38 = blocks.iterator();
					//iterate over all the blocks of the rack
                    while(var38.hasNext()) {
    
                        CombineFileInputFormat.OneBlockInfo oneblock = (CombineFileInputFormat.OneBlockInfo)var38.next();
                        //only consider blocks that have not been placed into a split yet
                        if (blockToNodes.containsKey(oneblock)) {
    
                            validBlocks.add(oneblock);
                            blockToNodes.remove(oneblock);
                            curSplitSize += oneblock.length;
                            //the pending bytes of the current rack reached maxSize
                            if (maxSize != 0L && curSplitSize >= maxSize) {
                            	//create a split
                                this.addCreatedSplit(splits, this.getHosts(racks), validBlocks);
                                //mark that a split was created for this rack
                                createdSplit = true;
                                break;
                            }
                        }
                    }

                    if (createdSplit) {
    
                        curSplitSize = 0L;
                        validBlocks.clear();
                        racks.clear();
                    } else {
    
                    	//the pending bytes of the current rack did not reach maxSize
                        if (!validBlocks.isEmpty()) {
                        	//pending bytes >= minSizeRack: still cut a split
                            if (minSizeRack != 0L && curSplitSize >= minSizeRack) {
                            	//create a split
                                this.addCreatedSplit(splits, this.getHosts(racks), validBlocks);
                            } else {
                                //pending bytes < minSizeRack: the split conditions are not met,
                                //so keep the blocks in the overflow container for the final pass
                                overflowBlocks.addAll(validBlocks);
                            }
                        }

                        curSplitSize = 0L;
                        validBlocks.clear();
                        racks.clear();
                    }
                }
            }
        }

        assert blockToNodes.isEmpty();

        assert curSplitSize == 0L;

        assert validBlocks.isEmpty();

        assert racks.isEmpty();
		//pass 3: iterator over the blocks left unplaced after the node and rack passes
        iter = overflowBlocks.iterator();
		//iterate over the remaining blocks
        while(iter.hasNext()) {
    
            CombineFileInputFormat.OneBlockInfo oneblock = (CombineFileInputFormat.OneBlockInfo)iter.next();
            validBlocks.add(oneblock);
            curSplitSize += oneblock.length;

            for(int i = 0; i < oneblock.racks.length; ++i) {
    
                racks.add(oneblock.racks[i]);
            }
			//the accumulated bytes of the remaining blocks reached maxSize
            if (maxSize != 0L && curSplitSize >= maxSize) {
            	//create a split
                this.addCreatedSplit(splits, this.getHosts(racks), validBlocks);
                curSplitSize = 0L;
                validBlocks.clear();
                racks.clear();
            }
        }
		
		//finally, whatever is still pending (its total size is below maxSize) forms one last split
        if (!validBlocks.isEmpty()) {
    
            this.addCreatedSplit(splits, this.getHosts(racks), validBlocks);
        }

    }

3.1.2.1. addCreatedSplit: building one split

private void addCreatedSplit(List<InputSplit> splitList, Collection<String> locations, ArrayList<CombineFileInputFormat.OneBlockInfo> validBlocks) {
    
        Path[] fl = new Path[validBlocks.size()];
        long[] offset = new long[validBlocks.size()];
        long[] length = new long[validBlocks.size()];
		//collect the path, offset and length of every block in the split
        for(int i = 0; i < validBlocks.size(); ++i) {
    
            fl[i] = ((CombineFileInputFormat.OneBlockInfo)validBlocks.get(i)).onepath;
            offset[i] = ((CombineFileInputFormat.OneBlockInfo)validBlocks.get(i)).offset;
            length[i] = ((CombineFileInputFormat.OneBlockInfo)validBlocks.get(i)).length;
        }
		//build the split object
        CombineFileSplit thissplit = new CombineFileSplit(fl, offset, length, (String[])locations.toArray(new String[0]));
        //add it to the split list
        splitList.add(thissplit);
    }

3.1.2.2. The split information class: CombineFileSplit

A single split can contain any number (0 to n) of file blocks
public class CombineFileSplit extends InputSplit implements Writable {
    
    private Path[] paths;		//paths of the files the blocks come from
    private long[] startoffset;	//start offset of each block
    private long[] lengths;		//length in bytes of each block
    private String[] locations;	//nodes that host the split
    private long totLength;		//total bytes in the split
   
   	private void initSplit(Path[] files, long[] start, long[] lengths, String[] locations) {
    
        this.startoffset = start;
        this.lengths = lengths;
        this.paths = files;
        this.totLength = 0L;
        this.locations = locations;
        long[] var5 = lengths;
        int var6 = lengths.length;

        for(int var7 = 0; var7 < var6; ++var7) {
    
            long length = var5[var7];
            this.totLength += length;
        }

    }
}


Copyright notice
This article was written by [zdaiqing]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/217/202208050514371633.html