Spark operators: the textFile operator

2022-08-05 06:11:00 zdaiqing

1.textFile operator source code

1.1.Source code

//org.apache.spark.SparkContext
def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }

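1.2.textFile operator parameters

path: path to the text file; it can be on HDFS, on the local file system (available on all nodes), or on any Hadoop-supported file system;
minPartitions: minimum number of partitions; defaults to defaultMinPartitions;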

1.3.operator logic

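Call the hadoopFile operator to create the file RDD;
Map each (key, value) record to its value converted to a String, i.e. the text of the line;
Use the file path as the RDD name;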

2.defaultMinPartitions parameter

2.1.defaultMinPartitions source code

//org.apache.spark.SparkContext
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

2.1.2.operator logic

Take the smaller of defaultParallelism (the default parallelism) and 2;

2.2.defaultParallelism operator

2.2.1.Source code

//Delegates to taskScheduler.defaultParallelism
//org.apache.spark.SparkContext
def defaultParallelism: Int = {
    assertNotStopped()
    taskScheduler.defaultParallelism
  }

//org.apache.spark.scheduler.TaskSchedulerImpl
override def defaultParallelism(): Int = backend.defaultParallelism()

//If spark.default.parallelism is set in the global configuration, that value is used
//Otherwise the larger of the total number of CPU cores and 2 is used
//org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
override def defaultParallelism(): Int = {
    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
  }

//If spark.default.parallelism is set in the global configuration, that value is used
//Otherwise the total number of CPU cores is used
//org.apache.spark.scheduler.local.LocalSchedulerBackend
override def defaultParallelism(): Int =
    scheduler.conf.getInt("spark.default.parallelism", totalCores)

2.2.2.operator logic

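The task scheduler's default parallelism is determined in one of two ways:
 1. spark.default.parallelism is set in the global configuration: that value is used
 2. spark.default.parallelism is not set in the global configuration:
 		in cluster mode, the larger of the total number of CPU cores and 2 is used
 		in local mode, the total number of CPU cores is used;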
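
The two cases above can be sketched in plain Java. This is only an illustration of the decision logic, not Spark's own classes: the class name DefaultParallelismSketch, the method signature, and the use of OptionalInt for an unset spark.default.parallelism are assumptions made for the example.

```java
import java.util.OptionalInt;

/** Illustrative sketch of how the default parallelism is resolved (hypothetical class). */
class DefaultParallelismSketch {

    /**
     * @param configured value of spark.default.parallelism, empty if it is not set
     * @param totalCores total number of CPU cores
     * @param localMode  true for local mode, false for cluster mode
     */
    static int defaultParallelism(OptionalInt configured, int totalCores, boolean localMode) {
        if (configured.isPresent()) {
            // Case 1: the configured value is used as is
            return configured.getAsInt();
        }
        // Case 2: fall back to the core count, with a floor of 2 in cluster mode only
        return localMode ? totalCores : Math.max(totalCores, 2);
    }

    public static void main(String[] args) {
        System.out.println(defaultParallelism(OptionalInt.of(8), 4, false));   // prints 8
        System.out.println(defaultParallelism(OptionalInt.empty(), 1, false)); // prints 2
        System.out.println(defaultParallelism(OptionalInt.empty(), 1, true));  // prints 1
    }
}
```

The only difference between the two modes is the floor of 2 that cluster mode applies when nothing is configured.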

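2.3.defaultMinPartitions logic

 spark.default.parallelism is set to a value p:
 		if p > 2, defaultMinPartitions = 2, so the default minimum number of partitions for textFile() is 2;
 		if p <= 2, defaultMinPartitions = p, so the default minimum number of partitions for textFile() is p;
 spark.default.parallelism is not set:
 		in cluster mode, defaultMinPartitions = 2, so the default minimum number of partitions for textFile() is 2;
 		in local mode, defaultMinPartitions = min(total number of CPU cores, 2);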
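
As a quick check of the four cases above, the rule can be written as a one-line function and evaluated with sample inputs. The class name DefaultMinPartitionsSketch and the sample core counts are made up for the illustration; only the min(defaultParallelism, 2) rule comes from the source listing.

```java
/** Illustrative sketch of the defaultMinPartitions rule (hypothetical class). */
class DefaultMinPartitionsSketch {

    /** Mirrors math.min(defaultParallelism, 2) from the SparkContext listing. */
    static int defaultMinPartitions(int defaultParallelism) {
        return Math.min(defaultParallelism, 2);
    }

    public static void main(String[] args) {
        // spark.default.parallelism = 8, i.e. p > 2
        System.out.println(defaultMinPartitions(8));               // prints 2
        // spark.default.parallelism = 1, i.e. p <= 2
        System.out.println(defaultMinPartitions(1));               // prints 1
        // not set, cluster mode with 16 cores: defaultParallelism = max(16, 2)
        System.out.println(defaultMinPartitions(Math.max(16, 2))); // prints 2
        // not set, local mode with 1 core: defaultParallelism = 1
        System.out.println(defaultMinPartitions(1));               // prints 1
    }
}
```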

3.hadoopFile operator

3.1.Source code

//org.apache.spark.SparkContext
/**
 * 1. path: directory of the input data files; a comma-separated list of paths is accepted
 * 2. inputFormatClass: TextInputFormat; the storage format of the data to read
 * 3. keyClass: LongWritable; the class of the key associated with inputFormatClass
 * 4. valueClass: Text; the class of the value associated with inputFormatClass
 */
def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
    assertNotStopped()

    // Force loading of hdfs-site.xml.
    // See SPARK-11227 for details.
    FileSystem.getLocal(hadoopConfiguration)

    // Broadcast the Hadoop configuration
    val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
    // Set the input paths: path may be a comma-separated list, so a batch of files can be read at once
    val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
    
    new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)
  }

4.RDD partitioning rules

4.1.Source code

//org.apache.spark.rdd.HadoopRDD
/** Returns the list of partitions */
override def getPartitions: Array[Partition] = {
    // Get the job configuration
    val jobConf = getJobConf()
    // add the credentials here as this can be called before SparkContext initialized
    SparkHadoopUtil.get.addCredentials(jobConf)
    try {
      // Get the array of input splits
      val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
      // Drop empty splits if ignoreEmptySplits is enabled
      val inputSplits = if (ignoreEmptySplits) {
        allInputSplits.filter(_.getLength > 0)
      } else {
        allInputSplits
      }
      // Create the partition array, one partition per split
      val array = new Array[Partition](inputSplits.size)
      // Store each split in the partition array: RDD id, partition index, input split
      for (i <- 0 until inputSplits.size) {
        array(i) = new HadoopPartition(id, i, inputSplits(i))
      }
      array
    } catch {
      case e: InvalidInputException if ignoreMissingFiles =>
        logWarning(s"${jobConf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" +
            s" partitions returned from this path.", e)
        Array.empty[Partition]
    }
  }

4.2. operator logic

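Split the input files according to the file splitting rules, which returns an array of input splits
The size of the split array (after dropping empty splits when ignoreEmptySplits is enabled) is the number of RDD partitions
Store each split's information in the RDD's partition list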
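
The relationship between splits and partitions can be illustrated with a small standalone sketch. It works on plain split lengths instead of Hadoop InputSplit objects, and the class and method names are hypothetical; it only mirrors the filtering and indexing steps of getPartitions.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch: one partition per (non-filtered) input split (hypothetical class). */
class PartitionListSketch {

    /** Returns the lengths of the splits that become partitions, in partition-index order. */
    static List<Long> partitionsFor(long[] splitLengths, boolean ignoreEmptySplits) {
        List<Long> partitions = new ArrayList<>();
        for (long length : splitLengths) {
            // Keep every split, unless empty splits are ignored and this one is empty
            if (!ignoreEmptySplits || length > 0) {
                partitions.add(length);
            }
        }
        return partitions;
    }

    public static void main(String[] args) {
        long[] splitLengths = {12L, 0L, 9L};
        System.out.println(partitionsFor(splitLengths, false).size()); // prints 3
        System.out.println(partitionsFor(splitLengths, true).size());  // prints 2
    }
}
```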

5.File splitting rules

5.1.Source code

//org.apache.hadoop.mapred.FileInputFormat
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
        StopWatch sw = (new StopWatch()).start();
        //List the input files from the job configuration
        FileStatus[] files = this.listStatus(job);
        //Record the number of input files in the job configuration
        job.setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.length);
        //Total size of the input files in bytes, initialized to 0
        long totalSize = 0L;
        FileStatus[] var7 = files;
        int var8 = files.length;

        //Add up the length of every input file to get the total number of bytes
        for(int var9 = 0; var9 < var8; ++var9) {
            FileStatus file = var7[var9];
            if (file.isDirectory()) {
                throw new IOException("Not a file: " + file.getPath());
            }

            totalSize += file.getLen();
        }

        //Goal split size; a numSplits of 0 is treated as 1
        long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);
        //Minimum split size: the configured value if set, 1 otherwise, and never below this.minSplitSize
        long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize);
        //Create the split list with an initial capacity of numSplits
        ArrayList<FileSplit> splits = new ArrayList(numSplits);
        NetworkTopology clusterMap = new NetworkTopology();
        FileStatus[] var13 = files;
        int var14 = files.length;

        for(int var15 = 0; var15 < var14; ++var15) {
            FileStatus file = var13[var15];
            Path path = file.getPath();
            long length = file.getLen();
            if (length == 0L) {
                //An empty file gets a single split
                splits.add(this.makeSplit(path, 0L, length, new String[0]));
            } else {
                FileSystem fs = path.getFileSystem(job);
                BlockLocation[] blkLocations;
                if (file instanceof LocatedFileStatus) {
                    blkLocations = ((LocatedFileStatus)file).getBlockLocations();
                } else {
                    blkLocations = fs.getFileBlockLocations(file, 0L, length);
                }

                //Check whether the file can be split
                if (!this.isSplitable(fs, path)) {
                    String[][] splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, 0L, length, clusterMap);
                    //A non-splittable file gets a single split
                    splits.add(this.makeSplit(path, 0L, length, splitHosts[0], splitHosts[1]));
                } else {
                    //Block size of the file (128 MB by default)
                    long blockSize = file.getBlockSize();
                    //Split size: the smaller of the goal size and the block size, then the larger of that and the minimum size
                    long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);

                    long bytesRemaining;
                    String[][] splitHosts;
                    //Cut the file: stop once the remaining bytes are no more than 1.1 times the split size
                    for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
                        splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, splitSize, clusterMap);
                        splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, splitHosts[0], splitHosts[1]));
                    }

                    //Whatever remains goes into one final split
                    if (bytesRemaining != 0L) {
                        splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap);
                        splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1]));
                    }
                }
            }
        }

        sw.stop();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
        }

        return (InputSplit[])splits.toArray(new FileSplit[splits.size()]);
    }

5.2. operator logic

The minimum split size can be set with the mapreduce.input.fileinputformat.split.minsize parameter
When a file is split, the last split may be larger than the computed split size, but by no more than a factor of 1.1
The number of splits is not always equal to the minimum number of partitions:
	Goal split size R = total file size / minimum number of partitions (integer division on byte counts);
	If the division leaves a remainder and remainder + R > R * 1.1, the actual number of splits = minimum number of partitions + 1;
		Total size totalSize = 129 bytes, minimum number of partitions numSplits = 10;
		Goal split size goalSize = totalSize / numSplits = 129 / 10 = 12, remainder 9
		By the split size rule, goalSize = 12 bytes < block size 128 MB, so the split size = 12 bytes
		Then (12 + 9 = 21) > (12 * 1.1 = 13.2), so the number of splits = 11 > minimum number of partitions
	For a single file the remainder is always smaller than numSplits bytes, so this particular case needs a small file;
Split size rule:
	Take the smaller of the goal split size and the block size, then take the larger of that result and the minimum split size;
	Since the minimum split size is usually set to a small value, the split size is essentially the smaller of the goal split size and the block size;
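
The split size rule and the 1.1 tolerance can be replayed for a single splittable file with a short standalone sketch. It follows the loop in the getSplits listing but counts splits instead of building them. The class SplitCountSketch and its method names are hypothetical, computeSplitSize is written out from the rule described above, and minSize is assumed to be at least 1 (the default).

```java
/** Illustrative sketch of the split count for one splittable file (hypothetical class). */
class SplitCountSketch {
    static final double SPLIT_SLOP = 1.1;

    /** Smaller of goal size and block size, then the larger of that and the minimum size. */
    static long computeSplitSize(long goalSize, long minSize, long blockSize) {
        return Math.max(minSize, Math.min(goalSize, blockSize));
    }

    /** Counts the splits produced for a file of the given length; minSize must be >= 1. */
    static int countSplits(long length, int numSplits, long minSize, long blockSize) {
        if (length == 0L) {
            return 1; // an empty file still gets a single split
        }
        long goalSize = length / (numSplits == 0 ? 1 : numSplits);
        long splitSize = computeSplitSize(goalSize, minSize, blockSize);
        int count = 0;
        long bytesRemaining = length;
        // Keep cutting full-size splits while more than 1.1 split sizes remain
        while ((double) bytesRemaining / (double) splitSize > SPLIT_SLOP) {
            count++;
            bytesRemaining -= splitSize;
        }
        // Whatever remains goes into one final split
        if (bytesRemaining != 0L) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;
        // 129 bytes, 10 partitions: split size 12, remainder 9, so one extra split
        System.out.println(countSplits(129L, 10, 1L, blockSize));                // prints 11
        // 121 bytes, 10 partitions: the last split absorbs the remainder (13 bytes)
        System.out.println(countSplits(121L, 10, 1L, blockSize));                // prints 10
        // 1250 MB, 10 partitions: divides evenly into 125 MB splits
        System.out.println(countSplits(1250L * 1024 * 1024, 10, 1L, blockSize)); // prints 10
    }
}
```

The second call shows the tolerance at work: 13 remaining bytes are less than 1.1 times the 12-byte split size, so they stay in one slightly larger final split instead of producing an eleventh.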

6.Summary

textFile() can process a batch of files in one call; the path argument takes the different file paths separated by commas;
The minimum number of partitions is determined by defaultParallelism, the total number of CPU cores, and 2;
The actual number of partitions of the returned RDD is determined by the minimum number of partitions and the number of file splits; the actual number can exceed the minimum number of partitions by 1;
When a file is split, the last split can be up to 1.1 times the regular split size;

Copyright notice
This article was written by [zdaiqing]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/217/202208050514371846.html