Rebalance operation in Spark and its difference from the Repartition operation
2022-07-04 17:01:00 【Hongnai riverside bird】
Background
This article is based on Spark 3.2.1.
The Partitioning Hints Types documentation mentions both the Rebalance operation and the Repartition operation, and both can repartition data. What is the difference between them?
SELECT /*+ REPARTITION(3) */ * FROM t;
SELECT /*+ REPARTITION(c) */ * FROM t;
SELECT /*+ REBALANCE */ * FROM t;
SELECT /*+ REBALANCE(c) */ * FROM t;
Analysis
- Rebalance
See the corresponding SPARK-35725. Its purpose is to re-split partitions during the AQE stage according to spark.sql.adaptive.advisoryPartitionSizeInBytes, which prevents data skew. With SPARK-35786 added, the repartitioning can be requested through a hint.
Let's see how it is implemented. The code of OptimizeSkewInRebalancePartitions is as follows:
override val supportedShuffleOrigins: Seq[ShuffleOrigin] =
  Seq(REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL)
...
override def apply(plan: SparkPlan): SparkPlan = {
  if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED)) {
    return plan
  }
  plan match {
    case stage: ShuffleQueryStageExec if isSupported(stage.shuffle) =>
      tryOptimizeSkewedPartitions(stage)
    case _ => plan
  }
}
Skewed partitions are split only when spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled is turned on, and the rule applies only when the origin of the shuffle is REBALANCE_PARTITIONS_BY_NONE or REBALANCE_PARTITIONS_BY_COL.
The specific implementation of tryOptimizeSkewedPartitions can be read in the source code, where the comment is very clear:
* We use ADVISORY_PARTITION_SIZE_IN_BYTES size to decide if a partition should be optimized.
* Let's say we have 3 maps with 3 shuffle partitions, and assuming r1 has data skew issue.
* the map side looks like:
* m0:[b0, b1, b2], m1:[b0, b1, b2], m2:[b0, b1, b2]
* and the reduce side looks like:
* (without this rule) r1[m0-b1, m1-b1, m2-b1]
* / \
* r0:[m0-b0, m1-b0, m2-b0], r1-0:[m0-b1], r1-1:[m1-b1], r1-2:[m2-b1], r2[m0-b2, m1-b2, m2-b2]
*
* Note that, this rule is only applied with the SparkPlan whose top-level node is
* ShuffleQueryStageExec.
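The splitting described in the comment above can be sketched in plain Scala. This is a simplified model under stated assumptions, not Spark's implementation: the object SkewSplitSketch and the function split are hypothetical names, and the greedy grouping below only approximates how the map-output blocks of one skewed reduce partition might be packed up to the advisory size.

```scala
object SkewSplitSketch {
  // Hypothetical helper: packs consecutive map-output blocks of ONE reduce
  // partition into groups whose total size stays within `targetSize` bytes
  // (a single oversized block still forms its own group).
  // Returns the map indices that each sub-partition would read.
  def split(mapBlockSizes: Seq[Long], targetSize: Long): Seq[Seq[Int]] = {
    val groups = scala.collection.mutable.ArrayBuffer.empty[Seq[Int]]
    var current = Vector.empty[Int]
    var currentSize = 0L
    for ((size, mapIndex) <- mapBlockSizes.zipWithIndex) {
      // Close the current group when adding this block would exceed the target.
      if (current.nonEmpty && currentSize + size > targetSize) {
        groups += current
        current = Vector.empty[Int]
        currentSize = 0L
      }
      current = current :+ mapIndex
      currentSize += size
    }
    if (current.nonEmpty) groups += current
    groups.toList
  }
}
```

With three blocks that are each as large as the target, SkewSplitSketch.split(Seq(64L, 64L, 64L), 64L) yields one group per map, which corresponds to r1-0, r1-1 and r1-2 in the comment. Smaller blocks are packed together instead of being read one by one.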
Now let's trace where REBALANCE_PARTITIONS_BY_NONE and REBALANCE_PARTITIONS_BY_COL come from.
The starting point is the ResolveHints rule:
private def createRebalance(hint: UnresolvedHint): LogicalPlan = {
  hint.parameters match {
    case partitionExprs @ Seq(_*) =>
      val invalidParams = partitionExprs.filter(!_.isInstanceOf[UnresolvedAttribute])
      if (invalidParams.nonEmpty) {
        val hintName = hint.name.toUpperCase(Locale.ROOT)
        throw QueryCompilationErrors.invalidHintParameterError(hintName, invalidParams)
      }
      RebalancePartitions(partitionExprs.map(_.asInstanceOf[Expression]), hint.child)
  }
}
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
  _.containsPattern(UNRESOLVED_HINT), ruleId) {
  case hint @ UnresolvedHint(hintName, _, _) => hintName.toUpperCase(Locale.ROOT) match {
    case "REPARTITION" =>
      createRepartition(shuffle = true, hint)
    case "COALESCE" =>
      createRepartition(shuffle = false, hint)
    case "REPARTITION_BY_RANGE" =>
      createRepartitionByRange(hint)
    case "REBALANCE" if conf.adaptiveExecutionEnabled =>
      createRebalance(hint)
    case _ => hint
  }
}
As the code shows, the REBALANCE hint takes effect only when AQE is enabled, in which case it produces the corresponding RebalancePartitions logical plan. In the BasicOperators rule, that logical plan is then converted to the ShuffleExchangeExec physical plan:
case r: logical.RebalancePartitions =>
  val shuffleOrigin = if (r.partitionExpressions.isEmpty) {
    REBALANCE_PARTITIONS_BY_NONE
  } else {
    REBALANCE_PARTITIONS_BY_COL
  }
  exchange.ShuffleExchangeExec(r.partitioning, planLater(r.child), shuffleOrigin) :: Nil
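To make the chain from hint to rule easier to follow, here is a standalone model of the two decisions quoted so far. It is only a sketch: the ShuffleOrigin names are copied from the snippets above, but the object RebalanceOriginSketch and the functions originFor and skewRuleApplies are hypothetical and do not exist in Spark.

```scala
object RebalanceOriginSketch {
  // Standalone stand-ins for the shuffle origins named in the article.
  sealed trait ShuffleOrigin
  case object REPARTITION_BY_COL extends ShuffleOrigin
  case object REBALANCE_PARTITIONS_BY_NONE extends ShuffleOrigin
  case object REBALANCE_PARTITIONS_BY_COL extends ShuffleOrigin

  // Models the BasicOperators branch: no partition expressions => BY_NONE.
  def originFor(partitionExpressions: Seq[String]): ShuffleOrigin =
    if (partitionExpressions.isEmpty) REBALANCE_PARTITIONS_BY_NONE
    else REBALANCE_PARTITIONS_BY_COL

  // Models supportedShuffleOrigins of OptimizeSkewInRebalancePartitions.
  val supported: Set[ShuffleOrigin] =
    Set(REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL)

  // The skew rule runs only if its flag is on AND the origin is supported.
  def skewRuleApplies(origin: ShuffleOrigin, ruleEnabled: Boolean): Boolean =
    ruleEnabled && supported.contains(origin)
}
```

In this model, REBALANCE maps to originFor(Seq.empty) and REBALANCE(c) maps to originFor(Seq("c")). Both pass skewRuleApplies when the flag is on, while an origin such as REPARTITION_BY_COL does not.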
The shuffle matters because the AQE stage applies the OptimizeSkewInRebalancePartitions rule only to shuffle operations. Only then can the shuffle read stage be optimized based on the data produced by the shuffle write stage.
Note:
OptimizeShuffleWithLocalRead does not apply when shuffleOrigin is REBALANCE_PARTITIONS_BY_COL; otherwise dynamic partition writes would suffer from the small-files problem. See the related discussion for details.
- Repartition
Compared with Rebalance, this hint only repartitions the data into the specified fixed number of partitions or by the specified columns. The size of each partition cannot be controlled: the data is either distributed evenly or hash-partitioned by column (in which case the resulting file sizes differ).
For the detailed analysis, refer to the analysis of Rebalance above.
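The uneven sizes produced by column-based hash partitioning can be illustrated with a small sketch. It is an assumption-laden toy, not Spark code: HashPartitionSketch, partitionOf and partitionSizes are hypothetical names, and the JVM hashCode with a non-negative modulo stands in for Spark's own hash function.

```scala
object HashPartitionSketch {
  // Hypothetical stand-in partitioner: non-negative modulo of the JVM hash.
  // Spark's real hash partitioning uses a different hash function.
  def partitionOf(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // Counts how many rows land in each partition for the given key column.
  def partitionSizes(keys: Seq[String], numPartitions: Int): Vector[Int] = {
    val counts = Array.fill(numPartitions)(0)
    keys.foreach(k => counts(partitionOf(k, numPartitions)) += 1)
    counts.toVector
  }
}
```

If 90 of 100 rows share the key "hot", every one of them lands in the same partition regardless of how many partitions are requested, so that partition holds at least 90 rows. A fixed partition count cannot correct this, which is the gap the Rebalance path addresses by splitting on size.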
One thing to note: after SPARK-35650, the Repartition operation is also optimized in the AQE stage. After SPARK-35725, a simple REPARTITION hint can also achieve the effect of the REBALANCE hint, because the shuffleOrigin was changed from REPARTITION_BY_NONE to REBALANCE_PARTITIONS_BY_NONE, so the OptimizeSkewInRebalancePartitions rule applies to it as well.
Conclusion
In general, Rebalance can be used wherever Repartition is used, and Rebalance gives better control over file size. For more information, see the corresponding Spark JIRA tickets.