Spark operators: the textFile operator
2022-08-05 06:11:00 【zdaiqing】
The textFile operator
1. The textFile operator
1.1. Source code
// org.apache.spark.SparkContext
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}
1.2. Parameters
path: the path of the input text; it can be on HDFS, on the local file system (available on all nodes), or on any Hadoop-supported file system;
minPartitions: the minimum number of partitions; its default value is determined by defaultMinPartitions;
1.3. Logic
Calls the hadoopFile operator to build the underlying file RDD and maps each (LongWritable, Text) pair to the line text;
Sets the file path as the RDD's name;
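For illustration, a minimal usage sketch (assuming an existing SparkContext named sc; the paths are hypothetical):
// read one file with the default minimum number of partitions
val rdd1 = sc.textFile("hdfs:///data/a.txt")
// read several comma-separated paths with an explicit minimum of 4 partitions
val rdd2 = sc.textFile("hdfs:///data/a.txt,hdfs:///data/b.txt", 4)
// the actual partition count is decided by the split rules described in later sections
rdd2.getNumPartitions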
2. The defaultMinPartitions parameter
2.1. defaultMinPartitions source code
// org.apache.spark.SparkContext
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
2.1.2. Logic
Takes the minimum of the defaultParallelism value (the default parallelism) and 2;
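As a quick check, both values can be inspected in spark-shell; the numbers below assume a hypothetical local setup and are only illustrative:
// e.g. spark-shell --master local[4], with spark.default.parallelism not set
sc.defaultParallelism    // 4  (the total core count in local mode)
sc.defaultMinPartitions  // 2  (min(4, 2))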
2.2. The defaultParallelism parameter
2.2.1. Source code
// Determined by taskScheduler.defaultParallelism
// org.apache.spark.SparkContext
def defaultParallelism: Int = {
  assertNotStopped()
  taskScheduler.defaultParallelism
}

// org.apache.spark.scheduler.TaskSchedulerImpl
override def defaultParallelism(): Int = backend.defaultParallelism()

// If the globally configured spark.default.parallelism parameter is set, its value is used;
// otherwise the larger of the total CPU core count and 2 is used
// org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
override def defaultParallelism(): Int = {
  conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}

// If the globally configured spark.default.parallelism parameter is set, its value is used;
// otherwise the total CPU core count (totalCores) is used
// org.apache.spark.scheduler.local.LocalSchedulerBackend
override def defaultParallelism(): Int =
  scheduler.conf.getInt("spark.default.parallelism", totalCores)
2.2.2. Logic
The taskScheduler's default parallelism is determined in one of two ways:
1. If the globally configured spark.default.parallelism parameter is set, its value is used.
2. If spark.default.parallelism is not set:
in cluster mode, the larger of the total CPU core count and 2 is used;
in local mode, the total CPU core count is used.
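A sketch of how the setting flows through, assuming a locally created SparkContext (the application name and values are illustrative):
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("parallelism-demo")
  .setMaster("local[4]")                  // local mode: totalCores = 4
  .set("spark.default.parallelism", "8")  // an explicit setting takes precedence over the core count
val sc = new SparkContext(conf)

sc.defaultParallelism    // 8: taken from spark.default.parallelism
sc.defaultMinPartitions  // 2: min(8, 2)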
2.3. defaultMinPartitions decision logic
If the spark.default.parallelism parameter is set to a value p:
when p > 2, defaultMinPartitions = 2, i.e. the default minimum number of partitions for textFile() is 2;
when p <= 2, defaultMinPartitions = p, i.e. the default minimum number of partitions for textFile() is p.
If spark.default.parallelism is not set:
in cluster mode, defaultMinPartitions = 2, i.e. the default minimum number of partitions for textFile() is 2;
in local mode, defaultMinPartitions = min(total CPU cores, 2).
3. The hadoopFile operator
3.1. Source code
// org.apache.spark.SparkContext
/**
 * 1. path: directory of the input data files; a comma-separated list of paths can be given
 * 2. inputFormatClass: TextInputFormat; the storage format of the data to read
 * 3. keyClass: LongWritable; the class of the keys associated with the inputFormatClass
 * 4. valueClass: Text; the class of the values associated with the inputFormatClass
 */
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  assertNotStopped()
  // Force-load hdfs-site.xml.
  // See SPARK-11227 for details.
  FileSystem.getLocal(hadoopConfiguration)
  // Broadcast the Hadoop configuration
  val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
  // Set the input paths: path supports a comma-separated list, so a batch of files can be read at once
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}
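For comparison, calling hadoopFile directly is roughly what textFile does internally; a sketch assuming an existing SparkContext sc and a hypothetical path:
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// read (byte offset, line) pairs and keep only the line text, as textFile does
val pairs = sc.hadoopFile("/data/a.txt", classOf[TextInputFormat],
  classOf[LongWritable], classOf[Text], minPartitions = 2)
val lines = pairs.map(pair => pair._2.toString)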
4. RDD partitioning rules
4.1. Source code
// org.apache.spark.rdd.HadoopRDD
/** Returns the list of partitions */
override def getPartitions: Array[Partition] = {
  // Get the Hadoop job configuration
  val jobConf = getJobConf()
  // add the credentials here as this can be called before SparkContext initialized
  SparkHadoopUtil.get.addCredentials(jobConf)
  try {
    // Get the array of input splits
    val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
    // Optionally drop empty splits
    val inputSplits = if (ignoreEmptySplits) {
      allInputSplits.filter(_.getLength > 0)
    } else {
      allInputSplits
    }
    // Create the partition array, sized to the number of splits
    val array = new Array[Partition](inputSplits.size)
    // Store the split information in each partition: rddId, partition index, input split
    for (i <- 0 until inputSplits.size) {
      array(i) = new HadoopPartition(id, i, inputSplits(i))
    }
    array
  } catch {
    case e: InvalidInputException if ignoreMissingFiles =>
      logWarning(s"${jobConf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" +
        s" partitions returned from this path.", e)
      Array.empty[Partition]
  }
}
4.2. Logic
The input files are divided into splits according to the rules below, and an array of input splits is returned.
The size of the split array becomes the number of RDD partitions.
Each split's information is stored in the corresponding HadoopPartition of the partition list.
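In practice this means the partition count of the RDD returned by textFile() equals the number of input splits, which can be checked directly; a sketch with a hypothetical path, where the result depends on the file size and the split rules below:
val rdd = sc.textFile("/data/big.txt", 10)  // request a minimum of 10 partitions
rdd.getNumPartitions                        // equals the number of input splits, e.g. 10 or 11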
5. File split rules
5.1. Source code
// org.apache.hadoop.mapred.FileInputFormat
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
  StopWatch sw = (new StopWatch()).start();
  // Get the array of input file statuses from the configuration
  FileStatus[] files = this.listStatus(job);
  // Record the number of input files
  job.setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.length);
  // Initialize the total input size (in bytes) to 0
  long totalSize = 0L;
  FileStatus[] var7 = files;
  int var8 = files.length;
  // Accumulate the sizes of all input files into totalSize
  for(int var9 = 0; var9 < var8; ++var9) {
    FileStatus file = var7[var9];
    if (file.isDirectory()) {
      throw new IOException("Not a file: " + file.getPath());
    }
    totalSize += file.getLen();
  }
  // Compute the desired (goal) split size; the number of splits is treated as at least 1
  long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);
  // Minimum split size: the larger of the configured parameter (default 1) and the format's minSplitSize
  long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize);
  // Build the split list, pre-sized to numSplits
  ArrayList<FileSplit> splits = new ArrayList(numSplits);
  NetworkTopology clusterMap = new NetworkTopology();
  FileStatus[] var13 = files;
  int var14 = files.length;
  for(int var15 = 0; var15 < var14; ++var15) {
    FileStatus file = var13[var15];
    Path path = file.getPath();
    long length = file.getLen();
    if (length == 0L) {
      // An empty file yields a single empty split
      splits.add(this.makeSplit(path, 0L, length, new String[0]));
    } else {
      FileSystem fs = path.getFileSystem(job);
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus)file).getBlockLocations();
      } else {
        blkLocations = fs.getFileBlockLocations(file, 0L, length);
      }
      // Check whether the file can be split
      if (!this.isSplitable(fs, path)) {
        String[][] splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, 0L, length, clusterMap);
        // A non-splittable file yields a single split
        splits.add(this.makeSplit(path, 0L, length, splitHosts[0], splitHosts[1]));
      } else {
        // Get the file's block size (128MB by default)
        long blockSize = file.getBlockSize();
        // Compute the split size: the smaller of goalSize and blockSize, then the larger of that and minSize
        long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);
        long bytesRemaining;
        String[][] splitHosts;
        // Split the file: stop once the remaining bytes are no more than 1.1 times the split size
        for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
          splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, splitSize, clusterMap);
          splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, splitHosts[0], splitHosts[1]));
        }
        // The remaining bytes form one final split
        if (bytesRemaining != 0L) {
          splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap);
          splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1]));
        }
      }
    }
  }
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
  }
  return (InputSplit[])splits.toArray(new FileSplit[splits.size()]);
}
5.2. Logic
The minimum split size can be set through the mapreduce.input.fileinputformat.split.minsize parameter.
When a file is split, some splits can end up larger than the computed split size, but never more than 1.1 times it: the last carved split may absorb up to 10% extra.
There are cases where the number of file splits is not equal to the requested minimum number of partitions:
the desired split size is goalSize = totalSize / numSplits (integer division), and the actual split size is splitSize = max(minSize, min(goalSize, blockSize));
a full split is carved as long as the remaining bytes exceed 1.1 × splitSize, so when the file does not divide evenly into splitSize-sized pieces, the actual number of splits can become numSplits + 1.
For example: file size totalSize = 1295MB, minimum number of partitions numSplits = 10;
desired split size goalSize = totalSize / numSplits = 1295MB / 10 = 129MB;
by the split-size decision rule, splitSize = min(goalSize = 129MB, block size = 128MB) = 128MB;
after 9 splits of 128MB, 143MB remain, and 143MB > 1.1 × 128MB = 140.8MB, so a 10th 128MB split is carved and the final 15MB becomes an 11th split;
the number of file splits = 11 > the minimum number of partitions (10).
Split-size decision rule:
take the smaller of the desired split size (goalSize) and the block size, then take the larger of that and the minimum split size (minSize);
since minSize is usually left small, the split size is essentially the smaller of goalSize and the block size.
The minimum split size can be raised through the mapreduce.input.fileinputformat.split.minsize parameter.
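The following sketch mirrors the split computation above (computeSplitSize plus the 1.1 rule) for a single splittable file, to predict the number of splits; the helper name predictSplits and the sizes are illustrative, not part of Spark or Hadoop:
// All sizes are in bytes; assumes a single splittable input file.
def predictSplits(totalSize: Long, minPartitions: Int,
                  blockSize: Long = 128L * 1024 * 1024,
                  minSize: Long = 1L): Int = {
  val goalSize  = totalSize / math.max(minPartitions, 1)
  // splitSize = max(minSize, min(goalSize, blockSize)), as in computeSplitSize
  val splitSize = math.max(minSize, math.min(goalSize, blockSize))
  var remaining = totalSize
  var splits = 0
  // carve full splits while the remaining bytes exceed 1.1 x splitSize
  while (remaining.toDouble / splitSize > 1.1) {
    remaining -= splitSize
    splits += 1
  }
  if (remaining != 0) splits += 1  // the leftover becomes the last split
  splits
}

predictSplits(1295L * 1024 * 1024, 10)  // 11: one more split than the 10 requested
The minimum split size can be raised before calling textFile(), for example with sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.split.minsize", "268435456"), which forces splits of at least 256MB.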
6. Summary
The textFile() operator can read a batch of files at once; the path argument accepts multiple file paths separated by commas.
The default minimum number of partitions is determined by the defaultParallelism value, the total number of CPU cores, and the constant 2.
The actual number of partitions of the returned RDD is determined by the minimum number of partitions together with the number of file splits; the actual number of partitions can be greater than the minimum number of partitions (for example, by 1).
When a file is split, an individual split can be up to 1.1 times the computed split size.