当前位置:网站首页>Rebalance operation in spark and its difference from repartition operation
Rebalance operation in spark and its difference from repartition operation
2022-07-04 17:01:00 【Hongnai riverside bird】
background
This article is basically spark 3.2.1
stay Partitioning Hints Types Mentioned in Rebalance Operation and Repartition operation , And they can do data repartition , What's the difference between them ?
SELECT /*+ REPARTITION(3) */ * FROM t;
SELECT /*+ REPARTITION(c) */ * FROM t;
SELECT /*+ REBALANCE */ * FROM t;
SELECT /*+ REBALANCE(c) */ * FROM t;
analysis
- Rebalance
Refer to the corresponding SPARK-35725, Its purpose is to be in AQE Stage , according tospark.sql.adaptive.advisoryPartitionSizeInBytesRepartition of partitions , Prevent data skew . Plus SPARK-35786, It can be based on hint Re zoning .
Let's see how to achieve it ,OptimizeSkewInRebalancePartitions The code is as follows :
override val supportedShuffleOrigins: Seq[ShuffleOrigin] =
Seq(REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL)
...
override def apply(plan: SparkPlan): SparkPlan = {
if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED)) {
return plan
}
plan match {
case stage: ShuffleQueryStageExec if isSupported(stage.shuffle) =>
tryOptimizeSkewedPartitions(stage)
case _ => plan
}
}
It's only on spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled In the case of , Can be partitioned expand, And you have to shuffle The source of is REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL This rule can be applied only when .tryOptimizeSkewedPartitions The specific implementation of can be seen in the code , The comments of the code are very clear :
* We use ADVISORY_PARTITION_SIZE_IN_BYTES size to decide if a partition should be optimized.
* Let's say we have 3 maps with 3 shuffle partitions, and assuming r1 has data skew issue.
* the map side looks like:
* m0:[b0, b1, b2], m1:[b0, b1, b2], m2:[b0, b1, b2]
* and the reduce side looks like:
* (without this rule) r1[m0-b1, m1-b1, m2-b1]
* / \
* r0:[m0-b0, m1-b0, m2-b0], r1-0:[m0-b1], r1-1:[m1-b1], r1-2:[m2-b1], r2[m0-b2, m1-b2, m2-b2]
*
* Note that, this rule is only applied with the SparkPlan whose top-level node is
* ShuffleQueryStageExec.
Let's analyze REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL source :
This is ResolveHints In the rule :
private def createRebalance(hint: UnresolvedHint): LogicalPlan = {
hint.parameters match {
case partitionExprs @ Seq(_*) =>
val invalidParams = partitionExprs.filter(!_.isInstanceOf[UnresolvedAttribute])
if (invalidParams.nonEmpty) {
val hintName = hint.name.toUpperCase(Locale.ROOT)
throw QueryCompilationErrors.invalidHintParameterError(hintName, invalidParams)
}
RebalancePartitions(partitionExprs.map(_.asInstanceOf[Expression]), hint.child)
}
}
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
_.containsPattern(UNRESOLVED_HINT), ruleId) {
case hint @ UnresolvedHint(hintName, _, _) => hintName.toUpperCase(Locale.ROOT) match {
case "REPARTITION" =>
createRepartition(shuffle = true, hint)
case "COALESCE" =>
createRepartition(shuffle = false, hint)
case "REPARTITION_BY_RANGE" =>
createRepartitionByRange(hint)
case "REBALANCE" if conf.adaptiveExecutionEnabled =>
createRebalance(hint)
case _ => hint
}
}
It can be seen that only in AQE When on The Rebalance Of hint To take effect , Generate corresponding RebalancePartitions Logical plan , And the logical plan will be in BasicOperators In the rules , convert to ShuffleEchangeExec Physical plan :
case r: logical.RebalancePartitions =>
val shuffleOrigin = if (r.partitionExpressions.isEmpty) {
REBALANCE_PARTITIONS_BY_NONE
} else {
REBALANCE_PARTITIONS_BY_COL
}
exchange.ShuffleExchangeExec(r.partitioning, planLater(r.child), shuffleOrigin) :: Nil
Because only shuffle During operation ,AQE Phase will be applied to OptimizeSkewInRebalancePartitions The rules , Only in this way can we be in shuffle read Stage basis shuffle write Optimize the data of stage .
Be careful :
among OptimizeShuffleWithLocalRead Do not apply shuffleOrigin by REBALANCE_PARTITIONS_BY_COL Of , Otherwise, there is a problem of small files in the dynamic partition , Specific view Discussion here
- Repartition
be relative to Rebalance, The hint Just partition according to the specified fixed partition data or columns , At this time, the size of each partition cannot be controlled , It can only be said that it is distributed evenly or by column hash Partition ( In this case, there are different file sizes )
Specific analysis , You can refer to Rebalance Analysis of .
One thing to note is in SPARK-35650 after ,Repartition The operation is also in AQE Stage to optimize , And in the SPARK-35725 after , If it's simple REPARTITION hint It can also achieve Rebalace hint The effect of , Because in here hold shuffleOrigin from REPARTITION_BY_NONE Changed to REBALANCE_PARTITIONS_BY_NONE 了 , So it can also be used in OptimizeSkewInRebalancePartitions The rules .
Conclusion
Generally in reparition It can be used anywhere Rebalance To replace , and Rebalance Better file size control , For more information, please check the corresponding spark-jira
边栏推荐
- China Indonesia adhesive market trend report, technological innovation and market forecast
- 基于check-point实现图数据构建任务
- 高度剩余法
- Implement graph data construction task based on check point
- Oracle监听器Server端与Client端配置实例
- 世界环境日 | 周大福用心服务推动减碳环保
- 电子元器件B2B商城系统开发:赋能企业构建进销存标准化流程实例
- 实战:fabric 用户证书吊销操作流程
- egg. JS learning notes
- 如何实现一个延时队列 ?
猜你喜欢

