Rebalance in Spark and How It Differs from Repartition
2022-07-04 15:06:00 【鸿乃江边鸟】
Background
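This article is based on Spark 3.2.1.
The Partitioning Hints Types section of the Spark SQL documentation lists both the REBALANCE and the REPARTITION hints. Both of them repartition the data, so how do they differ?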
```sql
SELECT /*+ REPARTITION(3) */ * FROM t;
SELECT /*+ REPARTITION(c) */ * FROM t;
SELECT /*+ REBALANCE */ * FROM t;
SELECT /*+ REBALANCE(c) */ * FROM t;
```
Analysis
- Rebalance
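According to SPARK-35725, the goal is to let AQE re-split partitions based on spark.sql.adaptive.advisoryPartitionSizeInBytes so that data skew is avoided. With SPARK-35786 on top of that, this repartitioning can also be requested through a hint.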
Let's look at how it is implemented. The relevant code in OptimizeSkewInRebalancePartitions is:
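```scala
// Only shuffles created for RebalancePartitions are eligible for this rule.
override val supportedShuffleOrigins: Seq[ShuffleOrigin] =
  Seq(REBALANCE_PARTITIONS_BY_NONE, REBALANCE_PARTITIONS_BY_COL)
// ...
override def apply(plan: SparkPlan): SparkPlan = {
  // Gated by spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled.
  if (!conf.getConf(SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED)) {
    return plan
  }
  plan match {
    // Only a shuffle query stage with a supported shuffle origin is optimized.
    case stage: ShuffleQueryStageExec if isSupported(stage.shuffle) =>
      tryOptimizeSkewedPartitions(stage)
    case _ => plan
  }
}
```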
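Two conditions must hold before partitions can be expanded (split):
- spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled must be turned on.
- The shuffle must originate from REBALANCE_PARTITIONS_BY_NONE or REBALANCE_PARTITIONS_BY_COL.

For how tryOptimizeSkewedPartitions works, see the source code. The class comment explains it clearly: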
```scala
/**
 * We use ADVISORY_PARTITION_SIZE_IN_BYTES size to decide if a partition should be optimized.
 * Let's say we have 3 maps with 3 shuffle partitions, and assuming r1 has data skew issue.
 * the map side looks like:
 *   m0:[b0, b1, b2], m1:[b0, b1, b2], m2:[b0, b1, b2]
 * and the reduce side looks like:
 *                            (without this rule) r1[m0-b1, m1-b1, m2-b1]
 *                              /                                     \
 *   r0:[m0-b0, m1-b0, m2-b0], r1-0:[m0-b1], r1-1:[m1-b1], r1-2:[m2-b1], r2[m0-b2, m1-b2, m2-b2]
 *
 * Note that, this rule is only applied with the SparkPlan whose top-level node is
 * ShuffleQueryStageExec.
 */
```
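To make the comment concrete, here is a minimal pure-Scala sketch of the splitting idea. It rests on simplifying assumptions:
- Block sizes are given as plain numbers.
- A reducer whose total exceeds the advisory size is split by greedily packing consecutive map blocks up to that size.

`ReadSpec` and `RebalanceSketch` are hypothetical names. Spark's real logic (for example in `ShufflePartitionsUtil`) adds further merging heuristics, so treat this as an illustration, not the actual implementation.

```scala
// Hypothetical model of the idea behind OptimizeSkewInRebalancePartitions.
// NOT Spark's implementation: Spark applies extra heuristics when splitting.

/** One reduce-side read task: the reducer it serves and the mappers it reads from. */
final case class ReadSpec(reducerId: Int, mapIds: Seq[Int])

object RebalanceSketch {
  /**
   * blockSizes(r)(m) = bytes that mapper m wrote for reducer r.
   * A reducer whose total exceeds advisorySize is split into several read
   * tasks, each covering a consecutive range of mappers.
   */
  def plan(blockSizes: Seq[Seq[Long]], advisorySize: Long): Seq[ReadSpec] =
    blockSizes.zipWithIndex.flatMap { case (sizes, r) =>
      if (sizes.sum <= advisorySize) Seq(ReadSpec(r, sizes.indices))
      else splitBySize(sizes, advisorySize).map(ReadSpec(r, _))
    }

  // Greedily group consecutive map blocks so that each group stays within the target.
  private def splitBySize(sizes: Seq[Long], target: Long): Seq[Seq[Int]] = {
    val groups = scala.collection.mutable.ArrayBuffer.empty[Seq[Int]]
    var current = Vector.empty[Int]
    var currentSize = 0L
    for ((size, m) <- sizes.zipWithIndex) {
      // Close the current group if adding this block would exceed the target.
      if (current.nonEmpty && currentSize + size > target) {
        groups += current
        current = Vector.empty
        currentSize = 0L
      }
      current :+= m
      currentSize += size
    }
    if (current.nonEmpty) groups += current
    groups.toSeq
  }
}
```

Take the scenario from the comment: r1 is skewed and the advisory size is 128. Then `RebalanceSketch.plan(Seq(Seq(10L, 10L, 10L), Seq(100L, 100L, 100L), Seq(10L, 10L, 10L)), 128L)` yields r0, r1-0 (m0), r1-1 (m1), r1-2 (m2) and r2, matching the diagram in the comment.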
Now let's see where REBALANCE_PARTITIONS_BY_NONE and REBALANCE_PARTITIONS_BY_COL come from.
The conversion happens in the ResolveHints rule:
```scala
private def createRebalance(hint: UnresolvedHint): LogicalPlan = {
  hint.parameters match {
    case partitionExprs @ Seq(_*) =>
      // REBALANCE only accepts column names as parameters.
      val invalidParams = partitionExprs.filter(!_.isInstanceOf[UnresolvedAttribute])
      if (invalidParams.nonEmpty) {
        val hintName = hint.name.toUpperCase(Locale.ROOT)
        throw QueryCompilationErrors.invalidHintParameterError(hintName, invalidParams)
      }
      RebalancePartitions(partitionExprs.map(_.asInstanceOf[Expression]), hint.child)
  }
}

def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
  _.containsPattern(UNRESOLVED_HINT), ruleId) {
  case hint @ UnresolvedHint(hintName, _, _) => hintName.toUpperCase(Locale.ROOT) match {
    case "REPARTITION" =>
      createRepartition(shuffle = true, hint)
    case "COALESCE" =>
      createRepartition(shuffle = false, hint)
    case "REPARTITION_BY_RANGE" =>
      createRepartitionByRange(hint)
    // REBALANCE is resolved only when AQE is enabled; otherwise the hint is left as is.
    case "REBALANCE" if conf.adaptiveExecutionEnabled =>
      createRebalance(hint)
    case _ => hint
  }
}
```
So the REBALANCE hint only takes effect when AQE is enabled. In that case it produces a RebalancePartitions logical plan, which the BasicOperators strategy then turns into a ShuffleExchangeExec physical plan:
```scala
case r: logical.RebalancePartitions =>
  val shuffleOrigin = if (r.partitionExpressions.isEmpty) {
    REBALANCE_PARTITIONS_BY_NONE
  } else {
    REBALANCE_PARTITIONS_BY_COL
  }
  exchange.ShuffleExchangeExec(r.partitioning, planLater(r.child), shuffleOrigin) :: Nil
```
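AQE applies the OptimizeSkewInRebalancePartitions rule only to shuffle operations, which is why the hint must produce a shuffle. Only then can the shuffle read side be optimized based on the data produced by the shuffle write side.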
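The path traced above can be condensed into a toy model:
1. The hint becomes a RebalancePartitions node.
2. That node becomes a ShuffleExchangeExec with a shuffle origin.
3. The skew rule checks the config and the origin.

Everything here is a hypothetical stand-in. `HintPathSketch` and its nested types only mirror the names in the Spark code quoted above; they are not Spark classes.

```scala
import java.util.Locale

// Toy model of the REBALANCE code path; none of these are Spark classes.
object HintPathSketch {
  sealed trait Origin
  case object RebalanceByNone extends Origin // mirrors REBALANCE_PARTITIONS_BY_NONE
  case object RebalanceByCol extends Origin  // mirrors REBALANCE_PARTITIONS_BY_COL

  sealed trait Node
  final case class Rebalance(cols: Seq[String]) extends Node // mirrors RebalancePartitions
  final case class IgnoredHint(name: String) extends Node     // a hint that was not resolved

  // Mirrors ResolveHints: REBALANCE is only resolved when AQE is enabled.
  def resolve(hintName: String, cols: Seq[String], aqeEnabled: Boolean): Node =
    hintName.toUpperCase(Locale.ROOT) match {
      case "REBALANCE" if aqeEnabled => Rebalance(cols)
      case other                     => IgnoredHint(other)
    }

  // Mirrors BasicOperators: no columns => BY_NONE, otherwise BY_COL.
  def origin(node: Node): Option[Origin] = node match {
    case Rebalance(cols) => Some(if (cols.isEmpty) RebalanceByNone else RebalanceByCol)
    case IgnoredHint(_)  => None // no rebalance shuffle is planned
  }

  // Mirrors the config check plus supportedShuffleOrigins in OptimizeSkewInRebalancePartitions.
  def skewRuleApplies(origin: Option[Origin], optimizeSkewsEnabled: Boolean): Boolean =
    optimizeSkewsEnabled && origin.exists(o => o == RebalanceByNone || o == RebalanceByCol)
}
```

For example, `HintPathSketch.resolve("rebalance", Seq("c"), aqeEnabled = false)` returns an `IgnoredHint`. No rebalance shuffle is planned, so the skew rule never gets a chance to run.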
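Note:
OptimizeShuffleWithLocalRead does not apply to shuffles whose origin is REBALANCE_PARTITIONS_BY_COL. Otherwise, writes to dynamic partitions would suffer from the small-file problem. See the related discussion for details.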
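A toy model shows why local reads would hurt here. It makes two hypothetical assumptions:
- Each write task produces one file per dynamic-partition value it holds.
- A local read hands each task the output of a single mapper.

`LocalReadSketch` is an illustrative name, not Spark code.

```scala
// Toy model (hypothetical): one output file per (write task, dynamic-partition value).
object LocalReadSketch {
  /**
   * mapperRows(m) holds the dynamic-partition value of every row written by mapper m.
   * Returns how many files each dynamic-partition value would end up in.
   */
  def filesPerValue(mapperRows: Seq[Seq[String]], numReducers: Int, localRead: Boolean): Map[String, Int] = {
    // Which write task receives a row coming from mapper m with value v.
    def taskOf(m: Int, v: String): Int =
      if (localRead) m                            // local read: a task reads one mapper's output
      else Math.floorMod(v.hashCode, numReducers) // normal read: hash(v) picks a single reducer
    val placements = for {
      (rows, m) <- mapperRows.zipWithIndex
      v         <- rows
    } yield (v, taskOf(m, v))
    // Each distinct (value, task) pair corresponds to one output file.
    placements.distinct.groupBy(_._1).map { case (v, ps) => v -> ps.size }
  }
}
```

Suppose there are three mappers and value `d1` appears in all of them. The hash-based read writes `d1` into one file, while the local read writes it into three. At scale, the file count per dynamic partition grows with the number of mappers.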
- Repartition
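Compared with Rebalance, this hint simply partitions the data into a fixed number of partitions or by the given columns, and the size of each partition cannot be controlled:
- With a partition count, rows are spread evenly (round-robin).
- With columns, rows are hash-partitioned by those columns, which can produce files of very different sizes.

For the detailed code path, follow the same analysis as for Rebalance.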
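This small sketch shows why REPARTITION cannot control partition sizes. It idealizes both strategies:
- Round-robin is modeled as a perfect deal of rows. Spark actually starts at a random partition in each map task.
- `String.hashCode` stands in for the Murmur3 hash that Spark's HashPartitioning uses.

`RepartitionSketch` is a hypothetical name.

```scala
// Idealized model of the two REPARTITION flavours; not Spark code.
object RepartitionSketch {
  // REPARTITION(n): rows dealt out evenly, so row counts differ by at most 1.
  def roundRobinCounts(numRows: Int, n: Int): Seq[Int] =
    (0 until n).map(i => numRows / n + (if (i < numRows % n) 1 else 0))

  // REPARTITION(c): all rows with the same key land in the same partition.
  def hashCounts(keys: Seq[String], n: Int): Seq[Int] = {
    val sizes = keys.groupBy(k => Math.floorMod(k.hashCode, n)).map { case (p, ks) => p -> ks.size }
    (0 until n).map(p => sizes.getOrElse(p, 0))
  }
}
```

Suppose 97 rows share a hot key, 3 other rows do not, and there are 4 partitions. One partition then holds at least 97 rows, whatever the hash. The round-robin case does balance row counts, but each partition is still about total / n in size for whatever n was fixed up front.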
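Note that since SPARK-35650, Repartition is also optimized during the AQE stage. Since SPARK-35725, a bare REPARTITION hint (with no partition count or columns) has the same effect as a REBALANCE hint. The reason is that SPARK-35725 changed its shuffleOrigin from REPARTITION_BY_NONE to REBALANCE_PARTITIONS_BY_NONE, so the OptimizeSkewInRebalancePartitions rule applies to it as well.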
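The origin mapping described above can be summarized in a small model. Only the bare-REPARTITION case comes from the text. The REPARTITION_BY_NUM and REPARTITION_BY_COL cases are an assumption based on my reading of Spark 3.2's BasicOperators and should be checked against the source. `RepartitionOriginSketch` is a hypothetical model, not Spark's API.

```scala
// Hypothetical model of how REPARTITION shuffles are tagged in Spark 3.2
// (verify against BasicOperators in the Spark source); not Spark classes.
object RepartitionOriginSketch {
  sealed trait Origin
  case object RepartitionByNum extends Origin // assumed: REPARTITION(n) / REPARTITION(n, c)
  case object RepartitionByCol extends Origin // assumed: REPARTITION(c)
  case object RebalanceByNone extends Origin  // bare REPARTITION, per SPARK-35725

  def originFor(numPartitions: Option[Int], cols: Seq[String]): Origin =
    (numPartitions, cols) match {
      case (Some(_), _)             => RepartitionByNum
      case (None, cs) if cs.isEmpty => RebalanceByNone
      case (None, _)                => RepartitionByCol
    }

  // OptimizeSkewInRebalancePartitions supports the REBALANCE_* origins; of the
  // origins modeled here, only the bare-REPARTITION one qualifies.
  def skewRuleEligible(o: Origin): Boolean = o == RebalanceByNone
}
```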
Conclusion
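In general, Rebalance can replace Repartition wherever Repartition is used, and Rebalance gives better control over output file sizes. For more information, see the corresponding Spark JIRA issues.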