Pushing Spark Filters Down to Parquet Files
2022-07-27 14:23:00 【wankunde】
When reading Parquet files, Spark can convert suitable Filter conditions into Parquet filters and use the statistics stored in the file footer to skip, ahead of time, blocks (row groups) that cannot contain matching rows, reducing the amount of data IO.
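Whether and for which data types this conversion happens is governed by a handful of SQLConf flags. The following is a sketch of the relevant flags as they exist in Spark 3.0 (the listed defaults are to the best of my knowledge):

spark.conf.set("spark.sql.parquet.filterPushdown", "true")                    // master switch, default true
spark.conf.set("spark.sql.parquet.filterPushdown.date", "true")               // default true
spark.conf.set("spark.sql.parquet.filterPushdown.timestamp", "true")          // default true
spark.conf.set("spark.sql.parquet.filterPushdown.decimal", "true")            // default true
spark.conf.set("spark.sql.parquet.filterPushdown.string.startsWith", "true")  // default true
spark.conf.set("spark.sql.parquet.pushdown.inFilterThreshold", "10")          // default 10

These map onto the pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStringStartWith and pushDownInFilterThreshold arguments of ParquetFilters that appear in the source walkthrough below.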
Preparing the test environment
- Spark Version : 3.0.0
- Parquet Version : 1.10.1
import spark.implicits._
import scala.util.Random
import scala.math.BigDecimal
import java.time.LocalDateTime
import java.sql.Timestamp
spark.range(1, 1000).map { id =>
  val id2 = id + Random.nextInt(10) - 5
  val id3 = BigDecimal((id * 100 + Random.nextInt(100)) / 100.0)
  val name = s"wankun_$id"
  val isMan = id % 2 == 0
  val birthday = Timestamp.valueOf(LocalDateTime.now.plusDays(id))
  (id.toInt, id2, id3, name, isMan, birthday)
}.toDF("id", "id2", "id3", "name", "isMan", "birthday")
  .createOrReplaceTempView("tab2")
spark.sql(
s"""create table tab (
| id1 int,
| id2 bigint,
| id3 decimal,
| name string,
| isMan boolean,
| birthday timestamp
|)
|stored as parquet;
|""".stripMargin)
spark.sql("insert overwrite table tab select * from tab2")
spark.sql("select * from tab where id1 =4 limit 10").show()
- Before starting the tests, to make it easier to observe how filters are pushed down to Parquet, raise the Parquet log level to INFO:
  log4j.logger.org.apache.parquet=INFO
- Inspect the footer metadata of the generated Parquet file:
ParquetMetaData{FileMetaData{schema: message spark_schema {
required int32 id1;
required int64 id2;
optional fixed_len_byte_array(5) id3 (DECIMAL(10,0));
optional binary name (UTF8);
required boolean isMan;
optional int96 birthday;
}
, metadata: {org.apache.spark.version=3.0.0, org.apache.spark.sql.parquet.row.metadata={"type":"struct","fields":[{"name":"id1","type":"integer","nullable":false,"metadata":{}},{"name":"id2","type":"long","nullable":false,"metadata":{}},{"name":"id3","type":"decimal(10,0)","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"isMan","type":"boolean","nullable":false,"metadata":{}},{"name":"birthday","type":"timestamp","nullable":true,"metadata":{}}]}}}, blocks: [BlockMetaData{999, 43182 [ColumnMetaData{SNAPPY [id1] required int32 id1 [BIT_PACKED, PLAIN], 4}, ColumnMetaData{SNAPPY [id2] required int64 id2 [BIT_PACKED, PLAIN_DICTIONARY], 4053}, ColumnMetaData{SNAPPY [id3] optional fixed_len_byte_array(5) id3 (DECIMAL(10,0)) [BIT_PACKED, PLAIN, RLE], 8096}, ColumnMetaData{SNAPPY [name] optional binary name (UTF8) [BIT_PACKED, PLAIN, RLE], 11254}, ColumnMetaData{SNAPPY [isMan] required boolean isMan [BIT_PACKED, PLAIN], 16305}, ColumnMetaData{SNAPPY [birthday] optional int96 birthday [BIT_PACKED, RLE, PLAIN_DICTIONARY], 16351}]}]}
Observe how the filters are pushed down to the file:
scala> spark.sql("select * from tab where id1 =4 limit 10").show()
2021-03-11 17:30:12,606 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(id1, null), eq(id1, 4))
2021-03-11 17:30:12,809 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(id1, null), eq(id1, 4))
2021-03-11 17:30:12,858 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(id1, null), eq(id1, 4))
+---+---+---+--------+-----+--------------------+
|id1|id2|id3| name|isMan| birthday|
+---+---+---+--------+-----+--------------------+
| 4| 1| 4|wankun_4| true|2021-03-15 15:03:...|
+---+---+---+--------+-----+--------------------+
scala> spark.sql("select * from tab where id2 =4 limit 10").show()
2021-03-11 17:30:29,266 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(id2, null), eq(id2, 4))
2021-03-11 17:30:29,281 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(id2, null), eq(id2, 4))
2021-03-11 17:30:29,292 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(id2, null), eq(id2, 4))
+---+---+---+----+-----+--------+
|id1|id2|id3|name|isMan|birthday|
+---+---+---+----+-----+--------+
+---+---+---+----+-----+--------+
scala> spark.sql("select * from tab where id3 =4.0 limit 10").show()
2021-03-11 17:30:44,114 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: noteq(id3, null)
2021-03-11 17:30:44,130 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: noteq(id3, null)
2021-03-11 17:30:44,143 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: noteq(id3, null)
+---+---+---+--------+-----+--------------------+
|id1|id2|id3| name|isMan| birthday|
+---+---+---+--------+-----+--------------------+
| 3| -1| 4|wankun_3|false|2021-03-14 15:03:...|
| 4| 1| 4|wankun_4| true|2021-03-15 15:03:...|
+---+---+---+--------+-----+--------------------+
scala> spark.sql("select * from tab where name ='wankun_6' limit 10").show()
2021-03-11 17:31:21,261 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(name, null), eq(name, Binary{
"wankun_6"}))
2021-03-11 17:31:21,277 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(name, null), eq(name, Binary{
"wankun_6"}))
2021-03-11 17:31:21,287 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(name, null), eq(name, Binary{
"wankun_6"}))
+---+---+---+--------+-----+--------------------+
|id1|id2|id3| name|isMan| birthday|
+---+---+---+--------+-----+--------------------+
| 6| 7| 6|wankun_6| true|2021-03-17 15:03:...|
+---+---+---+--------+-----+--------------------+
scala> spark.sql("select * from tab where isMan =true limit 2").show()
2021-03-11 17:31:43,460 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(isMan, null), eq(isMan, true))
2021-03-11 17:31:43,474 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(isMan, null), eq(isMan, true))
2021-03-11 17:31:43,484 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(isMan, null), eq(isMan, true))
+---+---+---+--------+-----+--------------------+
|id1|id2|id3| name|isMan| birthday|
+---+---+---+--------+-----+--------------------+
| 2| -3| 3|wankun_2| true|2021-03-13 15:03:...|
| 4| 1| 4|wankun_4| true|2021-03-15 15:03:...|
+---+---+---+--------+-----+--------------------+
scala> spark.sql("select * from tab where birthday > current_timestamp() limit 2").show()
+---+---+---+--------+-----+--------------------+
|id1|id2|id3| name|isMan| birthday|
+---+---+---+--------+-----+--------------------+
| 1| -4| 1|wankun_1|false|2021-03-12 15:03:...|
| 2| -3| 3|wankun_2| true|2021-03-13 15:03:...|
+---+---+---+--------+-----+--------------------+
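Besides turning on Parquet's INFO logging, the pushed filters can also be checked from the physical plan: the FileScan node reports them in its PushedFilters metadata. A quick sketch (the exact formatting of the plan output varies between Spark versions):

scala> spark.sql("select * from tab where id1 = 4").explain()

In the printed plan, the FileScan parquet node should report something like PushedFilters: [IsNotNull(id1), EqualTo(id1,4)].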
Source code walkthrough
The call stack from the entry point SparkPlan.doExecute() down to the point where ParquetFileFormat reads the Parquet file looks roughly like this:
SparkPlan.doExecute()                 // abstract method
DataSourceScanExec.doExecute()        // invoked on the concrete object (FileSourceScanExec) {
  // internally accesses inputRDD, which is instantiated lazily
  lazy val inputRDD: RDD[InternalRow] = {
    // the readFile function creates the reader from the relation.fileFormat resolved from the plan
    val readFile: (PartitionedFile) => Iterator[InternalRow] =
      relation.fileFormat.buildReaderWithPartitionValues(
        sparkSession = relation.sparkSession,
        dataSchema = relation.dataSchema,
        partitionSchema = relation.partitionSchema,
        requiredSchema = requiredSchema,
        filters = pushedDownFilters,  // the filters remaining after Spark's optimizer has run
        options = relation.options,
        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
    val readRDD =
      createNonBucketedReadRDD(readFile, dynamicallySelectedPartitions, relation)
    readRDD
  }
}
ParquetFileFormat is a subclass of FileFormat. For Parquet reads, go straight to ParquetFileFormat's buildReaderWithPartitionValues implementation:
override def buildReaderWithPartitionValues(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
// The long block below only sets various Parquet read options; it is not central to the main flow,
// but the settings are all useful and worth reading when needed.
// ParquetReadSupport handles schema conversion and data type conversion between Spark and Parquet.
hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
......
// spark.sql.parquet.int96TimestampConversion only performs a compatibility conversion of the data type; it does not provide the type conversion needed for filter pushdown
val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
// This closure is the method's return value: for each PartitionedFile it builds a reader and returns an iterator over the rows
(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)
// Every Parquet file read is wrapped into a ParquetInputSplit
val filePath = new Path(new URI(file.filePath))
val split =
new org.apache.parquet.hadoop.ParquetInputSplit(
filePath,
file.start,
file.start + file.length,
file.length,
Array.empty,
null)
val sharedConf = broadcastedHadoopConf.value.value
lazy val footerFileMetaData =
ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
// Here the SparkPlan Filters are converted into Parquet Filters:
// 1. parquetFilters itself controls whether types such as date and timestamp are converted
// 2. timestamp, for example, is stored as INT96 by the current Parquet version and cannot be converted yet
// 3. whatever filters survive the conversion are the ones pushed down to the Parquet file
.flatMap(parquetFilters.createFilter(_))
.reduceOption(FilterApi.and)
} else {
None
}
......
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val hadoopAttemptContext =
new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
// Try to push down filters when filter push-down is enabled.
// Notice: This push-down is RowGroups level, not individual records. // i.e. the pushdown prunes row groups, not individual rows
if (pushed.isDefined) {
// If there is a filter that can be pushed down to the file, set it directly in the Hadoop config under
// FILTER_PREDICATE = "parquet.private.read.filter.predicate"
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
val taskContext = Option(TaskContext.get())
if (enableVectorizedReader) {
// Create the reader and wrap it in a regular iterator; nothing special here
val vectorizedReader = new VectorizedParquetRecordReader(
convertTz.orNull,
datetimeRebaseMode.toString,
enableOffHeapColumnVector && taskContext.isDefined,
capacity)
val iter = new RecordReaderIterator(vectorizedReader)
// SPARK-23457 Register a task completion listener before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
vectorizedReader.initialize(split, hadoopAttemptContext)
logDebug(s"Appending $partitionSchema ${file.partitionValues}")
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
if (returningBatch) {
vectorizedReader.enableReturningBatches()
}
// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
iter.asInstanceOf[Iterator[InternalRow]]
} else {
logDebug(s"Falling back to parquet-mr")
// ParquetRecordReader returns InternalRow
val readSupport = new ParquetReadSupport(
convertTz, enableVectorizedReader = false, datetimeRebaseMode)
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
} else {
new ParquetRecordReader[InternalRow](readSupport)
}
val iter = new RecordReaderIterator[InternalRow](reader)
// SPARK-23457 Register a task completion listener before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
reader.initialize(split, hadoopAttemptContext)
val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
if (partitionSchema.length == 0) {
// There is no partition columns
iter.map(unsafeProjection)
} else {
val joinedRow = new JoinedRow()
iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
}
}
}
}
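As the comments above point out, the vectorized reader only uses the pushed predicate to prune row groups, while the parquet-mr branch can additionally apply it to individual records when record-level filtering is enabled. A sketch of how to exercise that branch (config names come from SQLConf; the stated defaults are to the best of my knowledge):

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")   // default: true
spark.conf.set("spark.sql.parquet.recordLevelFilter.enabled", "true") // default: false, corresponds to enableRecordFilter above
spark.sql("select * from tab where id1 = 4").show()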
Reading notes:
- Notice: This push-down is at the RowGroup level, not individual records.
- The program stores the pushed-down filter in the parquet.private.read.filter.predicate property of the Hadoop Configuration object, but that value is a gzip-compressed byte array. When inspecting it, look at a second property instead: configuration.get("parquet.private.read.filter.predicate.human.readable") = "and(noteq(id1, null), eq(id1, 4))". See the setFilterPredicate() and getFilterPredicate() functions of org.apache.parquet.hadoop.ParquetInputFormat, and the sketch after this list.
- Taking the SQL filter condition id1 = 4 as an example, the final Parquet filter generated looks like this (debugger view):
filter = {FilterPredicateCompat@...}
  filterPredicate = {And@...} "and(noteq(id1, null), eq(id1, 4))"
    left  = {NotEq@...} "noteq(id1, null)"
    right = {Eq@...} "eq(id1, 4)"
    toString = "and(noteq(id1, null), eq(id1, 4))"
- Pushdown works for the int, long, string and boolean types; decimal and timestamp are not pushed down (timestamp is stored by the current Parquet version as INT96, and the current codebase does not yet support pushing filters down on that physical type).
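To make the last two notes concrete, here is a minimal sketch, outside of Spark, of what ParquetFileFormat effectively does for id1 = 4: build the predicate with Parquet's FilterApi, register it via ParquetInputFormat.setFilterPredicate, and read back the human-readable copy from the configuration (class and property names are from parquet-hadoop 1.10; the manual predicate construction is mine, not Spark's code path):

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.hadoop.ParquetInputFormat

val conf = new Configuration()
val id1  = FilterApi.intColumn("id1")

// and(noteq(id1, null), eq(id1, 4)) -- the same shape as the predicate seen in the logs above
val pred = FilterApi.and(
  FilterApi.notEq(id1, null.asInstanceOf[java.lang.Integer]),
  FilterApi.eq(id1, java.lang.Integer.valueOf(4)))

// what ParquetFileFormat does when pushed.isDefined
ParquetInputFormat.setFilterPredicate(conf, pred)

// the serialized predicate is a gzip-compressed byte array; a readable copy sits next to it
println(conf.get("parquet.private.read.filter.predicate.human.readable"))
// expected: and(noteq(id1, null), eq(id1, 4))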
