The Rebalance operation in Spark and how it differs from the Repartition operation
2022-07-04 17:01:00 【Hongnai riverside bird】
Background
This article is based on Spark 3.2.1.
The Partitioning Hints documentation mentions both the Rebalance and the Repartition operation, and both can repartition data. What is the difference between them?
SELECT /*+ REPARTITION(3) */ * FROM t;
SELECT /*+ REPARTITION(c) */ * FROM t;
SELECT /*+ REBALANCE */ * FROM t;
SELECT /*+ REBALANCE(c) */ * FROM t;
Analysis
- Rebalance
See the corresponding SPARK-35725. Its purpose is to repartition data during the AQE stage according to spark.sql.adaptive.advisoryPartitionSizeInBytes, which prevents data skew. With SPARK-35786 added, the repartitioning can also be driven by a hint.
Let's see how it is implemented. The code of OptimizeSkewInRebalancePartitions is as follows:
override val supportedShuffleOrigins: Seq[ShuffleOrigin] =
  Seq(REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL)
...
override def apply(plan: SparkPlan): SparkPlan = {
  if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED)) {
    return plan
  }
  plan match {
    case stage: ShuffleQueryStageExec if isSupported(stage.shuffle) =>
      tryOptimizeSkewedPartitions(stage)
    case _ => plan
  }
}
Skewed partitions are split only when spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled is turned on, and the rule applies only when the origin of the shuffle is REBALANCE_PARTITIONS_BY_NONE or REBALANCE_PARTITIONS_BY_COL.
The implementation of tryOptimizeSkewedPartitions can be read in the code itself. The code comment explains it clearly:
 * We use ADVISORY_PARTITION_SIZE_IN_BYTES size to decide if a partition should be optimized.
 * Let's say we have 3 maps with 3 shuffle partitions, and assuming r1 has data skew issue.
 * the map side looks like:
 *   m0:[b0, b1, b2], m1:[b0, b1, b2], m2:[b0, b1, b2]
 * and the reduce side looks like:
 *                            (without this rule) r1[m0-b1, m1-b1, m2-b1]
 *                              /                                     \
 *   r0:[m0-b0, m1-b0, m2-b0], r1-0:[m0-b1], r1-1:[m1-b1], r1-2:[m2-b1], r2[m0-b2, m1-b2, m2-b2]
 *
 * Note that, this rule is only applied with the SparkPlan whose top-level node is
 * ShuffleQueryStageExec.
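The splitting described in the comment can be sketched in plain Scala. This is a simplified model under stated assumptions, not Spark's actual implementation: `SkewSplitSketch`, `PartialReducerSpec`, and `splitSkewed` are hypothetical names introduced for illustration, and the greedy packing of consecutive map-output blocks up to a target size is only an approximation of what the rule does with the advisory partition size.

```scala
// Simplified model of splitting one skewed reducer partition.
// All names here are hypothetical; this is not Spark's own code.
object SkewSplitSketch {
  // One slice of a reducer partition, covering map outputs [startMap, endMap).
  final case class PartialReducerSpec(reducerId: Int, startMap: Int, endMap: Int, bytes: Long)

  // Greedily pack consecutive map-output blocks into slices. A new slice is
  // started when adding the next block would push the current one past targetSize.
  def splitSkewed(
      reducerId: Int,
      blockSizes: Seq[Long],
      targetSize: Long): List[PartialReducerSpec] = {
    val specs = scala.collection.mutable.ListBuffer.empty[PartialReducerSpec]
    var start = 0
    var acc = 0L
    for ((size, i) <- blockSizes.zipWithIndex) {
      if (i > start && acc + size > targetSize) {
        specs += PartialReducerSpec(reducerId, start, i, acc)
        start = i
        acc = 0L
      }
      acc += size
    }
    // Emit the trailing slice, if any blocks remain.
    if (start < blockSizes.length) {
      specs += PartialReducerSpec(reducerId, start, blockSizes.length, acc)
    }
    specs.toList
  }
}
```

With three map outputs of 64 bytes each for reducer r1 and a target size of 64, `SkewSplitSketch.splitSkewed(1, Seq(64L, 64L, 64L), 64L)` yields three slices, one per map output, which corresponds to r1-0, r1-1, and r1-2 in the comment above.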
Next, let's look at where the REBALANCE_PARTITIONS_BY_NONE and REBALANCE_PARTITIONS_BY_COL origins come from.
The starting point is the ResolveHints rule:
private def createRebalance(hint: UnresolvedHint): LogicalPlan = {
  hint.parameters match {
    case partitionExprs @ Seq(_*) =>
      val invalidParams = partitionExprs.filter(!_.isInstanceOf[UnresolvedAttribute])
      if (invalidParams.nonEmpty) {
        val hintName = hint.name.toUpperCase(Locale.ROOT)
        throw QueryCompilationErrors.invalidHintParameterError(hintName, invalidParams)
      }
      RebalancePartitions(partitionExprs.map(_.asInstanceOf[Expression]), hint.child)
  }
}
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
  _.containsPattern(UNRESOLVED_HINT), ruleId) {
  case hint @ UnresolvedHint(hintName, _, _) => hintName.toUpperCase(Locale.ROOT) match {
    case "REPARTITION" =>
      createRepartition(shuffle = true, hint)
    case "COALESCE" =>
      createRepartition(shuffle = false, hint)
    case "REPARTITION_BY_RANGE" =>
      createRepartitionByRange(hint)
    case "REBALANCE" if conf.adaptiveExecutionEnabled =>
      createRebalance(hint)
    case _ => hint
  }
}
As the code shows, the REBALANCE hint takes effect only when AQE is enabled. It produces a RebalancePartitions logical plan, which the BasicOperators strategy then converts into a ShuffleExchangeExec physical plan:
case r: logical.RebalancePartitions =>
  val shuffleOrigin = if (r.partitionExpressions.isEmpty) {
    REBALANCE_PARTITIONS_BY_NONE
  } else {
    REBALANCE_PARTITIONS_BY_COL
  }
  exchange.ShuffleExchangeExec(r.partitioning, planLater(r.child), shuffleOrigin) :: Nil
A shuffle is required because the AQE stage applies the OptimizeSkewInRebalancePartitions rule only to shuffle operations. Only then can the shuffle read stage be optimized based on the data produced by the shuffle write stage.
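The path from hint parameters to rule applicability can be summarized in a small toy model. The types below are local stand-ins that only borrow the names of Spark's shuffle origins; `ShuffleOriginSketch`, `OTHER_ORIGIN`, `originForRebalance`, and `skewRuleApplies` are hypothetical names for illustration, and the model covers only the two conditions discussed above (the configuration flag and the supported origins).

```scala
// Toy model of the planning decision; local stand-ins, not Spark's own classes.
object ShuffleOriginSketch {
  sealed trait ShuffleOrigin
  case object REBALANCE_PARTITIONS_BY_NONE extends ShuffleOrigin
  case object REBALANCE_PARTITIONS_BY_COL extends ShuffleOrigin
  // Hypothetical placeholder for any origin not produced by a REBALANCE hint.
  case object OTHER_ORIGIN extends ShuffleOrigin

  // Mirrors the BasicOperators case: no partition expressions means BY_NONE.
  def originForRebalance(partitionExprs: Seq[String]): ShuffleOrigin =
    if (partitionExprs.isEmpty) REBALANCE_PARTITIONS_BY_NONE
    else REBALANCE_PARTITIONS_BY_COL

  // Mirrors supportedShuffleOrigins of OptimizeSkewInRebalancePartitions.
  val skewRuleOrigins: Set[ShuffleOrigin] =
    Set(REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL)

  // The rule runs only if its flag is on and the shuffle origin is supported.
  def skewRuleApplies(ruleEnabled: Boolean, origin: ShuffleOrigin): Boolean =
    ruleEnabled && skewRuleOrigins.contains(origin)
}
```

In this model, `REBALANCE` maps to `originForRebalance(Seq.empty)` and `REBALANCE(c)` maps to `originForRebalance(Seq("c"))`; both satisfy `skewRuleApplies(true, ...)`, while any other origin does not.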
Note:
OptimizeShuffleWithLocalRead does not apply to shuffles whose shuffleOrigin is REBALANCE_PARTITIONS_BY_COL. Otherwise, dynamic partition writes would suffer from a small-files problem. See the related discussion for details.
- Repartition
Compared with Rebalance, this hint simply repartitions the data into the specified fixed number of partitions or by the specified columns. The size of each partition cannot be controlled: the data is either distributed evenly or hash-partitioned by column (in which case the resulting files can differ in size).
For a detailed analysis, follow the same path as in the Rebalance analysis above.
One thing to note: after SPARK-35650, the Repartition operation is also optimized in the AQE stage. And after SPARK-35725, a plain REPARTITION hint can achieve the same effect as a REBALANCE hint, because that change switched the shuffleOrigin from REPARTITION_BY_NONE to REBALANCE_PARTITIONS_BY_NONE, so the OptimizeSkewInRebalancePartitions rule can be applied to it as well.
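To illustrate why hash partitioning by column gives no control over partition size, here is a minimal sketch in plain Scala. It assumes `String.hashCode` as the hash function purely for illustration (Spark uses its own hash function), and `HashPartitionSketch`, `partitionOf`, and `partitionSizes` are hypothetical names.

```scala
// Illustration only: hash partitioning sends every row with the same key
// to the same partition, so one hot key makes one partition large.
object HashPartitionSketch {
  // Non-negative modulo, so negative hash codes still map to a valid partition.
  def partitionOf(key: String, numPartitions: Int): Int = {
    val m = key.hashCode % numPartitions
    if (m < 0) m + numPartitions else m
  }

  // Row count per partition when rows are hash-partitioned by key.
  def partitionSizes(keys: Seq[String], numPartitions: Int): Vector[Int] = {
    val counts = Array.fill(numPartitions)(0)
    keys.foreach(k => counts(partitionOf(k, numPartitions)) += 1)
    counts.toVector
  }
}
```

For example, with 90 rows sharing the key "hot" and 10 rows with distinct keys, `HashPartitionSketch.partitionSizes(keys, 3)` always places at least 90 of the 100 rows in a single partition. Rebalance, by contrast, can split such a partition by map output in the AQE stage.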
Conclusion
In general, Rebalance can replace Repartition wherever Repartition is used, and Rebalance gives better control over file size. For more information, see the corresponding Spark JIRA issues.