Rebalance operation in Spark and how it differs from the Repartition operation
2022-07-04 17:01:00 【Hongnai riverside bird】
Background
This article is based on Spark 3.2.1.
The Partitioning Hints Types section of the Spark SQL documentation mentions the REBALANCE and REPARTITION operations. Both can repartition data, so what is the difference between them?
SELECT /*+ REPARTITION(3) */ * FROM t;
SELECT /*+ REPARTITION(c) */ * FROM t;
SELECT /*+ REBALANCE */ * FROM t;
SELECT /*+ REBALANCE(c) */ * FROM t;
Analysis
- Rebalance
According to the corresponding ticket SPARK-35725, the purpose of Rebalance is to split partitions during the AQE stage based on spark.sql.adaptive.advisoryPartitionSizeInBytes, which prevents data skew. SPARK-35786 additionally makes it possible to request this repartitioning through a hint.
Let's look at how it is implemented. The code of OptimizeSkewInRebalancePartitions is as follows:
override val supportedShuffleOrigins: Seq[ShuffleOrigin] =
Seq(REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL)
...
override def apply(plan: SparkPlan): SparkPlan = {
if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED)) {
return plan
}
plan match {
case stage: ShuffleQueryStageExec if isSupported(stage.shuffle) =>
tryOptimizeSkewedPartitions(stage)
case _ => plan
}
}
Partitions can be split only when spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled is on. The rule also applies only when the origin of the shuffle is REBALANCE_PARTITIONS_BY_NONE or REBALANCE_PARTITIONS_BY_COL. For the implementation of tryOptimizeSkewedPartitions, see the code. Its comments explain it clearly:
* We use ADVISORY_PARTITION_SIZE_IN_BYTES size to decide if a partition should be optimized.
* Let's say we have 3 maps with 3 shuffle partitions, and assuming r1 has data skew issue.
* the map side looks like:
* m0:[b0, b1, b2], m1:[b0, b1, b2], m2:[b0, b1, b2]
* and the reduce side looks like:
* (without this rule) r1[m0-b1, m1-b1, m2-b1]
* / \
* r0:[m0-b0, m1-b0, m2-b0], r1-0:[m0-b1], r1-1:[m1-b1], r1-2:[m2-b1], r2[m0-b2, m1-b2, m2-b2]
*
* Note that, this rule is only applied with the SparkPlan whose top-level node is
* ShuffleQueryStageExec.
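The diagram can be made concrete with a minimal Scala sketch of the splitting idea. It assumes a simple greedy grouping. The map outputs of one reduce partition are packed into chunks, and a new chunk starts whenever adding the next block would exceed the target size (the role ADVISORY_PARTITION_SIZE_IN_BYTES plays). The object and method names are hypothetical. Spark's actual splitting logic in ShufflePartitionsUtil handles more edge cases, so treat this as an illustration, not the real algorithm.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper: a simplified model of splitting one skewed reduce
// partition (r1 in the diagram) into several reads over consecutive map outputs.
object SkewSplitSketch {
  /** Returns the start map index of each chunk. Block sizes are accumulated
    * greedily; a new chunk starts when adding the next block would exceed
    * targetSize. A single block larger than targetSize gets its own chunk. */
  def splitStarts(blockSizes: Seq[Long], targetSize: Long): Seq[Int] = {
    require(targetSize > 0, "targetSize must be positive")
    if (blockSizes.isEmpty) Seq.empty
    else {
      val starts = ArrayBuffer(0)
      var current = 0L
      for ((size, i) <- blockSizes.zipWithIndex) {
        if (i > 0 && current + size > targetSize) {
          starts += i // close the previous chunk and open a new one at map i
          current = size
        } else {
          current += size
        }
      }
      starts.toSeq
    }
  }

  /** Turns chunk start indices into map-index ranges, e.g. r1-0 = maps 0 until 1. */
  def chunks(blockSizes: Seq[Long], targetSize: Long): Seq[Range] = {
    val starts = splitStarts(blockSizes, targetSize)
    val ends = starts.drop(1) :+ blockSizes.length
    starts.zip(ends).map { case (s, e) => s until e }
  }
}
```

Suppose r1 receives three 100-byte blocks (m0-b1, m1-b1, m2-b1) and the target is 100 bytes. The sketch then yields three single-map reads, which matches r1-0, r1-1 and r1-2 in the diagram.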
Next, let's trace where REBALANCE_PARTITIONS_BY_NONE and REBALANCE_PARTITIONS_BY_COL come from.
The hint is handled in the ResolveHints rule:
private def createRebalance(hint: UnresolvedHint): LogicalPlan = {
hint.parameters match {
case partitionExprs @ Seq(_*) =>
val invalidParams = partitionExprs.filter(!_.isInstanceOf[UnresolvedAttribute])
if (invalidParams.nonEmpty) {
val hintName = hint.name.toUpperCase(Locale.ROOT)
throw QueryCompilationErrors.invalidHintParameterError(hintName, invalidParams)
}
RebalancePartitions(partitionExprs.map(_.asInstanceOf[Expression]), hint.child)
}
}
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
_.containsPattern(UNRESOLVED_HINT), ruleId) {
case hint @ UnresolvedHint(hintName, _, _) => hintName.toUpperCase(Locale.ROOT) match {
case "REPARTITION" =>
createRepartition(shuffle = true, hint)
case "COALESCE" =>
createRepartition(shuffle = false, hint)
case "REPARTITION_BY_RANGE" =>
createRepartitionByRange(hint)
case "REBALANCE" if conf.adaptiveExecutionEnabled =>
createRebalance(hint)
case _ => hint
}
}
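The dispatch above can be modelled in plain Scala. This is a toy sketch. The plan classes (UnresolvedHintNode, Repartition, RepartitionByRange, Rebalance) are hypothetical stand-ins for Spark's logical plan classes, and the parameter validation done by createRebalance is omitted. It only shows that a REBALANCE hint becomes a rebalance node when AQE is enabled and is otherwise left untouched.

```scala
import java.util.Locale

// Toy model of the hint dispatch in ResolveHints. All plan classes below are
// hypothetical stand-ins, not Spark's LogicalPlan classes.
object HintResolutionSketch {
  sealed trait Plan
  final case class UnresolvedHintNode(name: String, params: Seq[String]) extends Plan
  final case class Repartition(shuffle: Boolean, params: Seq[String]) extends Plan
  final case class RepartitionByRange(params: Seq[String]) extends Plan
  final case class Rebalance(columns: Seq[String]) extends Plan

  def resolve(hint: UnresolvedHintNode, aqeEnabled: Boolean): Plan =
    hint.name.toUpperCase(Locale.ROOT) match {
      case "REPARTITION"          => Repartition(shuffle = true, hint.params)
      case "COALESCE"             => Repartition(shuffle = false, hint.params)
      case "REPARTITION_BY_RANGE" => RepartitionByRange(hint.params)
      // Resolved only when AQE is on; otherwise it falls through to the default case.
      case "REBALANCE" if aqeEnabled => Rebalance(hint.params)
      // Unknown hints, and REBALANCE with AQE off, are returned unchanged.
      case _ => hint
    }
}
```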
As you can see, the REBALANCE hint only takes effect when AQE is enabled. It produces a RebalancePartitions logical plan, which the BasicOperators strategy then converts into a ShuffleExchangeExec physical plan:
case r: logical.RebalancePartitions =>
val shuffleOrigin = if (r.partitionExpressions.isEmpty) {
REBALANCE_PARTITIONS_BY_NONE
} else {
REBALANCE_PARTITIONS_BY_COL
}
exchange.ShuffleExchangeExec(r.partitioning, planLater(r.child), shuffleOrigin) :: Nil
Because this produces a shuffle, the OptimizeSkewInRebalancePartitions rule can be applied in the AQE phase. The shuffle is what makes this possible: the shuffle-read stage is optimized based on the data written in the shuffle-write stage.
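The path from the RebalancePartitions node to the skew rule can be summarised in a small Scala model. The ShuffleOrigin names mirror those in the Spark code quoted above. The object and functions are hypothetical and heavily simplified: partition expressions are plain strings, and the config flag is passed as a Boolean argument.

```scala
// Hypothetical, simplified model; only the ShuffleOrigin names come from Spark.
object RebalanceShuffleSketch {
  sealed trait ShuffleOrigin
  case object ENSURE_REQUIREMENTS extends ShuffleOrigin
  case object REPARTITION_BY_COL extends ShuffleOrigin
  case object REBALANCE_PARTITIONS_BY_NONE extends ShuffleOrigin
  case object REBALANCE_PARTITIONS_BY_COL extends ShuffleOrigin

  // Mirrors the BasicOperators case above: no columns -> BY_NONE, otherwise BY_COL.
  def originForRebalance(partitionExpressions: Seq[String]): ShuffleOrigin =
    if (partitionExpressions.isEmpty) REBALANCE_PARTITIONS_BY_NONE
    else REBALANCE_PARTITIONS_BY_COL

  // Mirrors supportedShuffleOrigins of OptimizeSkewInRebalancePartitions.
  val skewRuleOrigins: Set[ShuffleOrigin] =
    Set(REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL)

  // The rule fires only if its config flag is on and the shuffle origin is supported.
  def skewRuleApplies(flagEnabled: Boolean, origin: ShuffleOrigin): Boolean =
    flagEnabled && skewRuleOrigins.contains(origin)
}
```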
Note:
OptimizeShuffleWithLocalRead is not applied to shuffles whose shuffleOrigin is REBALANCE_PARTITIONS_BY_COL. Otherwise, writes into dynamic partitions would suffer from the small-files problem. See the discussion here for details.
- Repartition
Compared with Rebalance, this hint only partitions the data into a specified fixed number of partitions or by the specified columns. The size of each partition cannot be controlled. The data is either spread evenly over the fixed number of partitions or hash-partitioned by column, and in the second case file sizes vary.
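The toy Scala illustration below shows why column-hash partitioning can leave uneven partition sizes while spreading rows over a fixed count does not. It makes two assumptions. String.hashCode with a non-negative modulo stands in for Spark's Murmur3-based hash partitioning, and the even spread is modelled as dealing rows out one by one. The object and method names are hypothetical.

```scala
// Toy model: partition sizes under hash partitioning by a skewed key versus
// an even round-robin spread over a fixed number of partitions.
object HashSkewSketch {
  /** Row count per partition when rows are placed by key hash.
    * Assumption: String.hashCode with a non-negative modulo stands in for
    * Spark's Murmur3-based hash partitioning. */
  def hashPartitionSizes(keys: Seq[String], numPartitions: Int): Seq[Int] = {
    require(numPartitions > 0, "numPartitions must be positive")
    val counts = Array.fill(numPartitions)(0)
    keys.foreach { k => counts(Math.floorMod(k.hashCode, numPartitions)) += 1 }
    counts.toSeq
  }

  /** Row count per partition when numRows rows are dealt out one by one. */
  def roundRobinSizes(numRows: Int, numPartitions: Int): Seq[Int] = {
    require(numPartitions > 0, "numPartitions must be positive")
    Seq.tabulate(numPartitions) { i =>
      numRows / numPartitions + (if (i < numRows % numPartitions) 1 else 0)
    }
  }
}
```

Take 100 rows, 90 of which share one hot key. Whichever partition the hot key hashes to holds at least 90 rows. By contrast, roundRobinSizes(100, 3) gives Seq(34, 33, 33).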
For the detailed analysis, refer to the Rebalance analysis above.
One thing to note: since SPARK-35650, the Repartition operation is also optimized in the AQE stage. Since SPARK-35725, a bare REPARTITION hint (no partition number and no columns) achieves the same effect as the REBALANCE hint. This is because here the shuffleOrigin was changed from REPARTITION_BY_NONE to REBALANCE_PARTITIONS_BY_NONE, so the OptimizeSkewInRebalancePartitions rule applies to it as well.
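The origin change described above can be sketched as follows. Only the first branch comes from the description here: no columns and no partition number map to REBALANCE_PARTITIONS_BY_NONE. The other two branches are my reading of BasicOperators in Spark 3.2 and should be checked against the source. Apart from the ShuffleOrigin values, all names are hypothetical.

```scala
// Hypothetical model of choosing a shuffle origin for a REPARTITION-style node.
object RepartitionOriginSketch {
  sealed trait ShuffleOrigin
  case object REPARTITION_BY_COL extends ShuffleOrigin
  case object REPARTITION_BY_NUM extends ShuffleOrigin
  case object REBALANCE_PARTITIONS_BY_NONE extends ShuffleOrigin
  case object REBALANCE_PARTITIONS_BY_COL extends ShuffleOrigin

  def originForRepartition(partitionExpressions: Seq[String], numPartitions: Option[Int]): ShuffleOrigin =
    if (partitionExpressions.isEmpty && numPartitions.isEmpty) REBALANCE_PARTITIONS_BY_NONE // bare REPARTITION
    else if (numPartitions.isEmpty) REPARTITION_BY_COL // assumption: columns only
    else REPARTITION_BY_NUM                            // assumption: a partition number was given

  // Origins accepted by OptimizeSkewInRebalancePartitions (see the code above).
  val skewRuleOrigins: Set[ShuffleOrigin] =
    Set(REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL)
}
```

Under this model, only the bare REPARTITION hint produces an origin that the skew rule accepts. REPARTITION(3) and REPARTITION(c) keep their fixed layout.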
Conclusion
In general, REBALANCE can replace REPARTITION wherever the latter is used, and REBALANCE gives better control over file sizes. For more information, see the corresponding Spark JIRA tickets.