Pushdown of Spark Filter Operators onto Parquet Files
2022-07-27 14:23:00 【wankunde】
When reading Parquet files, Spark can convert suitable Filter conditions into Parquet-level filters and, based on the statistics stored in each file's footer, skip blocks (row groups) that cannot contain matching rows ahead of time, reducing data IO.
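Whether individual types participate in the pushdown is governed by a handful of SQL configs. A minimal sketch of the relevant knobs (config names as in Spark 3.0; verify them against your version):

// Parquet filter pushdown switches (Spark 3.0 config names)
spark.conf.set("spark.sql.parquet.filterPushdown", "true")                 // master switch
spark.conf.set("spark.sql.parquet.filterPushdown.date", "true")
spark.conf.set("spark.sql.parquet.filterPushdown.timestamp", "true")       // TIMESTAMP_MICROS/MILLIS only, not INT96
spark.conf.set("spark.sql.parquet.filterPushdown.decimal", "true")
spark.conf.set("spark.sql.parquet.filterPushdown.string.startsWith", "true")
spark.conf.set("spark.sql.parquet.pushdown.inFilterThreshold", "10")       // max IN-list size to push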
Preparing the test environment
- Spark Version : 3.0.0
- Parquet Version : 1.10.1
import spark.implicits._
import scala.util.Random
import scala.math.BigDecimal
import java.time.LocalDateTime
import java.sql.Timestamp
spark.range(1, 1000).map { id =>
    val id2 = id + Random.nextInt(10) - 5
    val id3 = BigDecimal((id * 100 + Random.nextInt(100)) / 100.0)
    val name = s"wankun_$id"
    val isMan = id % 2 == 0
    val birthday = Timestamp.valueOf(LocalDateTime.now.plusDays(id))
    (id.toInt, id2, id3, name, isMan, birthday)
  }.toDF("id", "id2", "id3", "name", "isMan", "birthday")
  .createOrReplaceTempView("tab2")
spark.sql(
  s"""create table tab (
     |  id1 int,
     |  id2 bigint,
     |  id3 decimal,
     |  name string,
     |  isMan boolean,
     |  birthday timestamp
     |)
     |stored as parquet
     |""".stripMargin)
spark.sql("insert overwrite table tab select * from tab2")
spark.sql("select * from tab where id1 =4 limit 10").show()
- Before starting the tests, to make it easier to observe how filters are pushed down onto Parquet, raise the Parquet log level to INFO:
log4j.logger.org.apache.parquet=INFO
- Inspect the footer metadata of the generated Parquet file:
ParquetMetaData{FileMetaData{schema: message spark_schema {
required int32 id1;
required int64 id2;
optional fixed_len_byte_array(5) id3 (DECIMAL(10,0));
optional binary name (UTF8);
required boolean isMan;
optional int96 birthday;
}
, metadata: {org.apache.spark.version=3.0.0, org.apache.spark.sql.parquet.row.metadata={"type":"struct","fields":[{"name":"id1","type":"integer","nullable":false,"metadata":{}},{"name":"id2","type":"long","nullable":false,"metadata":{}},{"name":"id3","type":"decimal(10,0)","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"isMan","type":"boolean","nullable":false,"metadata":{}},{"name":"birthday","type":"timestamp","nullable":true,"metadata":{}}]}}}, blocks: [BlockMetaData{999, 43182 [ColumnMetaData{SNAPPY [id1] required int32 id1 [BIT_PACKED, PLAIN], 4}, ColumnMetaData{SNAPPY [id2] required int64 id2 [BIT_PACKED, PLAIN_DICTIONARY], 4053}, ColumnMetaData{SNAPPY [id3] optional fixed_len_byte_array(5) id3 (DECIMAL(10,0)) [BIT_PACKED, PLAIN, RLE], 8096}, ColumnMetaData{SNAPPY [name] optional binary name (UTF8) [BIT_PACKED, PLAIN, RLE], 11254}, ColumnMetaData{SNAPPY [isMan] required boolean isMan [BIT_PACKED, PLAIN], 16305}, ColumnMetaData{SNAPPY [birthday] optional int96 birthday [BIT_PACKED, RLE, PLAIN_DICTIONARY], 16351}]}]}
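The footer dump above comes from the Parquet INFO logs; it can also be obtained directly, e.g. with parquet-tools meta <file> or programmatically. A minimal sketch (the file path below is hypothetical; point it at one of the files under the table's warehouse directory):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader

// Read only the footer (no row-group data is scanned). The path is a placeholder.
val footer = ParquetFileReader.readFooter(
  new Configuration(),
  new Path("/user/hive/warehouse/tab/part-00000.snappy.parquet"),
  ParquetMetadataConverter.NO_FILTER)
println(footer)  // prints the same ParquetMetaData{...} structure shown above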
Observe the filter pushdown results on the files:
scala> spark.sql("select * from tab where id1 =4 limit 10").show()
2021-03-11 17:30:12,606 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(id1, null), eq(id1, 4))
2021-03-11 17:30:12,809 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(id1, null), eq(id1, 4))
2021-03-11 17:30:12,858 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(id1, null), eq(id1, 4))
+---+---+---+--------+-----+--------------------+
|id1|id2|id3| name|isMan| birthday|
+---+---+---+--------+-----+--------------------+
| 4| 1| 4|wankun_4| true|2021-03-15 15:03:...|
+---+---+---+--------+-----+--------------------+
scala> spark.sql("select * from tab where id2 =4 limit 10").show()
2021-03-11 17:30:29,266 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(id2, null), eq(id2, 4))
2021-03-11 17:30:29,281 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(id2, null), eq(id2, 4))
2021-03-11 17:30:29,292 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(id2, null), eq(id2, 4))
+---+---+---+----+-----+--------+
|id1|id2|id3|name|isMan|birthday|
+---+---+---+----+-----+--------+
+---+---+---+----+-----+--------+
scala> spark.sql("select * from tab where id3 =4.0 limit 10").show()
2021-03-11 17:30:44,114 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: noteq(id3, null)
2021-03-11 17:30:44,130 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: noteq(id3, null)
2021-03-11 17:30:44,143 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: noteq(id3, null)
+---+---+---+--------+-----+--------------------+
|id1|id2|id3| name|isMan| birthday|
+---+---+---+--------+-----+--------------------+
| 3| -1| 4|wankun_3|false|2021-03-14 15:03:...|
| 4| 1| 4|wankun_4| true|2021-03-15 15:03:...|
+---+---+---+--------+-----+--------------------+
scala> spark.sql("select * from tab where name ='wankun_6' limit 10").show()
2021-03-11 17:31:21,261 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(name, null), eq(name, Binary{"wankun_6"}))
2021-03-11 17:31:21,277 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(name, null), eq(name, Binary{"wankun_6"}))
2021-03-11 17:31:21,287 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(name, null), eq(name, Binary{"wankun_6"}))
+---+---+---+--------+-----+--------------------+
|id1|id2|id3| name|isMan| birthday|
+---+---+---+--------+-----+--------------------+
| 6| 7| 6|wankun_6| true|2021-03-17 15:03:...|
+---+---+---+--------+-----+--------------------+
scala> spark.sql("select * from tab where isMan =true limit 2").show()
2021-03-11 17:31:43,460 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(isMan, null), eq(isMan, true))
2021-03-11 17:31:43,474 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(isMan, null), eq(isMan, true))
2021-03-11 17:31:43,484 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(isMan, null), eq(isMan, true))
+---+---+---+--------+-----+--------------------+
|id1|id2|id3| name|isMan| birthday|
+---+---+---+--------+-----+--------------------+
| 2| -3| 3|wankun_2| true|2021-03-13 15:03:...|
| 4| 1| 4|wankun_4| true|2021-03-15 15:03:...|
+---+---+---+--------+-----+--------------------+
scala> spark.sql("select * from tab where birthday > current_timestamp() limit 2").show()
+---+---+---+--------+-----+--------------------+
|id1|id2|id3| name|isMan| birthday|
+---+---+---+--------+-----+--------------------+
| 1| -4| 1|wankun_1|false|2021-03-12 15:03:...|
| 2| -3| 3|wankun_2| true|2021-03-13 15:03:...|
+---+---+---+--------+-----+--------------------+
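Besides the Parquet INFO logs, the pushed filters can also be checked from the physical plan: the FileScan parquet node prints a PushedFilters list. A quick check (output abbreviated here):

// The FileScan parquet node in the output should show something like:
//   PushedFilters: [IsNotNull(id1), EqualTo(id1,4)]
spark.sql("select * from tab where id1 = 4").explain()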
Source code walkthrough
The call stack from the SparkPlan.doExecute() entry point down to ParquetFileFormat, which performs the actual Parquet file read, looks like this:
SparkPlan.doExecute()                // abstract method
FileSourceScanExec.doExecute()       // invoked on the concrete DataSourceScanExec implementation {
  // doExecute() reads inputRDD, which is instantiated lazily
  lazy val inputRDD: RDD[InternalRow] = {
    // readFile builds a reader based on the relation.fileFormat resolved from the plan
    val readFile: (PartitionedFile) => Iterator[InternalRow] =
      relation.fileFormat.buildReaderWithPartitionValues(
        sparkSession = relation.sparkSession,
        dataSchema = relation.dataSchema,
        partitionSchema = relation.partitionSchema,
        requiredSchema = requiredSchema,
        filters = pushedDownFilters,   // the filters left over after Spark's optimizer has run (see the sketch after this block)
        options = relation.options,
        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

    val readRDD =
      createNonBucketedReadRDD(readFile, dynamicallySelectedPartitions, relation)
    readRDD
  }
}
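The pushedDownFilters passed above are not Catalyst expressions but the public org.apache.spark.sql.sources.Filter representations produced on the Spark side. For `where id1 = 4` they would look roughly like this (an illustrative sketch, not code taken from the Spark source):

import org.apache.spark.sql.sources.{EqualTo, Filter, IsNotNull}

// Roughly what FileSourceScanExec hands to buildReaderWithPartitionValues for "where id1 = 4".
val pushedDownFilters: Seq[Filter] = Seq(IsNotNull("id1"), EqualTo("id1", 4))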
ParquetFileFormat is a subclass of FileFormat. When reading Parquet files, go straight to ParquetFileFormat's buildReaderWithPartitionValues implementation:
override def buildReaderWithPartitionValues(
sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
// The long stretch below just sets Parquet-read-related parameters; it is not central to the
// main flow, but the settings are all useful and worth a look when needed.
// ParquetReadSupport handles the schema and data type conversion between Spark and Parquet
hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
......
// spark.sql.parquet.int96TimestampConversion only handles data type compatibility conversion; it cannot enable filter pushdown for this type
val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
// This is the value returned by the method: given a PartitionedFile, return an iterator backed by a reader
(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)
// Every Parquet file read is wrapped in a ParquetInputSplit
val filePath = new Path(new URI(file.filePath))
val split =
new org.apache.parquet.hadoop.ParquetInputSplit(
filePath,
file.start,
file.start + file.length,
file.length,
Array.empty,
null)
val sharedConf = broadcastedHadoopConf.value.value
lazy val footerFileMetaData =
ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
// Here the remaining Spark filters are converted into Parquet filters:
// 1. parquetFilters itself controls whether types such as date and timestamp are converted
// 2. timestamp, stored as INT96 by this Parquet version, is not yet supported for conversion
// 3. the filters that survive the conversion are the ones pushed down onto the Parquet file
.flatMap(parquetFilters.createFilter(_))
.reduceOption(FilterApi.and)
} else {
None
}
......
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val hadoopAttemptContext =
new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
// Try to push down filters when filter push-down is enabled.
// Notice: This push-down is RowGroups level, not individual records.
if (pushed.isDefined) {
// If there are filters that can be pushed down to the file, set them directly in the Hadoop config:
// FILTER_PREDICATE = "parquet.private.read.filter.predicate"
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
val taskContext = Option(TaskContext.get())
if (enableVectorizedReader) {
// Create the reader here and wrap it in a regular iterator; nothing special
val vectorizedReader = new VectorizedParquetRecordReader(
convertTz.orNull,
datetimeRebaseMode.toString,
enableOffHeapColumnVector && taskContext.isDefined,
capacity)
val iter = new RecordReaderIterator(vectorizedReader)
// SPARK-23457 Register a task completion listener before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
vectorizedReader.initialize(split, hadoopAttemptContext)
logDebug(s"Appending $partitionSchema ${file.partitionValues}")
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
if (returningBatch) {
vectorizedReader.enableReturningBatches()
}
// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
iter.asInstanceOf[Iterator[InternalRow]]
} else {
logDebug(s"Falling back to parquet-mr")
// ParquetRecordReader returns InternalRow
val readSupport = new ParquetReadSupport(
convertTz, enableVectorizedReader = false, datetimeRebaseMode)
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
} else {
new ParquetRecordReader[InternalRow](readSupport)
}
val iter = new RecordReaderIterator[InternalRow](reader)
// SPARK-23457 Register a task completion listener before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
reader.initialize(split, hadoopAttemptContext)
val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
if (partitionSchema.length == 0) {
// There is no partition columns
iter.map(unsafeProjection)
} else {
val joinedRow = new JoinedRow()
iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
}
}
}
}
Reading notes:
- Notice: this push-down happens at the row-group level, not for individual records.
- The pushed-down filter is stored in the Hadoop Configuration under the property parquet.private.read.filter.predicate, but that value is a gzip-compressed byte array. For inspection, read the companion property instead: configuration.get("parquet.private.read.filter.predicate.human.readable") = "and(noteq(id1, null), eq(id1, 4))". See the setFilterPredicate() and getFilterPredicate() functions of org.apache.parquet.hadoop.ParquetInputFormat.
- Taking the SQL filter condition id1 = 4 as an example, the final Parquet filter looks like this (a hand-built equivalent is sketched after these notes):
filter = {FilterCompat$FilterPredicateCompat@...}
  filterPredicate = {Operators$And@...} "and(noteq(id1, null), eq(id1, 4))"
    left = {Operators$NotEq@...} "noteq(id1, null)"
    right = {Operators$Eq@...} "eq(id1, 4)"
    toString = "and(noteq(id1, null), eq(id1, 4))"
- The int, long, string and boolean types are supported; decimal and timestamp are not (timestamp is stored as INT96 by the current Parquet version, and pushdown for that physical type is not yet supported in the current code base).
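For reference, the same predicate can be built by hand with Parquet's public FilterApi and inspected through the human-readable property mentioned above. A minimal sketch (this bypasses Spark's ParquetFilters.createFilter and only illustrates the Parquet side of the contract):

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.hadoop.ParquetInputFormat

// Rebuild the predicate logged above by hand using Parquet's public FilterApi.
val id1 = FilterApi.intColumn("id1")
val predicate = FilterApi.and(
  FilterApi.notEq(id1, null.asInstanceOf[java.lang.Integer]),   // noteq(id1, null)
  FilterApi.eq(id1, java.lang.Integer.valueOf(4)))              // eq(id1, 4)

val conf = new Configuration()
ParquetInputFormat.setFilterPredicate(conf, predicate)
// The raw parquet.private.read.filter.predicate value is a serialized blob;
// the human-readable mirror is plain text:
println(conf.get("parquet.private.read.filter.predicate.human.readable"))
// => and(noteq(id1, null), eq(id1, 4))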
