Rebalance operation in spark and its difference from repartition operation
2022-07-04 17:01:00 【Hongnai riverside bird】
Background
This article is based on Spark 3.2.1.
The Partitioning Hints Types documentation mentions both the Rebalance operation and the Repartition operation, and both can repartition data. What is the difference between them?
SELECT /*+ REPARTITION(3) */ * FROM t;
SELECT /*+ REPARTITION(c) */ * FROM t;
SELECT /*+ REBALANCE */ * FROM t;
SELECT /*+ REBALANCE(c) */ * FROM t;
Analysis
- Rebalance
See the corresponding SPARK-35725. Its purpose is to repartition data during the AQE stage according to spark.sql.adaptive.advisoryPartitionSizeInBytes, in order to prevent data skew. With SPARK-35786, the repartitioning can also be driven by a hint.
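As a minimal sketch of how the hint and the settings named in this article fit together, the following Scala program enables AQE, sets an advisory partition size, and runs a REBALANCE query. The local master, the 64MB value, and the temporary view `t` with a single column `c` are assumptions made for illustration only; they are not taken from the original post.

```scala
import org.apache.spark.sql.SparkSession

object RebalanceHintExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("rebalance-hint-example")
      .master("local[*]") // assumption: local run, for illustration only
      // The REBALANCE hint is only resolved when AQE is enabled.
      .config("spark.sql.adaptive.enabled", "true")
      // Target size used when deciding whether to split a partition (assumed value).
      .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
      // Switch checked by OptimizeSkewInRebalancePartitions.
      .config("spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled", "true")
      .getOrCreate()

    // Hypothetical table `t` with one column `c`.
    spark.range(0, 100000).toDF("c").createOrReplaceTempView("t")

    // Print the physical plan to inspect the shuffle introduced by the hint.
    spark.sql("SELECT /*+ REBALANCE(c) */ * FROM t").explain()

    spark.stop()
  }
}
```

Running this requires Spark 3.2 or later on the classpath. The plan printed by explain() is not reproduced here, since its exact form depends on the Spark version.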
Let's look at how it is implemented. The code of OptimizeSkewInRebalancePartitions is as follows:
override val supportedShuffleOrigins: Seq[ShuffleOrigin] =
  Seq(REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL)
...
override def apply(plan: SparkPlan): SparkPlan = {
  if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED)) {
    return plan
  }
  plan match {
    case stage: ShuffleQueryStageExec if isSupported(stage.shuffle) =>
      tryOptimizeSkewedPartitions(stage)
    case _ => plan
  }
}
Skewed partitions are split only when spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled is turned on, and the rule applies only when the shuffle origin is REBALANCE_PARTITIONS_BY_NONE or REBALANCE_PARTITIONS_BY_COL.
The implementation of tryOptimizeSkewedPartitions can be read in the source code, and its comments explain the idea clearly:
* We use ADVISORY_PARTITION_SIZE_IN_BYTES size to decide if a partition should be optimized.
* Let's say we have 3 maps with 3 shuffle partitions, and assuming r1 has data skew issue.
* the map side looks like:
* m0:[b0, b1, b2], m1:[b0, b1, b2], m2:[b0, b1, b2]
* and the reduce side looks like:
* (without this rule) r1[m0-b1, m1-b1, m2-b1]
* / \
* r0:[m0-b0, m1-b0, m2-b0], r1-0:[m0-b1], r1-1:[m1-b1], r1-2:[m2-b1], r2[m0-b2, m1-b2, m2-b2]
*
* Note that, this rule is only applied with the SparkPlan whose top-level node is
* ShuffleQueryStageExec.
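The splitting described in the comment above can be sketched in plain Scala. This is a simplified model and not Spark's implementation: the names `MapRange` and `splitReducer` are hypothetical, block sizes are passed in directly, and consecutive map outputs are packed greedily up to the advisory size.

```scala
object SkewSplitSketch {
  // A contiguous range of map outputs [startMap, endMap) read by one task
  // for a given reducer. Hypothetical type, for illustration only.
  final case class MapRange(reducerId: Int, startMap: Int, endMap: Int)

  // blockSizes(i) is the size of the block that map i wrote for this reducer.
  def splitReducer(reducerId: Int, blockSizes: Seq[Long], advisory: Long): Seq[MapRange] = {
    if (blockSizes.sum <= advisory) {
      // Not skewed in this model: read all map outputs in one task.
      Seq(MapRange(reducerId, 0, blockSizes.length))
    } else {
      val ranges = Seq.newBuilder[MapRange]
      var start = 0
      var acc = 0L
      for ((size, i) <- blockSizes.zipWithIndex) {
        // Close the current range if adding this block would exceed the target.
        if (i > start && acc + size > advisory) {
          ranges += MapRange(reducerId, start, i)
          start = i
          acc = 0L
        }
        acc += size
      }
      // Emit the last open range.
      ranges += MapRange(reducerId, start, blockSizes.length)
      ranges.result()
    }
  }
}
```

With three map blocks of 100 bytes each and an advisory size of 100, `splitReducer(1, Seq(100L, 100L, 100L), 100L)` yields three ranges, one per map output, which corresponds to r1-0, r1-1 and r1-2 in the comment. A reducer whose blocks sum to less than the advisory size is left as a single range.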
Next, let's see where REBALANCE_PARTITIONS_BY_NONE and REBALANCE_PARTITIONS_BY_COL come from.
This happens in the ResolveHints rule:
private def createRebalance(hint: UnresolvedHint): LogicalPlan = {
  hint.parameters match {
    case partitionExprs @ Seq(_*) =>
      val invalidParams = partitionExprs.filter(!_.isInstanceOf[UnresolvedAttribute])
      if (invalidParams.nonEmpty) {
        val hintName = hint.name.toUpperCase(Locale.ROOT)
        throw QueryCompilationErrors.invalidHintParameterError(hintName, invalidParams)
      }
      RebalancePartitions(partitionExprs.map(_.asInstanceOf[Expression]), hint.child)
  }
}
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
  _.containsPattern(UNRESOLVED_HINT), ruleId) {
  case hint @ UnresolvedHint(hintName, _, _) => hintName.toUpperCase(Locale.ROOT) match {
    case "REPARTITION" =>
      createRepartition(shuffle = true, hint)
    case "COALESCE" =>
      createRepartition(shuffle = false, hint)
    case "REPARTITION_BY_RANGE" =>
      createRepartitionByRange(hint)
    case "REBALANCE" if conf.adaptiveExecutionEnabled =>
      createRebalance(hint)
    case _ => hint
  }
}
As can be seen, the Rebalance hint takes effect only when AQE is enabled. It produces a RebalancePartitions logical plan, which the BasicOperators rule then converts into a ShuffleExchangeExec physical plan:
case r: logical.RebalancePartitions =>
  val shuffleOrigin = if (r.partitionExpressions.isEmpty) {
    REBALANCE_PARTITIONS_BY_NONE
  } else {
    REBALANCE_PARTITIONS_BY_COL
  }
  exchange.ShuffleExchangeExec(r.partitioning, planLater(r.child), shuffleOrigin) :: Nil
The OptimizeSkewInRebalancePartitions rule can be applied in the AQE stage only when there is a shuffle operation, because only then can the shuffle read stage be optimized based on the data produced by the shuffle write stage.
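The two steps above (hint resolution gated on AQE, then the choice of shuffle origin) can be condensed into a toy model. The case objects below only mirror the names from the Spark source; `originForRebalanceHint` is a hypothetical helper written for this illustration, and it is not part of Spark.

```scala
object ShuffleOriginSketch {
  // Stand-ins that mirror the names used in the Spark source.
  sealed trait ShuffleOrigin
  case object REBALANCE_PARTITIONS_BY_NONE extends ShuffleOrigin
  case object REBALANCE_PARTITIONS_BY_COL extends ShuffleOrigin

  // Returns None when the REBALANCE hint is left unresolved (AQE disabled),
  // otherwise the origin chosen from the presence of partition columns.
  def originForRebalanceHint(columns: Seq[String], aqeEnabled: Boolean): Option[ShuffleOrigin] =
    if (!aqeEnabled) None
    else if (columns.isEmpty) Some(REBALANCE_PARTITIONS_BY_NONE)
    else Some(REBALANCE_PARTITIONS_BY_COL)
}
```

In this model, `/*+ REBALANCE */` corresponds to an empty column list and `/*+ REBALANCE(c) */` to `Seq("c")`.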
Note:
OptimizeShuffleWithLocalRead is not applied when the shuffleOrigin is REBALANCE_PARTITIONS_BY_COL. Otherwise, dynamic partition writes would run into a small files problem. See the related discussion for details.
- Repartition
Compared with Rebalance, this hint simply partitions the data by the specified fixed number of partitions or by the specified columns. The size of each partition cannot be controlled: the data is either distributed evenly or hash-partitioned by column (in which case the output files can differ in size).
For the detailed analysis, refer to the Rebalance analysis above.
One thing to note is that after SPARK-35650, the Repartition operation is also optimized in the AQE stage. After SPARK-35725, a plain REPARTITION hint can achieve the same effect as the Rebalance hint, because the shuffleOrigin was changed from REPARTITION_BY_NONE to REBALANCE_PARTITIONS_BY_NONE, so the OptimizeSkewInRebalancePartitions rule can be applied to it as well.
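To illustrate why hash partitioning by column cannot control partition size, here is a small plain-Scala model. It is only a sketch: it uses String.hashCode instead of the hash function Spark uses, counts rows instead of bytes, and the name `partitionSizes` is hypothetical.

```scala
object HashRepartitionSketch {
  // Row count per partition when each row is assigned by the hash of its key.
  // Row counts stand in for partition sizes in this model.
  def partitionSizes(keys: Seq[String], numPartitions: Int): Vector[Int] = {
    val counts = Array.fill(numPartitions)(0)
    keys.foreach { k =>
      // floorMod keeps the index non-negative for negative hash codes.
      val p = Math.floorMod(k.hashCode, numPartitions)
      counts(p) += 1
    }
    counts.toVector
  }
}
```

If 90 of 93 rows share the key "hot", all 90 land in the same partition regardless of the partition count, so one partition holds at least 90 rows. A rebalance with skew optimization could split such a partition by map output, whereas a fixed hash partitioning leaves it as is.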
Conclusion
In general, Rebalance can be used wherever repartition is used, and Rebalance gives better control over file size. For more details, see the corresponding Spark JIRA issues.