Spark operators: the wholeTextFiles operator
2022-08-05 06:10:00 【zdaiqing】
The wholeTextFiles operator
1. Introduction
About wholeTextFiles
SparkContext.wholeTextFiles can read multiple files at once;
it takes the folder containing those files as its argument and builds an RDD from them;
each element of the RDD is a (filename, content) key-value pair, where filename is the file path and content is the full content of the file;
it is commonly used to build an RDD from many small files;
the minimum number of partitions of the returned RDD can be specified via a parameter.
wholeTextFiles vs. textFile
Similarities
Both can build an RDD from multiple files.
Differences
wholeTextFiles takes the path of the folder containing the small files as its argument; when textFile reads multiple files, its argument is a comma-separated string of file paths.
wholeTextFiles is usually used to build an RDD from many small files, whereas textFile is usually used to build an RDD from a single file.
wholeTextFiles returns each file as a (filename, content) pair, whereas textFile returns one record per line of each file.
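To make the difference concrete, here is a minimal usage sketch. The folder path and file names are hypothetical, and local[2] is only used so the example can run standalone:

import org.apache.spark.{SparkConf, SparkContext}

object WholeTextFilesDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("wholeTextFilesDemo").setMaster("local[2]"))
    // Hypothetical folder holding many small text files, e.g. part-0001.txt, part-0002.txt ...
    val dir = "/tmp/small-files"
    // wholeTextFiles: one element per file, keyed by the file path
    val byFile = sc.wholeTextFiles(dir, minPartitions = 2)
    byFile.take(2).foreach { case (path, content) =>
      println(s"$path -> ${content.length} chars")
    }
    // textFile on the same data: one element per line, file boundaries are lost
    val byLine = sc.textFile(s"$dir/part-0001.txt,$dir/part-0002.txt")
    println(s"files: ${byFile.count()}, lines: ${byLine.count()}")
    sc.stop()
  }
}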
2. Reading the source code
2.1. wholeTextFiles source
For the defaultMinPartitions logic, see the earlier post on the textFile operator (spark operator - textFile).
Parameters
path: the path of the folder containing the small files
minPartitions:
the minimum number of partitions; it can be passed explicitly when calling the wholeTextFiles operator, or left unspecified;
when it is not specified, the default minimum number of partitions is defaultMinPartitions:
if the spark.default.parallelism parameter has a value p:
when p > 2, defaultMinPartitions = 2, i.e. the operator defaults to a minimum of 2 partitions;
when p <= 2, defaultMinPartitions = p, i.e. the operator defaults to a minimum of p partitions;
if the spark.default.parallelism parameter is not set:
in cluster mode, defaultMinPartitions = 2, i.e. the operator defaults to a minimum of 2 partitions;
in local mode, defaultMinPartitions = min(total number of CPU cores, 2).
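Put together, the rules above reduce to taking the smaller of defaultParallelism and 2. A minimal sketch of that rule (defaultParallelism stands in for the value Spark derives from spark.default.parallelism, or from the scheduler when that property is unset):

// Sketch of the rule described above, not a verbatim copy of the Spark source.
def defaultMinPartitions(defaultParallelism: Int): Int = math.min(defaultParallelism, 2)
// e.g. defaultMinPartitions(8) == 2, defaultMinPartitions(1) == 1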
Code walkthrough
Specify the minimum number of RDD partitions
Specify the folder path to read
Build a WholeTextFileRDD
Transform the RDD: the file path becomes the key, the file content becomes the value
Set the RDD name
def wholeTextFiles(
    path: String,
    // This parameter specifies the minimum number of partitions
    minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
  assertNotStopped()
  val job = NewHadoopJob.getInstance(hadoopConfiguration)
  // Specify the folder path to read
  NewFileInputFormat.setInputPaths(job, path)
  val updateConf = job.getConfiguration
  // Build the WholeTextFileRDD
  new WholeTextFileRDD(
    this,
    classOf[WholeTextFileInputFormat],
    classOf[Text],
    classOf[Text],
    updateConf,
    minPartitions
  ).map(
    // Transform the RDD: file path as key, file content as value
    record => (record._1.toString, record._2.toString)
  ).setName(path)
}
2.2. Data partitioning in WholeTextFileRDD (source)
Summary
The number of file splits determines the number of RDD partitions.
Partitioning steps
From the minimum number of partitions, set maxSplitSize (maximum split size), minSplitSizeRack (minimum split size per rack) and minSplitSizeNode (minimum split size per node)
Split the files
Build partitions from the file splits
override def getPartitions: Array[Partition] = {
  val conf = getConf
  conf.setIfUnset(FileInputFormat.LIST_STATUS_NUM_THREADS,
    Runtime.getRuntime.availableProcessors().toString)
  val inputFormat = inputFormatClass.newInstance
  inputFormat match {
    case configurable: Configurable =>
      configurable.setConf(conf)
    case _ =>
  }
  val jobContext = new JobContextImpl(conf, jobId)
  // Set maxSplitSize (maximum split size), minSplitSizeRack (minimum split size per rack)
  // and minSplitSizeNode (minimum split size per node)
  inputFormat.setMinPartitions(jobContext, minPartitions)
  // Split the files
  val rawSplits = inputFormat.getSplits(jobContext).toArray
  // The number of file splits becomes the number of RDD partitions
  val result = new Array[Partition](rawSplits.size)
  // Build one RDD partition from each file split
  for (i <- 0 until rawSplits.size) {
    result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
  }
  result
}
2.3. setMinPartitions: setting the split size
The maximum number of bytes per split, maxSplitSize:
the smallest integer not less than total bytes / minimum number of partitions, i.e. ceil(totalLen / minPartitions)
The minimum split size per node/rack must not exceed maxSplitSize
def setMinPartitions(context: JobContext, minPartitions: Int) {
  val files = listStatus(context).asScala
  // Total number of bytes across all the small files
  val totalLen = files.map(file => if (file.isDirectory) 0L else file.getLen).sum
  // Maximum bytes per split: the smallest integer >= total bytes / minimum number of partitions
  val maxSplitSize = Math.ceil(totalLen * 1.0 /
    (if (minPartitions == 0) 1 else minPartitions)).toLong
  val config = context.getConfiguration
  // Minimum bytes per node-level split, taken from the configuration
  val minSplitSizePerNode = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 0L)
  // Minimum bytes per rack-level split, taken from the configuration
  val minSplitSizePerRack = config.getLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 0L)
  // Make sure the per-node / per-rack minimum split sizes are not larger than maxSplitSize
  if (maxSplitSize < minSplitSizePerNode) {
    super.setMinSplitSizeNode(maxSplitSize)
  }
  if (maxSplitSize < minSplitSizePerRack) {
    super.setMinSplitSizeRack(maxSplitSize)
  }
  super.setMaxSplitSize(maxSplitSize)
}
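A quick worked example of the arithmetic above, with hypothetical numbers (100 files of 1 MB each, minPartitions = 8):

// Sketch of the setMinPartitions arithmetic; the sizes and partition count are made up.
val fileSizes: Seq[Long] = Seq.fill(100)(1L * 1024 * 1024)   // 100 files of 1 MB
val minPartitions = 8
val totalLen = fileSizes.sum                                  // 104857600 bytes
val maxSplitSize = math.ceil(totalLen * 1.0 /
  (if (minPartitions == 0) 1 else minPartitions)).toLong      // ceil(104857600 / 8) = 13107200
// Any configured per-node / per-rack minimum larger than 13107200 bytes is then capped to it.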
3. CombineFileInputFormat.getSplits(): file splitting
What it does
Fetch the split-size constraints: minSizeNode, minSizeRack, maxSize
Fetch the file information: number of files, locations, etc.
Call the concrete file-splitting method
public List<InputSplit> getSplits(JobContext job) throws IOException {
    long minSizeNode = 0L;
    long minSizeRack = 0L;
    long maxSize = 0L;
    Configuration conf = job.getConfiguration();
    // Fetch minSizeNode, minSizeRack and maxSize. These were set by setMinPartitions when the
    // split size was configured; if not set there, they are read from the configuration, and
    // default to 0 when the configuration does not define them either.
    if (this.minSplitSizeNode != 0L) {
        minSizeNode = this.minSplitSizeNode;
    } else {
        minSizeNode = conf.getLong("mapreduce.input.fileinputformat.split.minsize.per.node", 0L);
    }
    if (this.minSplitSizeRack != 0L) {
        minSizeRack = this.minSplitSizeRack;
    } else {
        minSizeRack = conf.getLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 0L);
    }
    if (this.maxSplitSize != 0L) {
        maxSize = this.maxSplitSize;
    } else {
        maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0L);
    }
    // Make sure minSizeNode and minSizeRack are not larger than maxSize,
    // and that minSizeNode is not larger than minSizeRack
    if (minSizeNode != 0L && maxSize != 0L && minSizeNode > maxSize) {
        throw new IOException("Minimum split size pernode " + minSizeNode + " cannot be larger than maximum split size " + maxSize);
    } else if (minSizeRack != 0L && maxSize != 0L && minSizeRack > maxSize) {
        throw new IOException("Minimum split size per rack " + minSizeRack + " cannot be larger than maximum split size " + maxSize);
    } else if (minSizeRack != 0L && minSizeNode > minSizeRack) {
        throw new IOException("Minimum split size per node " + minSizeNode + " cannot be larger than minimum split " + "size per rack " + minSizeRack);
    } else {
        // Fetch the list of file statuses
        List<FileStatus> stats = this.listStatus(job);
        // Container holding the resulting file splits
        List<InputSplit> splits = new ArrayList();
        // An empty folder returns the empty container directly
        if (stats.size() == 0) {
            return splits;
        } else {
            // For each pool, collect the files it accepts and split them separately
            Iterator var11 = this.pools.iterator();
            while(var11.hasNext()) {
                CombineFileInputFormat.MultiPathFilter onepool = (CombineFileInputFormat.MultiPathFilter)var11.next();
                ArrayList<FileStatus> myPaths = new ArrayList();
                Iterator iter = stats.iterator();
                while(iter.hasNext()) {
                    FileStatus p = (FileStatus)iter.next();
                    if (onepool.accept(p.getPath())) {
                        myPaths.add(p);
                        iter.remove();
                    }
                }
                this.getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
            }
            // Split the remaining files (those not claimed by any pool)
            this.getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);
            this.rackToNodes.clear();
            return splits;
        }
    }
}
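The configuration keys read above can be set on the Hadoop configuration that wholeTextFiles passes down. A hedged sketch follows (the values are purely illustrative; note that setMinPartitions always derives maxSplitSize from minPartitions, so the ...split.maxsize key is effectively overridden here, while the per-node / per-rack minimums are read from the configuration and only capped at maxSplitSize):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("split-size-demo").setMaster("local[2]"))
// Keys read by CombineFileInputFormat.getSplits() above
sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 32L * 1024 * 1024)
sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 64L * 1024 * 1024)
val rdd = sc.wholeTextFiles("/tmp/small-files") // hypothetical folder
println(rdd.getNumPartitions)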
3.1. getMoreSplits: the concrete file-splitting implementation
File-splitting steps
Break each individual file into blocks
Create the data splits from those blocks according to the split-size parameters
private void getMoreSplits(JobContext job, List<FileStatus> stats, long maxSize, long minSizeNode, long minSizeRack, List<InputSplit> splits) throws IOException {
    Configuration conf = job.getConfiguration();
    // Maps describing the relationships between blocks, racks and nodes
    // rack -> blocks
    HashMap<String, List<CombineFileInputFormat.OneBlockInfo>> rackToBlocks = new HashMap();
    // block -> nodes
    HashMap<CombineFileInputFormat.OneBlockInfo, String[]> blockToNodes = new HashMap();
    // node -> blocks
    HashMap<String, Set<CombineFileInputFormat.OneBlockInfo>> nodeToBlocks = new HashMap();
    // Array of per-file information
    CombineFileInputFormat.OneFileInfo[] files = new CombineFileInputFormat.OneFileInfo[stats.size()];
    if (stats.size() != 0) {
        long totLength = 0L;
        int i = 0;
        // Iterate over every file and accumulate the total number of bytes
        for(Iterator var18 = stats.iterator(); var18.hasNext(); totLength += files[i].getLength()) {
            FileStatus stat = (FileStatus)var18.next();
            // Break the single file into blocks
            files[i] = new CombineFileInputFormat.OneFileInfo(stat, conf, this.isSplitable(job, stat.getPath()), rackToBlocks, blockToNodes, nodeToBlocks, this.rackToNodes, maxSize);
        }
        // Create the data splits from the file blocks and the split-size parameters
        this.createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength, maxSize, minSizeNode, minSizeRack, splits);
    }
}
3.1.1. Block-splitting logic for a single file
Rules for determining the block size
If the split-size parameter (maxSize) is not set:
the file size is used as the block size
If the split-size parameter is set:
file size < maxSize: the file size is the block size
maxSize < file size < 2 * maxSize: half of the file size is the block size
file size > 2 * maxSize: maxSize is the block size
Summary
This code builds the OneFileInfo object for a single file: it divides the file into blocks, builds a block object for each piece, and fills in the fileSize and blocks attributes of the OneFileInfo object.
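A worked example of the rules above, with hypothetical sizes (maxSize = 4 MB): a 7 MB chunk of data is between maxSize and 2 * maxSize, so the first block is 7 / 2 = 3.5 MB; the remaining 3.5 MB is below maxSize, so the second block is also 3.5 MB. A minimal sketch of the same loop, applied to the bytes of a single block location:

// Sketch of the block-size rule above; the sizes are made up.
def chop(locationLength: Long, maxSize: Long): Seq[Long] = {
  var left = locationLength
  val blocks = scala.collection.mutable.ArrayBuffer.empty[Long]
  while (left > 0L) {
    val myLength =
      if (maxSize == 0L) left                                    // no limit: one block
      else if (left > maxSize && left < 2L * maxSize) left / 2L  // avoid a tiny trailing block
      else math.min(maxSize, left)
    blocks += myLength
    left -= myLength
  }
  blocks.toSeq
}
// chop(7L * 1024 * 1024, 4L * 1024 * 1024) == Seq(3670016, 3670016)           (3.5 MB, 3.5 MB)
// chop(9L * 1024 * 1024, 4L * 1024 * 1024) == Seq(4194304, 2621440, 2621440)  (4 MB, 2.5 MB, 2.5 MB)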
OneFileInfo(FileStatus stat, Configuration conf, boolean isSplitable, HashMap<String, List<CombineFileInputFormat.OneBlockInfo>> rackToBlocks, HashMap<CombineFileInputFormat.OneBlockInfo, String[]> blockToNodes, HashMap<String, Set<CombineFileInputFormat.OneBlockInfo>> nodeToBlocks, HashMap<String, Set<String>> rackToNodes, long maxSize) throws IOException {
    // Fetch the file's block locations
    BlockLocation[] locations;
    if (stat instanceof LocatedFileStatus) {
        // The block locations are already attached to the file status
        locations = ((LocatedFileStatus)stat).getBlockLocations();
    } else {
        FileSystem fs = stat.getPath().getFileSystem(conf);
        locations = fs.getFileBlockLocations(stat, 0L, stat.getLen());
    }
    // No block locations (e.g. an empty file): use an empty array of blocks
    if (locations == null) {
        this.blocks = new CombineFileInputFormat.OneBlockInfo[0];
    } else {
        if (locations.length == 0 && !stat.isDirectory()) {
            locations = new BlockLocation[]{
                new BlockLocation()};
        }
        // No splitting needed: the whole file becomes a single block
        if (!isSplitable) {
            this.blocks = new CombineFileInputFormat.OneBlockInfo[1];
            this.fileSize = stat.getLen();
            this.blocks[0] = new CombineFileInputFormat.OneBlockInfo(stat.getPath(), 0L, this.fileSize, locations[0].getHosts(), locations[0].getTopologyPaths());
        } else {
            // Container holding the file's blocks
            ArrayList<CombineFileInputFormat.OneBlockInfo> blocksList = new ArrayList(locations.length);
            int i = 0;
            while(true) {
                // Once all locations are processed, convert the blocks container from a list to an array
                if (i >= locations.length) {
                    this.blocks = (CombineFileInputFormat.OneBlockInfo[])blocksList.toArray(new CombineFileInputFormat.OneBlockInfo[blocksList.size()]);
                    break;
                }
                // Accumulate the file size
                this.fileSize += locations[i].getLength();
                // Bytes of this location still to be carved into blocks
                long left = locations[i].getLength();
                // Starting offset of the data still to be carved
                long myOffset = locations[i].getOffset();
                // Size of the block being built
                long myLength = 0L;
                do {
                    // Determine the block size
                    // Split-size parameter not set: the remaining bytes become one block
                    if (maxSize == 0L) {
                        myLength = left;
                    }
                    // Remaining bytes between 1x and 2x of the split-size parameter:
                    // the block size is half of the remaining bytes
                    else if (left > maxSize && left < 2L * maxSize) {
                        myLength = left / 2L;
                    }
                    // Remaining bytes at least 2x the split-size parameter: the block size is maxSize;
                    // remaining bytes smaller than the split-size parameter: the remaining bytes become the block
                    else {
                        myLength = Math.min(maxSize, left);
                    }
                    // Carve out a block of the determined size
                    CombineFileInputFormat.OneBlockInfo oneblock = new CombineFileInputFormat.OneBlockInfo(stat.getPath(), myOffset, myLength, locations[i].getHosts(), locations[i].getTopologyPaths());
                    // Subtract the carved bytes from the data still to be processed
                    left -= myLength;
                    // Advance the offset of the data still to be processed
                    myOffset += myLength;
                    // Add the block to the blocks container
                    blocksList.add(oneblock);
                } while(left > 0L);
                ++i;
            }
        }
        // Populate the block / node / rack mappings
        populateBlockInfo(this.blocks, rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes);
    }
}
3.1.1.1. OneFileInfo: per-file information class
fileSize: the file's size in bytes
blocks: the array of the file's blocks
static class OneFileInfo {
    private long fileSize = 0L;
    private CombineFileInputFormat.OneBlockInfo[] blocks;
}
3.1.1.2. OneBlockInfo: per-block information class
static class OneBlockInfo {
    Path onepath;    // path of the file this block belongs to
    long offset;     // starting offset of the block
    long length;     // number of bytes in the block
    String[] hosts;  // nodes on which the block resides
    String[] racks;  // racks on which the block resides
}
3.1.1.3. Populating the block / node / rack mappings
static void populateBlockInfo(CombineFileInputFormat.OneBlockInfo[] blocks, Map<String, List<CombineFileInputFormat.OneBlockInfo>> rackToBlocks, Map<CombineFileInputFormat.OneBlockInfo, String[]> blockToNodes, Map<String, Set<CombineFileInputFormat.OneBlockInfo>> nodeToBlocks, Map<String, Set<String>> rackToNodes) {
    CombineFileInputFormat.OneBlockInfo[] var5 = blocks;
    int var6 = blocks.length;
    // Iterate over the blocks array
    for(int var7 = 0; var7 < var6; ++var7) {
        CombineFileInputFormat.OneBlockInfo oneblock = var5[var7];
        // Populate block -> nodes
        blockToNodes.put(oneblock, oneblock.hosts);
        String[] racks = null;
        if (oneblock.hosts.length == 0) {
            racks = new String[]{
                "/default-rack"};
        } else {
            racks = oneblock.racks;
        }
        int j;
        String node;
        Object blklist;
        // Populate rack -> blocks
        for(j = 0; j < racks.length; ++j) {
            node = racks[j];
            blklist = (List)rackToBlocks.get(node);
            if (blklist == null) {
                blklist = new ArrayList();
                rackToBlocks.put(node, blklist);
            }
            ((List)blklist).add(oneblock);
            if (!racks[j].equals("/default-rack")) {
                CombineFileInputFormat.addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
            }
        }
        // Populate node -> blocks
        for(j = 0; j < oneblock.hosts.length; ++j) {
            node = oneblock.hosts[j];
            blklist = (Set)nodeToBlocks.get(node);
            if (blklist == null) {
                blklist = new LinkedHashSet();
                nodeToBlocks.put(node, blklist);
            }
            ((Set)blklist).add(oneblock);
        }
    }
}
3.1.2. Split-creation logic across all files
Splitting logic
1. Iterate over each node and create splits from that node's list of blocks
A1. When the total bytes of the node's pending blocks > maxSize (the split-size parameter), create a split
B1. When A1 is not met, create a split if minSizeNode < total bytes of the node's pending blocks < maxSize
C1. When neither A1 nor B1 is met, mark the pending blocks as unprocessed and leave them for later handling
2. If unprocessed blocks remain after step 1, iterate over each rack and create splits from the rack's unprocessed blocks
A2. When the total bytes of the rack's unprocessed blocks > maxSize, create a split
B2. When A2 is not met, create a split if minSizeRack < total bytes of the rack's unprocessed blocks < maxSize
C2. When neither A2 nor B2 is met, put the unprocessed blocks into a dedicated container (overflowBlocks) for later handling
3. If unprocessed blocks still remain in the dedicated container after steps 1 and 2, create splits from the blocks in that container
A3. Iterate over the remaining block list; whenever the accumulated block bytes >= maxSize, create a split
B3. When A3 is not met, create one final split from all remaining blocks
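A worked example of the three phases, with hypothetical sizes: assume maxSize = 128 MB, minSizeNode = 32 MB, minSizeRack = 64 MB; node n1 holds blocks totaling 300 MB, node n2 holds 40 MB and node n3 holds 20 MB, all in one rack. In phase 1, n1 emits a split each time its accumulated blocks reach 128 MB (two splits); the roughly 44 MB left over is handed back because n1 has already produced splits (the node-local rule below only fires for a node with no split yet). n2's 40 MB is below maxSize but above minSizeNode and n2 has produced no split, so it becomes one node-local split; n3's 20 MB is below minSizeNode and is handed back. In phase 2 the rack then holds about 64 MB of unassigned blocks (44 + 20), which reaches minSizeRack, so one rack-level split is created. Phase 3 combines anything still left over, ending with one final split for the remainder.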
void createSplits(Map<String, Set<CombineFileInputFormat.OneBlockInfo>> nodeToBlocks, Map<CombineFileInputFormat.OneBlockInfo, String[]> blockToNodes, Map<String, List<CombineFileInputFormat.OneBlockInfo>> rackToBlocks, long totLength, long maxSize, long minSizeNode, long minSizeRack, List<InputSplit> splits) {
    // List of blocks pending for the split currently being built
    ArrayList<CombineFileInputFormat.OneBlockInfo> validBlocks = new ArrayList();
    // Total bytes of the pending blocks
    long curSplitSize = 0L;
    // Number of nodes
    int totalNodes = nodeToBlocks.size();
    // Total number of bytes still to be assigned to splits
    long totalLength = totLength;
    // Multiset counting how many splits have been created per node
    Multiset<String> splitsPerNode = HashMultiset.create();
    // Set of nodes that have been fully processed
    HashSet completedNodes = new HashSet();
    label170:
    // Iterate over the data of every node
    do {
        // node -> blocks iterator
        Iterator iter = nodeToBlocks.entrySet().iterator();
        while(true) {
            while(true) {
                Entry one;
                String node;
                do {
                    // All entries handled: leave the outer polling loop
                    if (!iter.hasNext()) {
                        continue label170;
                    }
                    // The node -> blocks mapping handled by this iteration
                    one = (Entry)iter.next();
                    // The node handled by this iteration
                    node = (String)one.getKey();
                } while(completedNodes.contains(node));
                // The set of file blocks on the current node
                Set<CombineFileInputFormat.OneBlockInfo> blocksInCurrentNode = (Set)one.getValue();
                // Iterator over the node's file blocks
                Iterator oneBlockIter = blocksInCurrentNode.iterator();
                // Process every block of the current node
                while(oneBlockIter.hasNext()) {
                    // A single block
                    CombineFileInputFormat.OneBlockInfo oneblock = (CombineFileInputFormat.OneBlockInfo)oneBlockIter.next();
                    if (!blockToNodes.containsKey(oneblock)) {
                        // Drop blocks that have already been assigned
                        oneBlockIter.remove();
                    } else {
                        // Add the current block to the pending list
                        validBlocks.add(oneblock);
                        // Mark the current block as assigned
                        blockToNodes.remove(oneblock);
                        // Accumulate the total bytes of the pending block list
                        curSplitSize += oneblock.length;
                        // When the pending bytes on the current node reach maxSize (the split-size parameter), create a split
                        if (maxSize != 0L && curSplitSize >= maxSize) {
                            // Create the split
                            this.addCreatedSplit(splits, Collections.singleton(node), validBlocks);
                            // Update the total bytes still waiting to be assigned
                            totalLength -= curSplitSize;
                            // Reset the pending byte counter
                            curSplitSize = 0L;
                            splitsPerNode.add(node);
                            // Remove the assigned blocks from the current node's pending set
                            blocksInCurrentNode.removeAll(validBlocks);
                            // Clear the pending block list
                            validBlocks.clear();
                            break;
                        }
                    }
                }
                // The current node's blocks are not fully assigned yet
                if (validBlocks.size() != 0) {
                    // minSizeNode < pending bytes on the current node < maxSize, and no split has been created on this node yet
                    if (minSizeNode != 0L && curSplitSize >= minSizeNode && splitsPerNode.count(node) == 0) {
                        // Create a node-local split from the leftover blocks
                        this.addCreatedSplit(splits, Collections.singleton(node), validBlocks);
                        totalLength -= curSplitSize;
                        splitsPerNode.add(node);
                        blocksInCurrentNode.removeAll(validBlocks);
                    } else {
                        // Pending bytes on the current node are below minSizeNode (or a split already exists on this node)
                        Iterator var36 = validBlocks.iterator();
                        // Mark all of the node's pending blocks as unassigned again;
                        // they will be picked up by the rack / overflow phases below
                        while(var36.hasNext()) {
                            CombineFileInputFormat.OneBlockInfo oneblock = (CombineFileInputFormat.OneBlockInfo)var36.next();
                            blockToNodes.put(oneblock, oneblock.hosts);
                        }
                    }
                    validBlocks.clear();
                    curSplitSize = 0L;
                    completedNodes.add(node);
                } else if (blocksInCurrentNode.size() == 0) {
                    // All of the current node's blocks have been assigned to splits
                    completedNodes.add(node);
                }
            }
        }
    } while(completedNodes.size() != totalNodes && totalLength != 0L);
    LOG.info("DEBUG: Terminated node allocation with : CompletedNodes: " + completedNodes.size() + ", size left: " + totalLength);
    ArrayList overflowBlocks = new ArrayList();
    HashSet racks = new HashSet();
    Iterator iter;
    label130:
    // After the node phase, create splits for the blocks that are still unassigned
    while(blockToNodes.size() > 0) {
        iter = rackToBlocks.entrySet().iterator();
        // Iterate over each rack
        while(true) {
            while(true) {
                if (!iter.hasNext()) {
                    continue label130;
                }
                Entry<String, List<CombineFileInputFormat.OneBlockInfo>> one = (Entry)iter.next();
                racks.add(one.getKey());
                // All blocks of the current rack
                List<CombineFileInputFormat.OneBlockInfo> blocks = (List)one.getValue();
                boolean createdSplit = false;
                Iterator var38 = blocks.iterator();
                // Iterate over all blocks of the rack
                while(var38.hasNext()) {
                    CombineFileInputFormat.OneBlockInfo oneblock = (CombineFileInputFormat.OneBlockInfo)var38.next();
                    // Only consider blocks of this rack that are still unassigned
                    if (blockToNodes.containsKey(oneblock)) {
                        validBlocks.add(oneblock);
                        blockToNodes.remove(oneblock);
                        curSplitSize += oneblock.length;
                        // The rack's unassigned bytes have reached maxSize
                        if (maxSize != 0L && curSplitSize >= maxSize) {
                            // Create the split
                            this.addCreatedSplit(splits, this.getHosts(racks), validBlocks);
                            // Mark that a split was created for this rack
                            createdSplit = true;
                            break;
                        }
                    }
                }
                if (createdSplit) {
                    curSplitSize = 0L;
                    validBlocks.clear();
                    racks.clear();
                } else {
                    // The rack's unassigned bytes are below maxSize
                    if (!validBlocks.isEmpty()) {
                        // minSizeRack <= unassigned bytes of the current rack < maxSize
                        if (minSizeRack != 0L && curSplitSize >= minSizeRack) {
                            // Create a rack-level split
                            this.addCreatedSplit(splits, this.getHosts(racks), validBlocks);
                        } else {
                            // Unassigned bytes of the current rack are below minSizeRack:
                            // the split conditions are not met, keep the blocks for the overflow phase
                            overflowBlocks.addAll(validBlocks);
                        }
                    }
                    curSplitSize = 0L;
                    validBlocks.clear();
                    racks.clear();
                }
            }
        }
    }
    assert blockToNodes.isEmpty();
    assert curSplitSize == 0L;
    assert validBlocks.isEmpty();
    assert racks.isEmpty();
    // Iterator over the blocks left unassigned after the node and rack phases
    iter = overflowBlocks.iterator();
    // Iterate over the remaining blocks
    while(iter.hasNext()) {
        CombineFileInputFormat.OneBlockInfo oneblock = (CombineFileInputFormat.OneBlockInfo)iter.next();
        validBlocks.add(oneblock);
        curSplitSize += oneblock.length;
        for(int i = 0; i < oneblock.racks.length; ++i) {
            racks.add(oneblock.racks[i]);
        }
        // Accumulated bytes of the remaining blocks have reached maxSize
        if (maxSize != 0L && curSplitSize >= maxSize) {
            // Create the split
            this.addCreatedSplit(splits, this.getHosts(racks), validBlocks);
            curSplitSize = 0L;
            validBlocks.clear();
            racks.clear();
        }
    }
    // Finally, put the remaining blocks into one last split; their total size is below maxSize
    if (!validBlocks.isEmpty()) {
        this.addCreatedSplit(splits, this.getHosts(racks), validBlocks);
    }
}
3.1.2.1. addCreatedSplit: building a data split
private void addCreatedSplit(List<InputSplit> splitList, Collection<String> locations, ArrayList<CombineFileInputFormat.OneBlockInfo> validBlocks) {
    Path[] fl = new Path[validBlocks.size()];
    long[] offset = new long[validBlocks.size()];
    long[] length = new long[validBlocks.size()];
    // Iterate over the pending blocks, collecting their paths, offsets and lengths
    for(int i = 0; i < validBlocks.size(); ++i) {
        fl[i] = ((CombineFileInputFormat.OneBlockInfo)validBlocks.get(i)).onepath;
        offset[i] = ((CombineFileInputFormat.OneBlockInfo)validBlocks.get(i)).offset;
        length[i] = ((CombineFileInputFormat.OneBlockInfo)validBlocks.get(i)).length;
    }
    // Build the split object
    CombineFileSplit thissplit = new CombineFileSplit(fl, offset, length, (String[])locations.toArray(new String[0]));
    // Add it to the split list
    splitList.add(thissplit);
}
3.1.2.2. CombineFileSplit: the split information class
A single split contains 0 to n file blocks
public class CombineFileSplit extends InputSplit implements Writable {
    private Path[] paths;       // paths of the files the blocks belong to
    private long[] startoffset; // starting offset of each block
    private long[] lengths;     // number of bytes in each block
    private String[] locations; // nodes on which the blocks reside
    private long totLength;     // total number of bytes in this split
    private void initSplit(Path[] files, long[] start, long[] lengths, String[] locations) {
        this.startoffset = start;
        this.lengths = lengths;
        this.paths = files;
        this.totLength = 0L;
        this.locations = locations;
        long[] var5 = lengths;
        int var6 = lengths.length;
        for(int var7 = 0; var7 < var6; ++var7) {
            long length = var5[var7];
            this.totLength += length;
        }
    }
}