基于wifi控制的51单片机温度报警器

The winning rate against people is 84%, and deepmind AI has reached the level of human experts in army chess for the first time

L1-072 scratch lottery

Yanwen logistics plans to be listed on Shenzhen Stock Exchange: it is mainly engaged in international express business, and its gross profit margin is far lower than the industry level

昆明三环闭合工程将经过这些地方,有在你家附近的吗?

Statistical learning: logistic regression and cross entropy loss (pytoch Implementation)

【Go ~ 0到1 】 第六天 文件的读写与创建

The test experience "tortured" by the PMP test is worth your review

Years of training, towards Kata 3.0! Enter the safe container experience out of the box | dragon lizard Technology
Application of clock wheel in RPC
随机推荐
线程池的使用和原理
C# 更加优质的操作MongoDB数据库
Go language loop statement (under Lesson 10)
同构图与异构图CYPHER-TASK设计与TASK锁机制
Sequence diagram data modeling and industrial chain analysis
2022PMP考试基本情况详情了解
Research Report on market supply and demand and strategy of China's four sided flat bag industry
World Environment Day | Chow Tai Fook serves wholeheartedly to promote carbon reduction and environmental protection
Application and Optimization Practice of redis in vivo push platform
Object. Usage of keys()
C# 实现 FFT 正反变换 和 频域滤波
科普达人丨一文看懂阿里云的秘密武器“神龙架构”
APOC自定义函数和过程
Capvision Rongying's prospectus in Hong Kong was "invalid": it was strictly questioned by the CSRC and required supplementary disclosure
Cypher task design and task locking mechanism of isomorphic and heterogeneous graphs
建筑建材行业经销商协同系统解决方案:赋能企业构建核心竞争力
Understand ThreadLocal in one picture
The vscode waveform curve prompts that the header file cannot be found (an error is reported if the header file exists)
Vscode prompt Please install clang or check configuration 'clang executable‘
嵌入式软件架构设计-函数调用