当前位置:网站首页>Spark lazy list files 的实现
Spark lazy list files 的实现
2022-07-27 14:23:00 【wankunde】
背景
对于 Spark partition table, 在生成 HadoopFsRelation 时,如果 partitionKeyFilters 或者 subqueryFilters 非空的时候,HadoopFsRelation 的 location: FileIndex 属性为 LazyFileIndex, 在最终 FileSourceScanExec 调用 listFiles 之前才将 LazyFileIndex 转换成 InMemoryFileIndex。
但是如果 Spark partition table 的 partition filter 含有 subquery, 此时 Spark 认为无法下推,所以会跳过使用 LazyFileIndex, 在 listFiles 时 prunePartitions 也不会过滤掉任务分区,导致做了很多没用的操作。
如果分区过滤条件是 subquery, 默认会把所有的分区全部拿回来,然后再进行分区过滤。
生成 HadoopFsRelation
在 DataSourceStrategy 的 FindDataSourceTable Rule 会尝试解析 'UnresolvedCatalogRelation 中的 CatalogTable。在 DataSource.resolveRelation() 方法尝试将 table Node 转换成 HadoopFsRelation。 如果 DataSource 传入的 table 是分区表,fileCatalog 使用 CatalogFileIndex 以便于后面分区裁剪。否则,直接遍历访问DataSource 中传入的Paths, 生成 InMemoryFileIndex
// DataSource.resolveRelation()
case (format: FileFormat, _) =>
val useCatalogFileIndex = sparkSession.sqlContext.conf.manageFilesourcePartitions &&
catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog &&
catalogTable.get.partitionColumnNames.nonEmpty
val (fileCatalog, dataSchema, partitionSchema) = if (useCatalogFileIndex) {
val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
val index = new CatalogFileIndex(
sparkSession,
catalogTable.get,
catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
(index, catalogTable.get.dataSchema, catalogTable.get.partitionSchema)
} else {
val globbedPaths = checkAndGlobPathIfNecessary(
checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
val index = createInMemoryFileIndex(globbedPaths)
val (resultDataSchema, resultPartitionSchema) =
getOrInferFileFormatSchema(format, () => index)
(index, resultDataSchema, resultPartitionSchema)
}
HadoopFsRelation(
fileCatalog,
partitionSchema = partitionSchema,
dataSchema = dataSchema.asNullable,
bucketSpec = bucketSpec,
format,
caseInsensitiveOptions)(sparkSession)
对 HadoopFsRelation 进行分区裁剪
- 社区版本 code 在 PruneFileSourcePartitions rule 中对 HadoopFsRelation 进行分区裁剪。
- 因为分区表生成的是 CatalogFileIndex, 通过 Plan 中的带有分区字段的过滤条件, 用来进行分区裁剪
val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters) - 如果过滤条件中含有 subquery, 这种过滤条件不能下推到 Hive metastore,导致返回非常多 partitions(对应多个rootPaths)。
- 将裁减后的分区条件包装成 InMemoryFileIndex 。InMemoryFileIndex 实例化的时候执行 refresh0() 方法,获取 rootPaths 下所有的文件信息。因为 files 过多会导致 plan 解析变慢,同时占用大量 Driver 内存。
// InMemoryFileIndex
private def refresh0(): Unit = {
val files = listLeafFiles(rootPaths)
cachedLeafFiles =
new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
cachedPartitionSpec = null
}
LazyFileIndex 优化
- Carmel 版本进行了优化,
CatalogFileIndex.lazyFilterPartitions(filters: Seq[Expression])返回的是 PartitioningAwareFileIndex 子类 LazyFileIndex ,这个类不会发生实际的 HDFS 遍历。 - 在 FileSourceStrategy 中会将 HadoopFsRelation 转换为 FileSourceScanExec 。在 FileSourceScanExec 中有三个 lazy 变量: selectedPartitions, dynamicallySelectedPartitions, inputRDD
- selectedPartitions : 通过 InMemoryFileIndex.listFiles() 返回选中的分区
- dynamicallySelectedPartitions : 使用不能下推的分区过滤条件对 selectedPartitions 再过滤一次
- inputRDD : 处理最终确定的分区,生成 FileScanRDD
- 我们在访问 selectedPartitions 的时候,自动将 LazyFileIndex 替换成 InMemoryFileIndex 并进行 HDFS 遍历。
FileIndex 类继承关系
FileIndex
CatalogFileIndex
def lazyFilterPartitions(filters: Seq[Expression]): PartitioningAwareFileIndex
PartitioningAwareFileIndex
LazyFileIndex
def createFileIndex(predicates: Seq[Expression]): InMemoryFileIndex
InMemoryFileIndex
def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory]
MetadataLogFileIndex // 非重点,先忽略
一些讨论
- 优化点是对文件目录的遍历放到了 plan analyze 之后, 需关注文件在 hdfs 上的遍历耗时,还有遍历的文件结果内存占用。
- TODO …
边栏推荐
- 修改frameworks资源文件如何单编
- 《剑指Offer》剪绳子
- Is it safe to open an account on a mobile phone?
- Sword finger offer cut rope
- Network equipment hard core technology insider router Chapter 21 reconfigurable router
- Unity性能优化------DrawCall
- The reverse order pairs in the "sword finger offer" array
- Kubernetes CNI classification / operation mechanism
- EMC design scheme of CAN bus
- Watermelon book machine learning reading notes Chapter 1 Introduction
猜你喜欢

After configuring corswebfilter in grain mall, an error is reported: resource sharing error:multiplealloworiginvalues
Principle of MOS tube to prevent reverse connection of power supply

Leetcode 781. rabbit hash table in forest / mathematical problem medium

Kotlin的基础用法

LeetCode 74. 搜索二维矩阵 二分/medium

Unity performance optimization ----- occlusion culling of rendering optimization (GPU)

With just two modifications, apple gave styleganv2 3D generation capabilities

3.3-5v conversion

reflex

积分运算电路的设计方法详细介绍
随机推荐
Network equipment hard core technology insider router Chapter 9 Cisco asr9900 disassembly (II)
Selenium 报错:session not created: This version of ChromeDriver only supports Chrome version 81
Network equipment hard core technology insider router Chapter 10 Cisco asr9900 disassembly (III)
Internship: compilation of other configuration classes
华为鸿蒙模拟器去除顶部导航栏方法
《剑指Offer》数组中的逆序对
JUC(JMM、Volatile)
《剑指Offer》两个链表的第一个公共结点
泛型
What is the breakthrough point of digital transformation in the electronic manufacturing industry? Lean manufacturing is the key
微信公众平台开发概述
Unity 鼠标控制第一人称摄像机视角
The first common node of the two linked lists of "Jianzhi offer"
Four kinds of relay schemes driven by single chip microcomputer
Comparison of advantages and disadvantages between instrument amplifier and operational amplifier
网络设备硬核技术内幕 路由器篇 6 汤普金森漫游网络世界(中)
Data warehouse project is never a technical project
USB interface electromagnetic compatibility (EMC) solution
Basic usage of kotlin
3.3-5v转换