Optimization of aggregate mentioned at DATA AI Summit 2022
2022-07-31 00:14:00 【Hongnai riverside bird】
Background
This article is based on Spark 3.3.0.
Optimization of HashAggregate
This optimization was developed internally at Facebook (Meta) and has been merged into the Spark community codebase.
The optimization mainly targets partial aggregation. For aggregations such as count, sum, and avg, the mapper first performs a partial aggregation, and the reduce side then performs the final aggregation. This usually works well because it reduces network IO. However, an aggregation may also have to spill data. If the partial aggregation in the mapper stage reduces the data so little that the saved network IO cannot offset the cost of the aggregation itself, the partial aggregation slows the task down instead of speeding it up.



Using runtime metrics to decide whether partial aggregation is worthwhile gives a better speedup.
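The idea can be sketched as follows. This is a minimal pure-Python illustration, not Spark's implementation: the names `partial_aggregate` and `final_aggregate`, the `sample_rows` probe size, and the `max_ratio` threshold are all hypothetical. The mapper aggregates a probe of rows, measures the reduction ratio (distinct keys divided by rows seen), and if aggregation barely shrinks the data, it stops aggregating and passes the remaining rows through for the final aggregation to handle.

```python
from collections import defaultdict


def partial_aggregate(rows, sample_rows=4, max_ratio=0.5):
    """Partial sum per key that gives up when aggregation barely reduces data.

    rows: iterable of (key, value) pairs.
    sample_rows and max_ratio are hypothetical tuning knobs for this sketch.
    Returns (output_rows, bypassed); output_rows are (key, partial_sum) pairs.
    """
    buffers = defaultdict(int)
    output = []
    seen = 0
    bypassed = False
    for key, value in rows:
        if bypassed:
            # Pass-through: each row becomes its own partial result.
            output.append((key, value))
            continue
        buffers[key] += value
        seen += 1
        # Runtime metric: distinct keys / input rows, checked after the probe.
        if seen == sample_rows and len(buffers) / seen > max_ratio:
            bypassed = True
            output.extend(buffers.items())  # flush what was aggregated so far
            buffers.clear()
    output.extend(buffers.items())
    return output, bypassed


def final_aggregate(partials):
    """Reduce-side merge; correct whether or not the mapper bypassed."""
    totals = defaultdict(int)
    for key, value in partials:
        totals[key] += value
    return dict(totals)
```

Because the final aggregation merges whatever it receives, bypassing the partial step changes only the amount of shuffled data, not the result.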
Optimization of ObjectHashAggregate
For the principle of ObjectHashAggregate, see the article "in-depth understanding of HashAggregateExec and ObjectHashAggregateExec and UnsafeRow in SPARK SQL". It clearly explains the differences between ObjectHashAggregate and HashAggregate:
- ObjectHashAggregate supports expressions such as collect_set that HashAggregate cannot handle, so these aggregations are not converted to SortAggregate
- ObjectHashAggregate uses Java array objects (SpecificInternalRow) to hold the intermediate aggregation buffers, which is not friendly to JVM GC
- ObjectHashAggregate spills based on the size of the hash map (128 by default), not on the number of input rows, which leads to early spills and low memory utilization
- Because of the early spill, ObjectHashAggregate performs an additional sort on all the remaining data (if there is no spill, no additional sort is required), while HashAggregate sorts only the data that needs to be spilled each time
The optimization uses the JVM heap memory usage and the number of rows processed to decide when to start spilling.
However, with skewed data this increases the risk of OOM.
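The two spill policies can be contrasted in a small sketch. It is pure Python under stated assumptions, not Spark code: `should_spill_by_keys`, `should_spill_by_memory`, and the `heap_fraction` and `min_rows` parameters are hypothetical; only the threshold of 128 comes from the text above.

```python
def should_spill_by_keys(num_keys, threshold=128):
    """Existing policy: spill once the hash map holds `threshold` keys."""
    return num_keys >= threshold


def should_spill_by_memory(heap_used, heap_max, rows_processed,
                           heap_fraction=0.8, min_rows=10_000):
    """Sketch of the optimized policy: spill only under real memory pressure.

    heap_fraction and min_rows are hypothetical knobs: spilling starts when
    heap usage passes heap_fraction of the maximum and enough rows have been
    processed for the measurement to be meaningful.
    """
    return rows_processed >= min_rows and heap_used / heap_max >= heap_fraction
```

With the key-count policy, a map of 128 tiny buffers spills even when the heap is almost empty. With the memory-based policy the same map keeps growing, which improves memory utilization but leaves less headroom when a skewed key makes a few buffers very large.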
SortAggregate Optimization
The current status of SortAggregate is:
- Each task sorts its data by key before performing the sort aggregation
- The aggregation is then performed across adjacent rows of the sorted result
Differences from HashAggregate:
- No hash table is needed, so there is no memory overflow and no fallback to SortAggregate
- The optimizer prefers HashAggregate
- There is no codegen implementation
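The sort-then-merge-adjacent-rows approach can be sketched in a few lines. This is a pure-Python illustration of the technique, not Spark's SortAggregateExec; the function name `sort_aggregate` is hypothetical and the aggregate is fixed to a sum for brevity.

```python
from operator import itemgetter


def sort_aggregate(rows):
    """Sum values per key by sorting, then merging adjacent rows.

    Only one aggregation buffer is alive at a time, so no hash table is needed.
    """
    result = []
    current_key = None
    current_sum = 0
    have_group = False
    for key, value in sorted(rows, key=itemgetter(0)):
        if have_group and key != current_key:
            # The key changed, so the previous group is complete: emit it.
            result.append((current_key, current_sum))
            current_sum = 0
        current_key = key
        current_sum += value
        have_group = True
    if have_group:
        result.append((current_key, current_sum))
    return result
```

For example, `sort_aggregate([("b", 1), ("a", 2), ("b", 3), ("a", 4)])` returns `[("a", 6), ("b", 4)]`. If the input is already sorted by key, the sort step is the only part that could be skipped, which is what makes SortAggregate attractive for ordered data.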
Features added in Spark 3.3.0:
- If the data is already ordered, SortAggregate is chosen instead of HashAggregate. The replacement is done by the physical plan rule ReplaceHashWithSortAgg and is turned on through spark.sql.execution.replaceHashWithSortAgg (off by default, because any new feature is turned off by default in a release version and turned on in the master branch)
- Codegen support for SortAggregate (without keys)
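The decision behind the replacement rule can be approximated as below. This is a simplified pure-Python sketch, not the actual ReplaceHashWithSortAgg rule: the function names are hypothetical, the ordering check is reduced to "the grouping keys form a prefix of the child's sort columns", and requiring at least one grouping key is an assumption of this sketch.

```python
def can_replace_hash_with_sort_agg(grouping_keys, child_ordering):
    """True when the child's output ordering already covers the grouping keys.

    Simplified assumption: the ordering is satisfied when the grouping keys,
    in order, form a prefix of the child's sort columns.
    """
    keys = list(grouping_keys)
    return len(keys) > 0 and list(child_ordering[:len(keys)]) == keys


def choose_aggregate(grouping_keys, child_ordering, replace_enabled=False):
    """Pick the operator; replace_enabled mirrors the off-by-default flag."""
    if replace_enabled and can_replace_hash_with_sort_agg(grouping_keys, child_ordering):
        return "SortAggregate"
    return "HashAggregate"
```

With the flag left at its default, `choose_aggregate(["k"], ["k", "v"])` stays on `"HashAggregate"`; passing `replace_enabled=True` switches it to `"SortAggregate"` because the input is already sorted by `k`.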
Other
For more details about Aggregate, please refer to "sparksql source code series | One article to understand the execution principle of with one count distinct".