Spark 3.0 Adaptive Execution: Code Implementation and Data Skew Optimization
2022-07-27 14:23:00 【wankunde】
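Enabling Spark AE
Adaptive Execution mode is switched on by injecting a rule while Spark prepares the physical plan. The QueryExecution class holds preparations, a sequence of rules that optimize the physical plan, and InsertAdaptiveSparkPlan is the first rule in that sequence.
InsertAdaptiveSparkPlan processes some of the subqueries with the PlanAdaptiveSubqueries rule and then wraps the current plan in an AdaptiveSparkPlanExec.
When collect() or take() is called on AdaptiveSparkPlanExec, each of them first runs getFinalPhysicalPlan() to produce a new SparkPlan and then calls the matching method on that new plan.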
// QueryExecution class
lazy val executedPlan: SparkPlan = {
  executePhase(QueryPlanningTracker.PLANNING) {
    QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
  }
}

protected def preparations: Seq[Rule[SparkPlan]] = {
  QueryExecution.preparations(sparkSession,
    Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession, this))))
}

// QueryExecution companion object
private[execution] def preparations(
    sparkSession: SparkSession,
    adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None): Seq[Rule[SparkPlan]] = {
  // `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
  // as the original plan is hidden behind `AdaptiveSparkPlanExec`.
  adaptiveExecutionRule.toSeq ++
  Seq(
    PlanDynamicPruningFilters(sparkSession),
    PlanSubqueries(sparkSession),
    EnsureRequirements(sparkSession.sessionState.conf),
    ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
      sparkSession.sessionState.columnarRules),
    CollapseCodegenStages(sparkSession.sessionState.conf),
    ReuseExchange(sparkSession.sessionState.conf),
    ReuseSubquery(sparkSession.sessionState.conf)
  )
}

// InsertAdaptiveSparkPlan
override def apply(plan: SparkPlan): SparkPlan = applyInternal(plan, false)

private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan = plan match {
  // ...some checking
  case _ if shouldApplyAQE(plan, isSubquery) =>
    if (supportAdaptive(plan)) {
      try {
        // Plan sub-queries recursively and pass in the shared stage cache for exchange reuse.
        // Fall back to non-AQE mode if AQE is not supported in any of the sub-queries.
        val subqueryMap = buildSubqueryMap(plan)
        val planSubqueriesRule = PlanAdaptiveSubqueries(subqueryMap)
        val preprocessingRules = Seq(
          planSubqueriesRule)
        // Run pre-processing rules.
        val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preprocessingRules)
        logDebug(s"Adaptive execution enabled for plan: $plan")
        AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, preprocessingRules, isSubquery)
      } catch {
        case SubqueryAdaptiveNotSupportedException(subquery) =>
          logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
            s"but is not supported for sub-query: $subquery.")
          plan
      }
    } else {
      logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
        s"but is not supported for query: $plan.")
      plan
    }
  case _ => plan
}
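The log messages above refer to SQLConf.ADAPTIVE_EXECUTION_ENABLED, whose key is spark.sql.adaptive.enabled. As a minimal sketch, assuming the AQE branch is only reached when that setting is true, the settings relevant to this article could be collected and rendered as spark-submit arguments like this. The object name AqeConfSketch and the helper toSubmitArgs are hypothetical, and the chosen values are illustrative rather than recommendations.

```scala
object AqeConfSketch {
  // Setting names as defined in Spark 3.0's SQLConf; the values are illustrative.
  val aqeConf: Seq[(String, String)] = Seq(
    "spark.sql.adaptive.enabled" -> "true",
    "spark.sql.adaptive.coalescePartitions.enabled" -> "true",
    "spark.sql.adaptive.skewJoin.enabled" -> "true"
  )

  // Hypothetical helper: renders each setting as a "--conf key=value" pair.
  def toSubmitArgs(conf: Seq[(String, String)]): Seq[String] =
    conf.flatMap { case (key, value) => Seq("--conf", s"$key=$value") }

  def main(args: Array[String]): Unit =
    println(toSubmitArgs(aqeConf).mkString(" "))
}
```

The same keys can be set on a running session with spark.conf.set(key, value) before the query is planned.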
How AE submits, executes, and optimizes stages phase by phase
private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
  // isFinalPlan is false on the first call to getFinalPhysicalPlan. Once that call has finished,
  // no stage will change anymore, so later calls return the final plan directly.
  if (isFinalPlan) return currentPhysicalPlan

  // In case of this adaptive plan being executed out of `withActive` scoped functions, e.g.,
  // `plan.queryExecution.rdd`, we need to set active session here as new plan nodes can be
  // created in the middle of the execution.
  context.session.withActive {
    val executionId = getExecutionId
    var currentLogicalPlan = currentPhysicalPlan.logicalLink.get
    var result = createQueryStages(currentPhysicalPlan)
    val events = new LinkedBlockingQueue[StageMaterializationEvent]()
    val errors = new mutable.ArrayBuffer[Throwable]()
    var stagesToReplace = Seq.empty[QueryStageExec]
    while (!result.allChildStagesMaterialized) {
      currentPhysicalPlan = result.newPlan
      // Which stages have to run next; see createQueryStages(plan: SparkPlan).
      if (result.newStages.nonEmpty) {
        stagesToReplace = result.newStages ++ stagesToReplace
        // onUpdatePlan updates the UI through a listener.
        executionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan)))

        // Start materialization of all new stages and fail fast if any stages failed eagerly
        result.newStages.foreach { stage =>
          try {
            // materialize() submits the stage for execution as a separate job; a
            // SimpleFutureAction receives the result. Call chain:
            // QueryStageExec: materialize() -> doMaterialize() ->
            // ShuffleExchangeExec: -> mapOutputStatisticsFuture -> ShuffleExchangeExec
            // SparkContext: -> submitMapStage(shuffleDependency)
            stage.materialize().onComplete { res =>
              if (res.isSuccess) {
                events.offer(StageSuccess(stage, res.get))
              } else {
                events.offer(StageFailure(stage, res.failed.get))
              }
            }(AdaptiveSparkPlanExec.executionContext)
          } catch {
            case e: Throwable =>
              cleanUpAndThrowException(Seq(e), Some(stage.id))
          }
        }
      }

      // Wait on the next completed stage, which indicates new stats are available and probably
      // new stages can be created. There might be other stages that finish at around the same
      // time, so we process those stages too in order to reduce re-planning.
      // Block until at least one stage has finished.
      val nextMsg = events.take()
      val rem = new util.ArrayList[StageMaterializationEvent]()
      events.drainTo(rem)
      (Seq(nextMsg) ++ rem.asScala).foreach {
        case StageSuccess(stage, res) =>
          stage.resultOption = Some(res)
        case StageFailure(stage, ex) =>
          errors.append(ex)
      }

      // In case of errors, we cancel all running stages and throw exception.
      if (errors.nonEmpty) {
        cleanUpAndThrowException(errors, None)
      }

      // Try re-optimizing and re-planning. Adopt the new plan if its cost is equal to or less
      // than that of the current plan; otherwise keep the current physical plan together with
      // the current logical plan since the physical plan's logical links point to the logical
      // plan it has originated from.
      // Meanwhile, we keep a list of the query stages that have been created since last plan
      // update, which stands for the "semantic gap" between the current logical and physical
      // plans. And each time before re-planning, we replace the corresponding nodes in the
      // current logical plan with logical query stages to make it semantically in sync with
      // the current physical plan. Once a new plan is adopted and both logical and physical
      // plans are updated, we can clear the query stage list because at this point the two plans
      // are semantically and physically in sync again.
      // Replace the stages created so far with LogicalQueryStage nodes.
      val logicalPlan = replaceWithQueryStagesInLogicalPlan(currentLogicalPlan, stagesToReplace)
      // Run the optimizer and the planner again.
      val (newPhysicalPlan, newLogicalPlan) = reOptimize(logicalPlan)
      val origCost = costEvaluator.evaluateCost(currentPhysicalPlan)
      val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
      if (newCost < origCost ||
          (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {
        logOnLevel(s"Plan changed from $currentPhysicalPlan to $newPhysicalPlan")
        cleanUpTempTags(newPhysicalPlan)
        currentPhysicalPlan = newPhysicalPlan
        currentLogicalPlan = newLogicalPlan
        stagesToReplace = Seq.empty[QueryStageExec]
      }
      // Now that some stages have finished, we can try creating new stages.
      // Enter the next iteration. A stage that has finished has its resultOption set, and the
      // corresponding allChildStagesMaterialized is then true.
      result = createQueryStages(currentPhysicalPlan)
    }

    // Run the final plan when there's no more unfinished stages.
    // All upstream stages have finished: optimize the physical plan with their stats and
    // settle on the final physical plan.
    currentPhysicalPlan = applyPhysicalRules(result.newPlan, queryStageOptimizerRules)
    isFinalPlan = true
    executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
    currentPhysicalPlan
  }
}
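The waiting pattern in getFinalPhysicalPlan (submit every new stage asynchronously, block on take() for the first completion, then drainTo() whatever else finished in the meantime) can be tried in isolation. The following is a simplified sketch, not Spark code: stages are modeled as plain functions returning a Long, and StageEventLoopSketch, runStages, and the two event classes are hypothetical stand-ins for the real types.

```scala
import java.util.ArrayList
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object StageEventLoopSketch {
  // Hypothetical stand-ins for Spark's StageMaterializationEvent hierarchy.
  sealed trait Event
  case class StageSuccess(stageId: Int, stats: Long) extends Event
  case class StageFailure(stageId: Int, error: Throwable) extends Event

  // Runs every "stage" asynchronously and returns stageId -> result once all have reported.
  def runStages(stages: Map[Int, () => Long])(implicit ec: ExecutionContext): Map[Int, Long] = {
    val events = new LinkedBlockingQueue[Event]()
    stages.foreach { case (id, work) =>
      Future(work()).onComplete {
        case Success(stats) => events.offer(StageSuccess(id, stats))
        case Failure(e) => events.offer(StageFailure(id, e))
      }
    }
    var results = Map.empty[Int, Long]
    while (results.size < stages.size) {
      val first = events.take()          // block until one stage has finished
      val rest = new ArrayList[Event]()
      events.drainTo(rest)               // also pick up stages that finished at about the same time
      (Seq(first) ++ rest.asScala).foreach {
        case StageSuccess(id, stats) => results += (id -> stats)
        case StageFailure(id, e) => throw new RuntimeException(s"stage $id failed", e)
      }
      // AdaptiveSparkPlanExec re-optimizes at this point, once per batch of events.
    }
    results
  }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    println(runStages(Map(1 -> (() => 10L), 2 -> (() => 20L))))
  }
}
```

Draining the queue after take() means several completions are handled in one loop iteration, which is what lets the real implementation re-plan once per batch instead of once per stage.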
// SparkContext
/**
 * Submit a map stage for execution. This is currently an internal API only, but might be
 * promoted to DeveloperApi in the future.
 */
private[spark] def submitMapStage[K, V, C](dependency: ShuffleDependency[K, V, C])
    : SimpleFutureAction[MapOutputStatistics] = {
  assertNotStopped()
  val callSite = getCallSite()
  var result: MapOutputStatistics = null
  val waiter = dagScheduler.submitMapStage(
    dependency,
    (r: MapOutputStatistics) => { result = r },
    callSite,
    localProperties.get)
  new SimpleFutureAction[MapOutputStatistics](waiter, result)
}

// DAGScheduler
def submitMapStage[K, V, C](
    dependency: ShuffleDependency[K, V, C],
    callback: MapOutputStatistics => Unit,
    callSite: CallSite,
    properties: Properties): JobWaiter[MapOutputStatistics] = {
  val rdd = dependency.rdd
  val jobId = nextJobId.getAndIncrement()
  if (rdd.partitions.length == 0) {
    throw new SparkException("Can't run submitMapStage on RDD with 0 partitions")
  }

  // We create a JobWaiter with only one "task", which will be marked as complete when the whole
  // map stage has completed, and will be passed the MapOutputStatistics for that stage.
  // This makes it easier to avoid race conditions between the user code and the map output
  // tracker that might result if we told the user the stage had finished, but then they queried
  // the map output tracker and some node failures had caused the output statistics to be lost.
  val waiter = new JobWaiter[MapOutputStatistics](
    this, jobId, 1,
    (_: Int, r: MapOutputStatistics) => callback(r))
  eventProcessLoop.post(MapStageSubmitted(
    jobId, dependency, callSite, waiter, Utils.cloneProperties(properties)))
  waiter
}
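One detail in SparkContext.submitMapStage is easy to miss: result is still null when it is handed to SimpleFutureAction, and only the callback assigns it later. This is fine on the assumption that the second constructor parameter of SimpleFutureAction is a by-name parameter, so the variable is read when the value is requested rather than when the object is built. The sketch below reproduces just that mechanism; LazyResult and demo are hypothetical names and not part of Spark.

```scala
object ByNameResultSketch {
  // Hypothetical stand-in for SimpleFutureAction: `resultFunc` is by-name,
  // so the expression passed in is re-evaluated on every call to `get`.
  class LazyResult[T](resultFunc: => T) {
    def get: T = resultFunc
  }

  def demo(): String = {
    var result: String = null
    val holder = new LazyResult[String](result) // `result` is still null here
    result = "map output statistics"            // what the job callback does later
    holder.get                                   // reads the variable now, not at construction
  }

  def main(args: Array[String]): Unit =
    println(demo()) // prints "map output statistics"
}
```

With an ordinary by-value parameter the holder would have captured null, which is why the callback-plus-variable arrangement relies on by-name evaluation.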
The physical-plan optimizer rules that AdaptiveSparkPlanExec currently applies:
// AdaptiveSparkPlanExec
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
  ReuseAdaptiveSubquery(conf, context.subqueryCache),
  CoalesceShufflePartitions(context.session),
  // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
  // added by `CoalesceShufflePartitions`. So they must be executed after it.
  OptimizeSkewedJoin(conf),
  OptimizeLocalShuffleReader(conf),
  ApplyColumnarRulesAndInsertTransitions(conf, context.session.sessionState.columnarRules),
  CollapseCodegenStages(conf)
)
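The comment in the rule list says that the order matters. A toy model makes the reason visible, assuming the rules are applied one after another in list order, each seeing the output of the previous one (a left fold). Everything here is hypothetical: a plan is just a list of tags, and the toy optimizeSkew rule doing nothing without the partitionSpecs tag is a deliberate simplification, not the behavior of the real OptimizeSkewedJoin.

```scala
object RuleOrderSketch {
  // Toy plan: the list of tags that rules have attached so far.
  type Plan = List[String]
  type Rule = Plan => Plan

  // Applies the rules in order; each rule sees the result of the one before it.
  def applyRules(plan: Plan, rules: Seq[Rule]): Plan =
    rules.foldLeft(plan) { (current, rule) => rule(current) }

  // Toy version of a rule that attaches partition specs.
  val coalesce: Rule = plan => plan :+ "partitionSpecs"

  // Toy version of a rule that depends on those specs being present.
  val optimizeSkew: Rule = plan =>
    if (plan.contains("partitionSpecs")) plan :+ "skewHandled" else plan

  def main(args: Array[String]): Unit = {
    println(applyRules(Nil, Seq(coalesce, optimizeSkew))) // prints List(partitionSpecs, skewHandled)
    println(applyRules(Nil, Seq(optimizeSkew, coalesce))) // prints List(partitionSpecs)
  }
}
```

Swapping the two rules changes the result, which is the kind of dependency the comment in queryStageOptimizerRules guards against.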
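How OptimizeSkewedJoin works
In AE mode, all stages that a stage depends on have finished before that stage runs, so the stats of each of those upstream stages are available.
A shuffle partition is treated as skewed when its output is more than 5 times the median partition size and is also larger than 256 MB. The data of a skewed partition is then split into N parts according to targetSize.
targetSize = max(64 MB, average size of the non-skewed partitions)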
[Figure: shuffle before optimization]
[Figure: shuffle after optimization]
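The skew test and the targetSize formula can be worked through with concrete numbers. This is a minimal sketch using the thresholds stated above (factor 5, 256 MB, 64 MB). The median here simply takes the middle element of the sorted sizes, and splitCount divides sizes evenly; both are simplifying assumptions, since the real rule splits along map output boundaries. All names in SkewSplitSketch are hypothetical.

```scala
object SkewSplitSketch {
  val MB: Long = 1024L * 1024L
  val skewFactor: Int = 5               // "5 times the median"
  val skewThreshold: Long = 256 * MB    // "larger than 256 MB"
  val advisorySize: Long = 64 * MB      // lower bound for targetSize

  // Simplified median: middle element of the sorted sizes (assumes a non-empty input).
  def median(sizes: Seq[Long]): Long = sizes.sorted.apply(sizes.length / 2)

  def isSkewed(size: Long, medianSize: Long): Boolean =
    size > medianSize * skewFactor && size > skewThreshold

  // targetSize = max(64 MB, average size of the non-skewed partitions)
  def targetSize(sizes: Seq[Long], medianSize: Long): Long = {
    val nonSkewed = sizes.filterNot(isSkewed(_, medianSize))
    if (nonSkewed.isEmpty) advisorySize
    else math.max(advisorySize, nonSkewed.sum / nonSkewed.length)
  }

  // Approximate number of parts N, assuming the data could be cut evenly.
  def splitCount(size: Long, target: Long): Int =
    math.ceil(size.toDouble / target).toInt

  def main(args: Array[String]): Unit = {
    val sizes = Seq(100L, 120L, 80L, 90L, 1000L).map(_ * MB)
    val med = median(sizes)                       // 100 MB
    val target = targetSize(sizes, med)           // 97.5 MB, the average of the four small partitions
    println(sizes.map(isSkewed(_, med)))          // prints List(false, false, false, false, true)
    println(splitCount(1000 * MB, target))        // prints 11
  }
}
```

In this example only the 1000 MB partition passes both conditions, and because the non-skewed average (97.5 MB) is above 64 MB, it becomes the target size, giving roughly 11 splits.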