Push down of spark filter operator on parquet file
2022-07-27 15:37:00 【wankunde】
When a Spark program reads Parquet files, it can convert suitable Filter conditions into Parquet file-level Filters and use the statistics recorded in the file Footer to skip Blocks (row groups) that cannot contain matching rows, reducing data IO.
Preparing the test environment
- Spark Version : 3.0.0
- Parquet Version : 1.10.1
import spark.implicits._
import scala.util.Random
import scala.math.BigDecimal
import java.time.LocalDateTime
import java.sql.Timestamp

spark.range(1, 1000).map { id =>
  val id2 = id + Random.nextInt(10) - 5
  val id3 = BigDecimal((id * 100 + Random.nextInt(100)) / 100.0)
  val name = s"wankun_$id"
  val isMan = id % 2 == 0
  val birthday = Timestamp.valueOf(LocalDateTime.now.plusDays(id))
  (id.toInt, id2, id3, name, isMan, birthday)
}.toDF("id", "id2", "id3", "name", "isMan", "birthday")
  .createOrReplaceTempView("tab2")

spark.sql(
  s"""create table tab (
     |  id1 int,
     |  id2 bigint,
     |  id3 decimal,
     |  name string,
     |  isMan boolean,
     |  birthday timestamp
     |)
     |stored as parquet;
     |""".stripMargin)

spark.sql("insert overwrite table tab select * from tab2")
spark.sql("select * from tab where id1 = 4 limit 10").show()
- Before starting the test, to make it easy to observe whether a filter is pushed down to Parquet, raise the Parquet log level to INFO:
  log4j.logger.org.apache.parquet=INFO
- Footer information of the test Parquet file:
ParquetMetaData{FileMetaData{schema: message spark_schema {
required int32 id1;
required int64 id2;
optional fixed_len_byte_array(5) id3 (DECIMAL(10,0));
optional binary name (UTF8);
required boolean isMan;
optional int96 birthday;
}
, metadata: {org.apache.spark.version=3.0.0, org.apache.spark.sql.parquet.row.metadata={"type":"struct","fields":[{"name":"id1","type":"integer","nullable":false,"metadata":{}},{"name":"id2","type":"long","nullable":false,"metadata":{}},{"name":"id3","type":"decimal(10,0)","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"isMan","type":"boolean","nullable":false,"metadata":{}},{"name":"birthday","type":"timestamp","nullable":true,"metadata":{}}]}}}, blocks: [BlockMetaData{999, 43182 [ColumnMetaData{SNAPPY [id1] required int32 id1 [BIT_PACKED, PLAIN], 4}, ColumnMetaData{SNAPPY [id2] required int64 id2 [BIT_PACKED, PLAIN_DICTIONARY], 4053}, ColumnMetaData{SNAPPY [id3] optional fixed_len_byte_array(5) id3 (DECIMAL(10,0)) [BIT_PACKED, PLAIN, RLE], 8096}, ColumnMetaData{SNAPPY [name] optional binary name (UTF8) [BIT_PACKED, PLAIN, RLE], 11254}, ColumnMetaData{SNAPPY [isMan] required boolean isMan [BIT_PACKED, PLAIN], 16305}, ColumnMetaData{SNAPPY [birthday] optional int96 birthday [BIT_PACKED, RLE, PLAIN_DICTIONARY], 16351}]}]}
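The footer above comes from the Parquet INFO logs; it can also be printed directly from the spark-shell with the same API the reader uses. A minimal sketch, where the part file name is a placeholder that has to be replaced with an actual file under the tab table directory:

import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader

// Placeholder path: point it at a real part file of the tab table.
val file = new Path("/user/hive/warehouse/tab/part-00000.snappy.parquet")
val conf = spark.sparkContext.hadoopConfiguration

// NO_FILTER reads the metadata of all row groups in the footer.
val footer = ParquetFileReader.readFooter(conf, file, ParquetMetadataConverter.NO_FILTER)
println(footer)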
Observe the result of pushing Filters down onto the file:
scala> spark.sql("select * from tab where id1 =4 limit 10").show()
2021-03-11 17:30:12,606 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(id1, null), eq(id1, 4))
2021-03-11 17:30:12,809 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(id1, null), eq(id1, 4))
2021-03-11 17:30:12,858 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(id1, null), eq(id1, 4))
+---+---+---+--------+-----+--------------------+
|id1|id2|id3| name|isMan| birthday|
+---+---+---+--------+-----+--------------------+
| 4| 1| 4|wankun_4| true|2021-03-15 15:03:...|
+---+---+---+--------+-----+--------------------+
scala> spark.sql("select * from tab where id2 =4 limit 10").show()
2021-03-11 17:30:29,266 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(id2, null), eq(id2, 4))
2021-03-11 17:30:29,281 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(id2, null), eq(id2, 4))
2021-03-11 17:30:29,292 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(id2, null), eq(id2, 4))
+---+---+---+----+-----+--------+
|id1|id2|id3|name|isMan|birthday|
+---+---+---+----+-----+--------+
+---+---+---+----+-----+--------+
scala> spark.sql("select * from tab where id3 =4.0 limit 10").show()
2021-03-11 17:30:44,114 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: noteq(id3, null)
2021-03-11 17:30:44,130 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: noteq(id3, null)
2021-03-11 17:30:44,143 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: noteq(id3, null)
+---+---+---+--------+-----+--------------------+
|id1|id2|id3| name|isMan| birthday|
+---+---+---+--------+-----+--------------------+
| 3| -1| 4|wankun_3|false|2021-03-14 15:03:...|
| 4| 1| 4|wankun_4| true|2021-03-15 15:03:...|
+---+---+---+--------+-----+--------------------+
scala> spark.sql("select * from tab where name ='wankun_6' limit 10").show()
2021-03-11 17:31:21,261 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(name, null), eq(name, Binary{"wankun_6"}))
2021-03-11 17:31:21,277 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(name, null), eq(name, Binary{"wankun_6"}))
2021-03-11 17:31:21,287 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(name, null), eq(name, Binary{"wankun_6"}))
+---+---+---+--------+-----+--------------------+
|id1|id2|id3| name|isMan| birthday|
+---+---+---+--------+-----+--------------------+
| 6| 7| 6|wankun_6| true|2021-03-17 15:03:...|
+---+---+---+--------+-----+--------------------+
scala> spark.sql("select * from tab where isMan =true limit 2").show()
2021-03-11 17:31:43,460 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(isMan, null), eq(isMan, true))
2021-03-11 17:31:43,474 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(isMan, null), eq(isMan, true))
2021-03-11 17:31:43,484 INFO org.apache.parquet.filter2.compat.FilterCompat: Filtering using predicate: and(noteq(isMan, null), eq(isMan, true))
+---+---+---+--------+-----+--------------------+
|id1|id2|id3| name|isMan| birthday|
+---+---+---+--------+-----+--------------------+
| 2| -3| 3|wankun_2| true|2021-03-13 15:03:...|
| 4| 1| 4|wankun_4| true|2021-03-15 15:03:...|
+---+---+---+--------+-----+--------------------+
scala> spark.sql("select * from tab where birthday > current_timestamp() limit 2").show()
+---+---+---+--------+-----+--------------------+
|id1|id2|id3| name|isMan| birthday|
+---+---+---+--------+-----+--------------------+
| 1| -4| 1|wankun_1|false|2021-03-12 15:03:...|
| 2| -3| 3|wankun_2| true|2021-03-13 15:03:...|
+---+---+---+--------+-----+--------------------+
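Which of these types get pushed down is controlled by a handful of SQLConf flags, which feed the pushDownDate / pushDownTimestamp / pushDownDecimal / pushDownStringStartWith parameters that show up in the source walkthrough below. A minimal sketch of inspecting and toggling them in the spark-shell (flag names as of Spark 3.0):

// Master switch for Parquet filter push-down.
spark.conf.get("spark.sql.parquet.filterPushdown")

// Per-type switches consulted when ParquetFilters is built.
spark.conf.get("spark.sql.parquet.filterPushdown.date")
spark.conf.get("spark.sql.parquet.filterPushdown.timestamp")
spark.conf.get("spark.sql.parquet.filterPushdown.decimal")
spark.conf.get("spark.sql.parquet.filterPushdown.string.startsWith")
spark.conf.get("spark.sql.parquet.pushdown.inFilterThreshold")

// Example: disable decimal push-down for the current session.
spark.conf.set("spark.sql.parquet.filterPushdown.decimal", "false")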
Source code walkthrough
The call stack from the SparkPlan.doExecute() entry point down to the ParquetFileFormat code that reads the Parquet file is as follows:
SparkPlan.doExecute()            // abstract method
DataSourceScanExec.doExecute() { // the implementation actually invoked
  // doExecute() reads inputRDD, which is instantiated lazily
  lazy val inputRDD: RDD[InternalRow] = {
    // readFile builds a reader function from relation.fileFormat, resolved from the plan
    val readFile: (PartitionedFile) => Iterator[InternalRow] =
      relation.fileFormat.buildReaderWithPartitionValues(
        sparkSession = relation.sparkSession,
        dataSchema = relation.dataSchema,
        partitionSchema = relation.partitionSchema,
        requiredSchema = requiredSchema,
        filters = pushedDownFilters, // the filters left over after Spark's optimizer has run
        options = relation.options,
        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

    val readRDD =
      createNonBucketedReadRDD(readFile, dynamicallySelectedPartitions, relation)
    readRDD
  }
}
ParquetFileFormat is a subclass of FileFormat. For Parquet reads, go straight to the implementation of ParquetFileFormat.buildReaderWithPartitionValues:
override def buildReaderWithPartitionValues(
    sparkSession: SparkSession,
    dataSchema: StructType,
    partitionSchema: StructType,
    requiredSchema: StructType,
    filters: Seq[Filter],
    options: Map[String, String],
    hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
  // A long stretch of code follows that sets the parameters for reading Parquet.
  // It is independent of the main logic, but all of it is useful; read it when needed.
  // ParquetReadSupport does the schema and data type conversion between Spark and Parquet.
  hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
  ......
  // spark.sql.parquet.int96TimestampConversion only controls compatibility conversion of the
  // data type; it does not enable type conversion for Filter push-down.
  val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion

  // The return value of this method: a function that builds a reader iterator per file.
  (file: PartitionedFile) => {
    assert(file.partitionValues.numFields == partitionSchema.size)

    // Every Parquet file read is wrapped into a ParquetInputSplit.
    val filePath = new Path(new URI(file.filePath))
    val split =
      new org.apache.parquet.hadoop.ParquetInputSplit(
        filePath,
        file.start,
        file.start + file.length,
        file.length,
        Array.empty,
        null)

    val sharedConf = broadcastedHadoopConf.value.value

    lazy val footerFileMetaData =
      ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
    // Try to push down filters when filter push-down is enabled.
    val pushed = if (enableParquetFilterPushDown) {
      val parquetSchema = footerFileMetaData.getSchema
      val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
        pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
      filters
        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
        // is used here.
        // This is where the SparkPlan Filters are converted into Parquet Filters:
        // 1. parquetFilters controls whether date, timestamp and other types may be pushed down
        // 2. timestamp is stored as INT96 in the current Parquet version; its conversion is not yet supported
        // 3. the filters that survive this step are the ones pushed down onto the Parquet file
        .flatMap(parquetFilters.createFilter(_))
        .reduceOption(FilterApi.and)
    } else {
      None
    }
    ......
    val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
    val hadoopAttemptContext =
      new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)

    // Try to push down filters when filter push-down is enabled.
    // Notice: This push-down is RowGroups level, not individual records.
    if (pushed.isDefined) {
      // If a filter could be pushed down for this file, set it directly as a Hadoop config parameter:
      // FILTER_PREDICATE = "parquet.private.read.filter.predicate"
      ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
    }
    val taskContext = Option(TaskContext.get())
    if (enableVectorizedReader) {
      // Create the vectorized reader and wrap it as an ordinary iterator; nothing special here.
      val vectorizedReader = new VectorizedParquetRecordReader(
        convertTz.orNull,
        datetimeRebaseMode.toString,
        enableOffHeapColumnVector && taskContext.isDefined,
        capacity)
      val iter = new RecordReaderIterator(vectorizedReader)
      // SPARK-23457 Register a task completion listener before `initialization`.
      taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
      vectorizedReader.initialize(split, hadoopAttemptContext)
      logDebug(s"Appending $partitionSchema ${file.partitionValues}")
      vectorizedReader.initBatch(partitionSchema, file.partitionValues)
      if (returningBatch) {
        vectorizedReader.enableReturningBatches()
      }
      // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
      iter.asInstanceOf[Iterator[InternalRow]]
    } else {
      logDebug(s"Falling back to parquet-mr")
      // ParquetRecordReader returns InternalRow
      val readSupport = new ParquetReadSupport(
        convertTz, enableVectorizedReader = false, datetimeRebaseMode)
      val reader = if (pushed.isDefined && enableRecordFilter) {
        val parquetFilter = FilterCompat.get(pushed.get, null)
        new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
      } else {
        new ParquetRecordReader[InternalRow](readSupport)
      }
      val iter = new RecordReaderIterator[InternalRow](reader)
      // SPARK-23457 Register a task completion listener before `initialization`.
      taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
      reader.initialize(split, hadoopAttemptContext)
      val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
      val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
      if (partitionSchema.length == 0) {
        // There are no partition columns.
        iter.map(unsafeProjection)
      } else {
        val joinedRow = new JoinedRow()
        iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
      }
    }
  }
}
Reading notes:
- Notice: this push-down works at the RowGroup level, not on individual records (see the note on record-level filtering after this list).
- The program writes the pushed-down Filter into the parquet.private.read.filter.predicate property of the Hadoop Configuration object. The value is a gzip-compressed, Base64-encoded byte array, so it cannot be read directly; while debugging you can look at the companion property instead: configuration.get("parquet.private.read.filter.predicate.human.readable") = "and(noteq(id1, null), eq(id1, 4))". Reference code: the setFilterPredicate() and getFilterPredicate() functions of org.apache.parquet.hadoop.ParquetInputFormat.
- Taking the SQL filter condition id1 = 4 as an example, the Parquet file Filter that is finally generated looks like this (debugger view, object ids omitted; a sketch of building the same predicate by hand follows after this list):
filter = {FilterCompat$FilterPredicateCompat}
  filterPredicate = {Operators$And} "and(noteq(id1, null), eq(id1, 4))"
    left = {Operators$NotEq} "noteq(id1, null)"
    right = {Operators$Eq} "eq(id1, 4)"
    toString = "and(noteq(id1, null), eq(id1, 4))"
- int, long, string and boolean filters are pushed down; decimal and timestamp filters are not pushed down here (timestamp is stored as INT96 by the current Parquet version, and push-down for that value type is not yet supported in the current code base).
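For reference, the predicate shown in the debugger view above can be rebuilt by hand with Parquet's FilterApi. A minimal sketch (the column name and values are taken from the log output, not from Spark's own code path):

import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}

// The same shape Spark produces for "where id1 = 4": and(noteq(id1, null), eq(id1, 4)).
val id1 = FilterApi.intColumn("id1")
val predicate: FilterPredicate = FilterApi.and(
  FilterApi.notEq(id1, null.asInstanceOf[java.lang.Integer]), // keeps rows where id1 is not null
  FilterApi.eq(id1, java.lang.Integer.valueOf(4)))            // keeps rows where id1 = 4

println(predicate) // and(noteq(id1, null), eq(id1, 4))

Also note that the RowGroup-level push-down above is separate from record-level filtering: the non-vectorized branch of the reader only wraps the predicate in FilterCompat when spark.sql.parquet.recordLevelFilter.enabled is true (it defaults to false), and that branch is only taken when the vectorized reader is disabled.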
