Spark operators: the textFile operator
2022-08-05 06:11:00 【zdaiqing】
The textFile operator
1. textFile operator source code
1.1. Source code
//org.apache.spark.SparkContext
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}
1.2. textFile operator parameters
path: the path of the text input; HDFS, the local file system (available on all nodes), or any Hadoop-supported file system;
minPartitions: the minimum number of partitions; its default value is determined by defaultMinPartitions.
1.3. Operator logic
Calls the hadoopFile operator to build the underlying file RDD and keeps only the value (the line text) of each (offset, line) pair;
Uses the file path as the RDD name.
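For orientation, a minimal usage sketch is shown below; the master setting, paths, and partition count are hypothetical and not part of the original source walkthrough.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical local session; textFile accepts a comma-separated list of paths
val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("textFileDemo"))
val rdd = sc.textFile("/data/logs/a.txt,/data/logs/b.txt", minPartitions = 4)
println(rdd.name)              // the path string, because setName(path) is applied
println(rdd.getNumPartitions)  // the actual partition count, decided by the split rules below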
2. The defaultMinPartitions parameter
2.1. defaultMinPartitions source code
//org.apache.spark.SparkContext
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
2.1.2. Operator logic
Takes the minimum of the defaultParallelism value (the default parallelism) and 2.
2.2. The defaultParallelism operator
2.2.1. Source code
// Determined by taskScheduler.defaultParallelism
//org.apache.spark.SparkContext
def defaultParallelism: Int = {
  assertNotStopped()
  taskScheduler.defaultParallelism
}

//org.apache.spark.scheduler.TaskSchedulerImpl
override def defaultParallelism(): Int = backend.defaultParallelism()

// If the globally configured spark.default.parallelism parameter is set, use its value;
// otherwise, take the larger of the total CPU core count and 2
//org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
override def defaultParallelism(): Int = {
  conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}

// If the globally configured spark.default.parallelism parameter is set, use its value;
// otherwise, use the total CPU core count
//org.apache.spark.scheduler.local.LocalSchedulerBackend
override def defaultParallelism(): Int =
  scheduler.conf.getInt("spark.default.parallelism", totalCores)
2.2.2. Operator logic
The default parallelism provided by the taskScheduler is determined in one of two ways:
1. If the globally configured spark.default.parallelism parameter is set, its value is used.
2. If spark.default.parallelism is not set:
In cluster mode, the larger of the total CPU core count and 2 is used;
In local mode, the total CPU core count is used.
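As a rough illustration of the local-mode branch, the sketch below (hypothetical app names, assuming a local Spark dependency) checks defaultParallelism with and without spark.default.parallelism.

import org.apache.spark.{SparkConf, SparkContext}

// local[4]: LocalSchedulerBackend with totalCores = 4
val conf = new SparkConf().setMaster("local[4]").setAppName("parallelismDemo")
val sc1 = new SparkContext(conf)
println(sc1.defaultParallelism)  // 4: spark.default.parallelism is unset, so totalCores is used
sc1.stop()

// Setting spark.default.parallelism explicitly takes precedence over the core count
val sc2 = new SparkContext(conf.clone().set("spark.default.parallelism", "3"))
println(sc2.defaultParallelism)  // 3
sc2.stop()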
2.3. defaultMinPartitions parameter logic
If spark.default.parallelism is set to a value p:
When p > 2, defaultMinPartitions = 2, i.e. the default minimum number of partitions for textFile() is 2;
When p <= 2, defaultMinPartitions = p, i.e. the default minimum number of partitions for textFile() is p.
If spark.default.parallelism is not set:
In cluster mode, defaultMinPartitions = 2, i.e. the default minimum number of partitions for textFile() is 2;
In local mode, defaultMinPartitions = min(total CPU core count, 2).
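Continuing the same kind of sketch, defaultMinPartitions then follows directly from min(defaultParallelism, 2); the values below assume a hypothetical local[4] session like the one above.

import org.apache.spark.{SparkConf, SparkContext}

// p = 4 (p > 2): defaultMinPartitions = 2
val scA = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("minPartsDemoA"))
println(scA.defaultMinPartitions)  // 2
scA.stop()

// p = 1 (p <= 2): defaultMinPartitions = 1
val scB = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("minPartsDemoB")
  .set("spark.default.parallelism", "1"))
println(scB.defaultMinPartitions)  // 1
scB.stop()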
3. The hadoopFile operator
3.1. Source code
//org.apache.spark.SparkContext
/**
 * 1. path: directory of the input data files; the path can be a comma-separated list of paths
 * 2. inputFormatClass: TextInputFormat; the storage format of the data to read
 * 3. keyClass: LongWritable; the class of the keys associated with the inputFormatClass parameter
 * 4. valueClass: Text; the class of the values associated with the inputFormatClass parameter
 */
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  assertNotStopped()

  // Force the loading of hdfs-site.xml.
  // See SPARK-11227 for details.
  FileSystem.getLocal(hadoopConfiguration)

  // Broadcast the Hadoop configuration
  val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
  // Set the input paths: the path parameter supports a comma-separated batch of paths,
  // i.e. a batch of files can be read at the same time
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}
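In other words, textFile is hadoopFile plus a map over the values. A hedged equivalent call is sketched below (the path is hypothetical, and sc is the SparkContext from the earlier sketch).

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Roughly what textFile does under the hood: read (offset, line) pairs and keep the line text
val lines = sc.hadoopFile(
    "/data/logs/a.txt",
    classOf[TextInputFormat],
    classOf[LongWritable],
    classOf[Text],
    minPartitions = 2
  ).map(pair => pair._2.toString)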
4. RDD partitioning rules
4.1. Source code
//org.apache.spark.rdd.HadoopRDD
/**
 * Returns the list of partitions
 */
override def getPartitions: Array[Partition] = {
  // Get the job configuration
  val jobConf = getJobConf()
  // add the credentials here as this can be called before SparkContext initialized
  SparkHadoopUtil.get.addCredentials(jobConf)
  try {
    // Get the array of file splits
    val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
    // Filter out empty splits if configured to do so
    val inputSplits = if (ignoreEmptySplits) {
      allInputSplits.filter(_.getLength > 0)
    } else {
      allInputSplits
    }
    // Create the partition array based on the number of splits
    val array = new Array[Partition](inputSplits.size)
    // Store the split information in each partition: rddId, partition index, file split
    for (i <- 0 until inputSplits.size) {
      array(i) = new HadoopPartition(id, i, inputSplits(i))
    }
    array
  } catch {
    case e: InvalidInputException if ignoreMissingFiles =>
      logWarning(s"${jobConf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" +
        s" partitions returned from this path.", e)
      Array.empty[Partition]
  }
}
4.2. Operator logic
The input files are divided into splits according to the rules below, and an array of file splits is returned;
The size of that split array becomes the number of RDD partitions;
Each RDD partition stores the corresponding split information (rddId, partition index, file split).
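A quick hedged check of this behavior (the path is hypothetical; the printed number depends on the file's size and block size, per the split rules in the next section):

// Request at least 3 partitions; the actual count comes from getSplits,
// so it may equal 3 or be larger, depending on how the file is split
val rdd3 = sc.textFile("/data/logs/a.txt", minPartitions = 3)
println(rdd3.getNumPartitions)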
5. File split rules
5.1. Source code
//org.apache.hadoop.mapred.FileInputFormat
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    StopWatch sw = (new StopWatch()).start();
    // Get the array of input file statuses from the job configuration
    FileStatus[] files = this.listStatus(job);
    // Record the number of input files
    job.setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.length);
    // Total size of the input, in bytes
    long totalSize = 0L;
    FileStatus[] var7 = files;
    int var8 = files.length;
    // Sum the file lengths to get the total number of bytes
    for(int var9 = 0; var9 < var8; ++var9) {
        FileStatus file = var7[var9];
        if (file.isDirectory()) {
            throw new IOException("Not a file: " + file.getPath());
        }
        totalSize += file.getLen();
    }
    // Expected (goal) split size; the number of splits is treated as at least 1
    long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);
    // Minimum split size: taken from the configuration if set, otherwise defaults to 1
    long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize);
    // Build the split list
    ArrayList<FileSplit> splits = new ArrayList(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    FileStatus[] var13 = files;
    int var14 = files.length;
    for(int var15 = 0; var15 < var14; ++var15) {
        FileStatus file = var13[var15];
        Path path = file.getPath();
        long length = file.getLen();
        if (length == 0L) {
            // An empty file produces a single (empty) split
            splits.add(this.makeSplit(path, 0L, length, new String[0]));
        } else {
            FileSystem fs = path.getFileSystem(job);
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
                blkLocations = ((LocatedFileStatus)file).getBlockLocations();
            } else {
                blkLocations = fs.getFileBlockLocations(file, 0L, length);
            }
            // Check whether the file can be split
            if (!this.isSplitable(fs, path)) {
                String[][] splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, 0L, length, clusterMap);
                // A non-splittable file produces a single split
                splits.add(this.makeSplit(path, 0L, length, splitHosts[0], splitHosts[1]));
            } else {
                // Block size of the file (128 MB by default on HDFS)
                long blockSize = file.getBlockSize();
                // Split size: the smaller of the goal size and the block size,
                // then the larger of that and the minimum size
                long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);
                long bytesRemaining;
                String[][] splitHosts;
                // Cut full-size splits; once the remaining bytes are no more than
                // 1.1 times the split size, stop cutting
                for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
                    splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, splitSize, clusterMap);
                    splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, splitHosts[0], splitHosts[1]));
                }
                // The remaining bytes become one final split
                if (bytesRemaining != 0L) {
                    splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap);
                    splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1]));
                }
            }
        }
    }
    sw.stop();
    if (LOG.isDebugEnabled()) {
        LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return (InputSplit[])splits.toArray(new FileSplit[splits.size()]);
}
5.2. Operator logic
The minimum split size can be set with the mapreduce.input.fileinputformat.split.minsize parameter.
When a file is split, some splits may be larger than the computed split size, but never more than 1.1 times that size.
There are cases where the number of file splits does not equal the minimum partition count:
Expected split size R = file size / minimum partition count;
If the division leaves a remainder and remainder + R > R * 1.1, the actual number of splits = minimum partition count + 1.
For example, take a 129-byte file with minimum partition count numSplits = 10:
Expected split size goalSize = totalSize / numSplits = 129 / 10 = 12 bytes, remainder 9 bytes.
By the split-size decision rule, goalSize = 12 bytes < the 128 MB block size, so splitSize = 12 bytes.
Since (12 + 9 = 21) > (12 * 1.1 = 13.2), the file is cut into 11 splits, which is greater than the minimum partition count.
(Sizes are measured in bytes, so this case mainly shows up for small files; for large files, extra splits usually come from goalSize exceeding the block size instead.)
Split-size decision rule:
Take the smaller of the expected split size and the block size, then take the larger of that and the minimum split size: splitSize = max(minSize, min(goalSize, blockSize)).
Since the minimum split size (settable via the mapreduce.input.fileinputformat.split.minsize parameter) is usually small, the split size is effectively the smaller of the expected split size and the block size.
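The decision rule and the 1.1 factor can be replayed with a few lines of Scala. This is only a sketch of the single-file logic (mirroring computeSplitSize and the getSplits loop above); it ignores multi-file input, block locations, and non-splittable files.

// splitSize = max(minSize, min(goalSize, blockSize)), as in FileInputFormat.computeSplitSize
def splitCount(totalSize: Long, numSplits: Int, blockSize: Long, minSize: Long = 1L): Int = {
  val goalSize  = totalSize / math.max(numSplits, 1)
  val splitSize = math.max(minSize, math.min(goalSize, blockSize))
  var remaining = totalSize
  var count     = 0
  // keep cutting full-size splits while more than 1.1 * splitSize remains
  while (remaining.toDouble / splitSize > 1.1) { remaining -= splitSize; count += 1 }
  if (remaining != 0) count += 1   // the tail becomes one final (possibly larger) split
  count
}

println(splitCount(129L, 10, 128L * 1024 * 1024))               // 11: the 129-byte example above
println(splitCount(129L * 1024 * 1024, 10, 128L * 1024 * 1024)) // 10: for a 129 MB file the remainder is negligible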
6. Summary
The textFile() operator can read a batch of files at once; the path argument accepts a comma-separated list of file paths.
The minimum number of partitions is determined by the defaultParallelism value, the total CPU core count, and the constant 2.
The actual number of partitions of the returned RDD is determined by both the minimum number of partitions and the number of file splits; the actual count can exceed the minimum number of partitions (e.g. by 1, as in the split example above).
When a file is split, a split may be up to 1.1 times the regular split size.