Push-down of Spark filter operators to Parquet files
2022-07-27 15:37:00 【wankunde】
When a Spark program reads a Parquet file, it can convert suitable Filter conditions into Parquet file-level Filters. Using the statistics stored in the file Footer, Blocks (row groups) that cannot contain matching rows are filtered out, reducing data IO.
Preparing the test environment
- Spark Version : 3.0.0
- Parquet Version : 1.10.1
import spark.implicits._
import scala.util.Random
import scala.math.BigDecimal
import java.time.LocalDateTime
import java.sql.Timestamp

spark.range(1, 1000).map { id =>
  val id2 = id + Random.nextInt(10) - 5
  val id3 = BigDecimal((id * 100 + Random.nextInt(100)) / 100.0)
  val name = s"wankun_$id"
  val isMan = id % 2 == 0
  val birthday = Timestamp.valueOf(LocalDateTime.now.plusDays(id))
  (id.toInt, id2, id3, name, isMan, birthday)
}.toDF("id", "id2", "id3", "name", "isMan", "birthday")
 .createOrReplaceTempView("tab2")

spark.sql(
  s"""create table tab (
     |  id1 int,
     |  id2 bigint,
     |  id3 decimal,
     |  name string,
     |  isMan boolean,
     |  birthday timestamp
     |)
     |stored as parquet;
     |""".stripMargin)

spark.sql("insert overwrite table tab select * from tab2")

spark.sql("select * from tab where id1 = 4 limit 10").show()
- Before starting the test, to make it easier to observe whether a filter is pushed down into Parquet, raise the Parquet log level to INFO:
  log4j.logger.org.apache.parquet=INFO
- Footer information of the test Parquet file:
ParquetMetaData{FileMetaData{schema: message spark_schema {
required int32 id1;
required int64 id2;
optional fixed_len_byte_array(5) id3 (DECIMAL(10,0));
optional binary name (UTF8);
required boolean isMan;
optional int96 birthday;
}
, metadata: {org.apache.spark.version=3.0.0, org.apache.spark.sql.parquet.row.metadata={"type":"struct","fields":[{"name":"id1","type":"integer","nullable":false,"metadata":{}},{"name":"id2","type":"long","nullable":false,"metadata":{}},{"name":"id3","type":"decimal(10,0)","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"isMan","type":"boolean","nullable":false,"metadata":{}},{"name":"birthday","type":"timestamp","nullable":true,"metadata":{}}]}}}, blocks: [BlockMetaData{999, 43182 [ColumnMetaData{SNAPPY [id1] required int32 id1 [BIT_PACKED, PLAIN], 4}, ColumnMetaData{SNAPPY [id2] required int64 id2 [BIT_PACKED, PLAIN_DICTIONARY], 4053}, ColumnMetaData{SNAPPY [id3] optional fixed_len_byte_array(5) id3 (DECIMAL(10,0)) [BIT_PACKED, PLAIN, RLE], 8096}, ColumnMetaData{SNAPPY [name] optional binary name (UTF8) [BIT_PACKED, PLAIN, RLE], 11254}, ColumnMetaData{SNAPPY [isMan] required boolean isMan [BIT_PACKED, PLAIN], 16305}, ColumnMetaData{SNAPPY [birthday] optional int96 birthday [BIT_PACKED, RLE, PLAIN_DICTIONARY], 16351}]}]}
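The footer shown above comes from the Parquet INFO logs. It can also be dumped programmatically; the following is a minimal sketch assuming the Parquet 1.10.x API, where the file path is hypothetical and must be replaced with one of the part files written by the insert above:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader

// Hypothetical path: pick any part file of table `tab` in the warehouse directory.
val file = new Path("/user/hive/warehouse/tab/part-00000-xxx.snappy.parquet")

// readFooter is deprecated in newer Parquet versions but available in 1.10.x.
val footer = ParquetFileReader.readFooter(new Configuration(), file, ParquetMetadataConverter.NO_FILTER)
println(footer)                            // the full ParquetMetaData, as shown above
println(footer.getFileMetaData.getSchema)  // just the Parquet message schema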
Observe the results of Filter push-down against the file:
scala> spark.sql("select * from tab where id1 =4 limit 10").show()
2021-03-11 17:30:12,606 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(id1, null), eq(id1, 4))
2021-03-11 17:30:12,809 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(id1, null), eq(id1, 4))
2021-03-11 17:30:12,858 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(id1, null), eq(id1, 4))
+---+---+---+--------+-----+--------------------+
|id1|id2|id3| name|isMan| birthday|
+---+---+---+--------+-----+--------------------+
| 4| 1| 4|wankun_4| true|2021-03-15 15:03:...|
+---+---+---+--------+-----+--------------------+
scala> spark.sql("select * from tab where id2 =4 limit 10").show()
2021-03-11 17:30:29,266 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(id2, null), eq(id2, 4))
2021-03-11 17:30:29,281 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(id2, null), eq(id2, 4))
2021-03-11 17:30:29,292 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(id2, null), eq(id2, 4))
+---+---+---+----+-----+--------+
|id1|id2|id3|name|isMan|birthday|
+---+---+---+----+-----+--------+
+---+---+---+----+-----+--------+
scala> spark.sql("select * from tab where id3 =4.0 limit 10").show()
2021-03-11 17:30:44,114 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: noteq(id3, null)
2021-03-11 17:30:44,130 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: noteq(id3, null)
2021-03-11 17:30:44,143 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: noteq(id3, null)
+---+---+---+--------+-----+--------------------+
|id1|id2|id3| name|isMan| birthday|
+---+---+---+--------+-----+--------------------+
| 3| -1| 4|wankun_3|false|2021-03-14 15:03:...|
| 4| 1| 4|wankun_4| true|2021-03-15 15:03:...|
+---+---+---+--------+-----+--------------------+
scala> spark.sql("select * from tab where name ='wankun_6' limit 10").show()
2021-03-11 17:31:21,261 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(name, null), eq(name, Binary{"wankun_6"}))
2021-03-11 17:31:21,277 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(name, null), eq(name, Binary{"wankun_6"}))
2021-03-11 17:31:21,287 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(name, null), eq(name, Binary{"wankun_6"}))
+---+---+---+--------+-----+--------------------+
|id1|id2|id3| name|isMan| birthday|
+---+---+---+--------+-----+--------------------+
| 6| 7| 6|wankun_6| true|2021-03-17 15:03:...|
+---+---+---+--------+-----+--------------------+
scala> spark.sql("select * from tab where isMan =true limit 2").show()
2021-03-11 17:31:43,460 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(isMan, null), eq(isMan, true))
2021-03-11 17:31:43,474 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(isMan, null), eq(isMan, true))
2021-03-11 17:31:43,484 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(isMan, null), eq(isMan, true))
+---+---+---+--------+-----+--------------------+
|id1|id2|id3| name|isMan| birthday|
+---+---+---+--------+-----+--------------------+
| 2| -3| 3|wankun_2| true|2021-03-13 15:03:...|
| 4| 1| 4|wankun_4| true|2021-03-15 15:03:...|
+---+---+---+--------+-----+--------------------+
scala> spark.sql("select * from tab where birthday > current_timestamp() limit 2").show()
+---+---+---+--------+-----+--------------------+
|id1|id2|id3| name|isMan| birthday|
+---+---+---+--------+-----+--------------------+
| 1| -4| 1|wankun_1|false|2021-03-12 15:03:...|
| 2| -3| 3|wankun_2| true|2021-03-13 15:03:...|
+---+---+---+--------+-----+--------------------+
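Besides the Parquet INFO logs, the pushed filters can also be checked in the physical plan. A minimal sketch; the exact `PushedFilters` rendering depends on the Spark version, so treat the output fragment in the comment as illustrative:

// The FileScan node of the physical plan lists the filters handed to the Parquet data source.
spark.sql("select * from tab where id1 = 4").explain()
// Illustrative fragment of the output:
//   ... FileScan parquet default.tab[...] ..., PushedFilters: [IsNotNull(id1), EqualTo(id1,4)], ...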
Source code walkthrough
Starting from the SparkPlan.doExecute() entry point, the call stack that reaches ParquetFileFormat to read Parquet files looks like this:
SparkPlan.doExecute()           // abstract method
DataSourceScanExec.doExecute()  // concrete implementation (FileSourceScanExec) {
  // Internally accesses inputRDD, which is instantiated lazily.
  lazy val inputRDD: RDD[InternalRow] = {
    // readFile builds the reader from relation.fileFormat, which was resolved during planning.
    val readFile: (PartitionedFile) => Iterator[InternalRow] =
      relation.fileFormat.buildReaderWithPartitionValues(
        sparkSession = relation.sparkSession,
        dataSchema = relation.dataSchema,
        partitionSchema = relation.partitionSchema,
        requiredSchema = requiredSchema,
        filters = pushedDownFilters, // the filters remaining after the Spark optimizer has run
        options = relation.options,
        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

    val readRDD =
      createNonBucketedReadRDD(readFile, dynamicallySelectedPartitions, relation)
    readRDD
  }
}
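For reference, `pushedDownFilters` holds data source `Filter`s from `org.apache.spark.sql.sources`. A sketch of roughly what it contains for the query `where id1 = 4` (not the actual planner code, just the resulting values):

import org.apache.spark.sql.sources.{EqualTo, Filter, IsNotNull}

// Roughly what pushedDownFilters looks like for "select * from tab where id1 = 4".
val pushedDownFilters: Seq[Filter] = Seq(IsNotNull("id1"), EqualTo("id1", 4))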
ParquetFileFormat is a subclass of FileFormat. For Parquet reads, look directly at the implementation of ParquetFileFormat.buildReaderWithPartitionValues:
override def buildReaderWithPartitionValues(
    sparkSession: SparkSession,
    dataSchema: StructType,
    partitionSchema: StructType,
    requiredSchema: StructType,
    filters: Seq[Filter],
    options: Map[String, String],
    hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
  // A long stretch of code follows that only sets parameters for reading Parquet. It is
  // independent of the main logic, but all of it is useful; read it when you need to.
  // ParquetReadSupport handles schema and data type conversion between Spark and Parquet.
  hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
  ......
  // spark.sql.parquet.int96TimestampConversion only controls data type compatibility
  // conversion; it cannot enable type conversion for Filter push-down.
  val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion

  // The return value of the method: a function that builds a reader iterator per file.
  (file: PartitionedFile) => {
    assert(file.partitionValues.numFields == partitionSchema.size)

    // Every Parquet file read is wrapped in a ParquetInputSplit.
    val filePath = new Path(new URI(file.filePath))
    val split =
      new org.apache.parquet.hadoop.ParquetInputSplit(
        filePath,
        file.start,
        file.start + file.length,
        file.length,
        Array.empty,
        null)

    val sharedConf = broadcastedHadoopConf.value.value

    lazy val footerFileMetaData =
      ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
    // Try to push down filters when filter push-down is enabled.
    val pushed = if (enableParquetFilterPushDown) {
      val parquetSchema = footerFileMetaData.getSchema
      val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
        pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
      filters
        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
        // is used here.
        // This is where the SparkPlan Filters are converted into Parquet Filters:
        // 1. parquetFilters controls whether types such as date and timestamp may be pushed down.
        // 2. A type like timestamp is stored as INT96 by the current Parquet version, and its
        //    conversion is not supported yet.
        // 3. The filters that remain after conversion are the ones pushed down onto the Parquet file.
        .flatMap(parquetFilters.createFilter(_))
        .reduceOption(FilterApi.and)
    } else {
      None
    }
    ......
    val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
    val hadoopAttemptContext =
      new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)

    // Try to push down filters when filter push-down is enabled.
    // Notice: This push-down is RowGroups level, not individual records.
    // Note: this is RowGroup-level push-down, not row-level push-down.
    if (pushed.isDefined) {
      // If there is a filter that can be pushed down for this file, set it directly as a
      // Hadoop config parameter: FILTER_PREDICATE = "parquet.private.read.filter.predicate"
      ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
    }
    val taskContext = Option(TaskContext.get())
    if (enableVectorizedReader) {
      // Create the vectorized reader and wrap it in an ordinary iterator; nothing special here.
      val vectorizedReader = new VectorizedParquetRecordReader(
        convertTz.orNull,
        datetimeRebaseMode.toString,
        enableOffHeapColumnVector && taskContext.isDefined,
        capacity)
      val iter = new RecordReaderIterator(vectorizedReader)
      // SPARK-23457 Register a task completion listener before `initialization`.
      taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
      vectorizedReader.initialize(split, hadoopAttemptContext)
      logDebug(s"Appending $partitionSchema ${file.partitionValues}")
      vectorizedReader.initBatch(partitionSchema, file.partitionValues)
      if (returningBatch) {
        vectorizedReader.enableReturningBatches()
      }
      // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
      iter.asInstanceOf[Iterator[InternalRow]]
    } else {
      logDebug(s"Falling back to parquet-mr")
      // ParquetRecordReader returns InternalRow
      val readSupport = new ParquetReadSupport(
        convertTz, enableVectorizedReader = false, datetimeRebaseMode)
      val reader = if (pushed.isDefined && enableRecordFilter) {
        val parquetFilter = FilterCompat.get(pushed.get, null)
        new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
      } else {
        new ParquetRecordReader[InternalRow](readSupport)
      }
      val iter = new RecordReaderIterator[InternalRow](reader)
      // SPARK-23457 Register a task completion listener before `initialization`.
      taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
      reader.initialize(split, hadoopAttemptContext)

      val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
      val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)

      if (partitionSchema.length == 0) {
        // There are no partition columns.
        iter.map(unsafeProjection)
      } else {
        val joinedRow = new JoinedRow()
        iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
      }
    }
  }
}
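The flags used in the code above (enableParquetFilterPushDown, pushDownDate, pushDownTimestamp, pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, enableRecordFilter, enableVectorizedReader) come from SQLConf. A minimal sketch of the corresponding session settings, assuming the Spark 3.0 configuration key names; defaults may differ between versions:

// Row-group level push-down into Parquet (enabled by default).
spark.conf.set("spark.sql.parquet.filterPushdown", "true")

// Per-type switches consulted when converting Spark Filters into Parquet FilterPredicates.
spark.conf.set("spark.sql.parquet.filterPushdown.date", "true")
spark.conf.set("spark.sql.parquet.filterPushdown.timestamp", "true")
spark.conf.set("spark.sql.parquet.filterPushdown.decimal", "true")
spark.conf.set("spark.sql.parquet.filterPushdown.string.startsWith", "true")
spark.conf.set("spark.sql.parquet.pushdown.inFilterThreshold", "10")

// Record-level filtering inside parquet-mr (only used by the non-vectorized reader path).
spark.conf.set("spark.sql.parquet.recordLevelFilter.enabled", "false")
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")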
Notes from the walkthrough:
- Notice: this push-down works at the RowGroup level, not at the level of individual records.
- The program writes the pushed-down Filter into the Hadoop Configuration property parquet.private.read.filter.predicate; the property value is a gzip-compressed, serialized byte array. While debugging, you can also inspect a companion property: configuration.get("parquet.private.read.filter.predicate.human.readable") = "and(noteq(id1, null), eq(id1, 4))". Reference code: the setFilterPredicate() and getFilterPredicate() methods of org.apache.parquet.hadoop.ParquetInputFormat.
- Taking the SQL filter condition id1 = 4 as an example, the Parquet file Filter that is finally generated looks like this (debugger view; a sketch of building the same predicate by hand follows this list):
  filter
    filterPredicate = "and(noteq(id1, null), eq(id1, 4))"
      left     = "noteq(id1, null)"
      right    = "eq(id1, 4)"
      toString = "and(noteq(id1, null), eq(id1, 4))"
- Push-down works for int, long, string, and boolean columns; decimal and timestamp filters are not pushed down (timestamp is stored as INT96 by the current Parquet version, and push-down for that physical type is not yet supported in the current code base).
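The predicate above can be reproduced with the public Parquet FilterApi. A minimal sketch of building such a FilterPredicate and installing it into a Hadoop Configuration, assuming Parquet 1.10.x; this mirrors what ParquetInputFormat.setFilterPredicate() does for the converted Spark filters, it is not Spark's own code:

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.hadoop.ParquetInputFormat

// Build the predicate by hand: id1 IS NOT NULL AND id1 = 4
val id1 = FilterApi.intColumn("id1")
val predicate = FilterApi.and(
  FilterApi.notEq(id1, null.asInstanceOf[Integer]), // noteq(id1, null)
  FilterApi.eq(id1, Integer.valueOf(4))             // eq(id1, 4)
)
println(predicate) // and(noteq(id1, null), eq(id1, 4))

// Install it the way ParquetFileFormat does; the human-readable copy ends up in
// parquet.private.read.filter.predicate.human.readable.
val conf = new Configuration()
ParquetInputFormat.setFilterPredicate(conf, predicate)
println(conf.get("parquet.private.read.filter.predicate.human.readable"))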
