Implementation of lazy file listing in Spark
2022-07-27 15:38:00 【wankunde】
Background
For a Spark partitioned table, when HadoopFsRelation is generated and partitionKeyFilters or subqueryFilters is non-empty, the location: FileIndex attribute of the HadoopFsRelation is a LazyFileIndex. The LazyFileIndex is converted to an InMemoryFileIndex only at the end, just before FileSourceScanExec calls listFiles.
However, if the partition filter of a partitioned table contains a subquery, Spark considers the filter impossible to push down and skips LazyFileIndex. In that case prunePartitions, called from listFiles, does not filter out any partitions, which leads to a lot of useless work.
In other words, when the partition filter condition is a subquery, the default behavior is to fetch all partitions first and filter them afterwards.
Generating HadoopFsRelation
In DataSourceStrategy, the FindDataSourceTable rule tries to resolve the CatalogTable held by an UnresolvedCatalogRelation. The DataSource.resolveRelation() method then tries to convert the table node into a HadoopFsRelation. If the table passed to DataSource is a partitioned table, fileCatalog is a CatalogFileIndex, which makes later partition pruning possible. Otherwise, the paths in the DataSource are traversed directly and an InMemoryFileIndex is built.
// DataSource.resolveRelation()
case (format: FileFormat, _) =>
  val useCatalogFileIndex = sparkSession.sqlContext.conf.manageFilesourcePartitions &&
    catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog &&
    catalogTable.get.partitionColumnNames.nonEmpty
  val (fileCatalog, dataSchema, partitionSchema) = if (useCatalogFileIndex) {
    val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
    val index = new CatalogFileIndex(
      sparkSession,
      catalogTable.get,
      catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
    (index, catalogTable.get.dataSchema, catalogTable.get.partitionSchema)
  } else {
    val globbedPaths = checkAndGlobPathIfNecessary(
      checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
    val index = createInMemoryFileIndex(globbedPaths)
    val (resultDataSchema, resultPartitionSchema) =
      getOrInferFileFormatSchema(format, () => index)
    (index, resultDataSchema, resultPartitionSchema)
  }
  HadoopFsRelation(
    fileCatalog,
    partitionSchema = partitionSchema,
    dataSchema = dataSchema.asNullable,
    bucketSpec = bucketSpec,
    format,
    caseInsensitiveOptions)(sparkSession)
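The choice between the two index types in the snippet above reduces to a single conjunction of conditions. The following is a minimal sketch of that decision only; TableMeta and chooseIndex are hypothetical stand-ins invented for illustration, not Spark classes, and the returned strings merely name which branch would be taken.

```scala
// Toy model of the useCatalogFileIndex decision.
// TableMeta and chooseIndex are hypothetical stand-ins, not Spark classes.
final case class TableMeta(
    tracksPartitionsInCatalog: Boolean,
    partitionColumnNames: Seq[String])

object IndexChoice {
  def chooseIndex(manageFilesourcePartitions: Boolean, table: Option[TableMeta]): String = {
    // Mirrors the conjunction in the snippet: the config flag is on, a catalog
    // table exists, it tracks partitions in the catalog, and it is partitioned.
    val useCatalogFileIndex = manageFilesourcePartitions &&
      table.exists(t => t.tracksPartitionsInCatalog && t.partitionColumnNames.nonEmpty)
    if (useCatalogFileIndex) "CatalogFileIndex" else "InMemoryFileIndex"
  }
}
```

Any single false condition, for example a table without partition columns, sends the relation down the branch that lists files immediately.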
Partition pruning on HadoopFsRelation
- In the community version, the PruneFileSourcePartitions rule prunes the partitions of a HadoopFsRelation.
- Because a partitioned table produces a CatalogFileIndex, the filter conditions in the plan that reference partition columns are used for partition pruning:
  val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters)
- If the filter conditions contain a subquery, they cannot be pushed down to the Hive metastore, so a very large number of partitions is returned (each corresponding to one of many rootPaths).
- The pruned partitions are wrapped in an InMemoryFileIndex. When it is instantiated, InMemoryFileIndex runs refresh0(), which fetches the information of every file under rootPaths. With too many files, plan analysis slows down and a large amount of Driver memory is consumed.
// InMemoryFileIndex
private def refresh0(): Unit = {
  val files = listLeafFiles(rootPaths)
  cachedLeafFiles =
    new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
  cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
  cachedPartitionSpec = null
}
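To make the cost of listing at construction time concrete, here is a small self-contained sketch. It is a toy model, not Spark code: EagerIndex, EagerDemo, the listDir function and the /warehouse/t/dt=N paths are all hypothetical, and the fake lister simply counts how often it is called.

```scala
import scala.collection.mutable

// Hypothetical toy: like refresh0(), this index lists every root path
// inside its constructor and caches the result.
final class EagerIndex(rootPaths: Seq[String], listDir: String => Seq[String]) {
  private val cachedLeafFiles: mutable.LinkedHashMap[String, Seq[String]] = {
    val m = mutable.LinkedHashMap.empty[String, Seq[String]]
    rootPaths.foreach(p => m.put(p, listDir(p))) // one listing call per root path
    m
  }
  def fileCount: Int = cachedLeafFiles.values.map(_.size).sum
}

object EagerDemo {
  // Returns (number of listing calls, number of cached files).
  def run(numPartitions: Int): (Int, Int) = {
    var listCalls = 0
    // Fake lister: pretends each directory holds two files (made-up names).
    val lister: String => Seq[String] = { dir =>
      listCalls += 1
      Seq(s"$dir/part-0", s"$dir/part-1")
    }
    val roots = (1 to numPartitions).map(i => s"/warehouse/t/dt=$i")
    val index = new EagerIndex(roots, lister)
    (listCalls, index.fileCount)
  }
}
```

In this model both the number of listing calls and the size of the cache grow linearly with the number of partitions that survive pruning, which is why an unpruned partition list is expensive during planning.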
LazyFileIndex optimization
- The Carmel version optimizes this: CatalogFileIndex.lazyFilterPartitions(filters: Seq[Expression]) returns a LazyFileIndex, a subclass of PartitioningAwareFileIndex that does not actually traverse HDFS.
- In FileSourceStrategy, the HadoopFsRelation is converted to a FileSourceScanExec. FileSourceScanExec has three lazy variables: selectedPartitions, dynamicallySelectedPartitions and inputRDD.
  - selectedPartitions: returns the selected partitions through InMemoryFileIndex.listFiles()
  - dynamicallySelectedPartitions: filters selectedPartitions again, using the partition filter conditions that cannot be pushed down
  - inputRDD: processes the final partitions and generates a FileScanRDD
- When selectedPartitions is accessed, the LazyFileIndex is automatically replaced with an InMemoryFileIndex and the HDFS traversal is performed.
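The deferred-listing idea described above can be sketched without any Spark dependency. The following is an illustrative toy under stated assumptions, not the Carmel implementation: PartitionDir, ToyInMemoryIndex, ToyLazyIndex, LazyDemo and the dt=N partition names are hypothetical, and a Scala lazy val stands in for the swap from LazyFileIndex to InMemoryFileIndex.

```scala
// Toy model of deferred listing; all names are hypothetical,
// not the Carmel or Apache Spark API.
final case class PartitionDir(path: String, files: Seq[String])

final class ToyInMemoryIndex(rootPaths: Seq[String], listDir: String => Seq[String]) {
  // Eager: every root path is listed when the index is built.
  private val listed: Seq[PartitionDir] = rootPaths.map(p => PartitionDir(p, listDir(p)))
  def listFiles(): Seq[PartitionDir] = listed
}

final class ToyLazyIndex(
    allPartitions: Seq[String],
    staticFilter: String => Boolean,
    listDir: String => Seq[String]) {
  // Built on first access only, so the planner can hold this object
  // without triggering any directory listing.
  private lazy val underlying: ToyInMemoryIndex =
    new ToyInMemoryIndex(allPartitions.filter(staticFilter), listDir)
  def listFiles(): Seq[PartitionDir] = underlying.listFiles()
}

object LazyDemo {
  // Returns (listing calls before the scan, listing calls after, partitions selected).
  def run(): (Int, Int, Int) = {
    var listCalls = 0
    val lister: String => Seq[String] = { dir => listCalls += 1; Seq(s"$dir/part-0") }
    val partitions = Seq("dt=1", "dt=2", "dt=3")
    val index = new ToyLazyIndex(partitions, _ != "dt=2", lister)
    val beforeScan = listCalls       // nothing has been listed during "planning"
    val selected = index.listFiles() // the listing happens here, on first use
    (beforeScan, listCalls, selected.size)
  }
}
```

The design point is that constructing the lazy index is cheap, and the listing is paid once, at the moment the scan first asks for its partitions.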
FileIndex class hierarchy
FileIndex
  CatalogFileIndex
    def lazyFilterPartitions(filters: Seq[Expression]): PartitioningAwareFileIndex
  PartitioningAwareFileIndex
    LazyFileIndex
      def createFileIndex(predicates: Seq[Expression]): InMemoryFileIndex
    InMemoryFileIndex
      def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory]
    MetadataLogFileIndex // not the focus here, ignored for now
Discussion
- The optimization moves the traversal of file directories to after plan analysis. Two costs still need attention: the time spent listing files on HDFS, and the memory occupied by the listing results.
- TODO …