Spark 3.0 Adaptive Execution: Implementation Walkthrough and Data Skew Optimization
2022-07-27 14:23:00 【wankunde】
Enabling Spark AE
Adaptive Execution is enabled by injecting a rule while Spark prepares the physical plan. The QueryExecution class holds a list of preparations rules that optimize the physical plan, and InsertAdaptiveSparkPlan is the first of them.
InsertAdaptiveSparkPlan first handles some of the subqueries with the PlanAdaptiveSubqueries rule and then wraps the current plan in an AdaptiveSparkPlanExec.
When collect() or take() is called on the AdaptiveSparkPlanExec, it always runs getFinalPhysicalPlan() first to produce the final SparkPlan, and only then invokes the corresponding method on that plan.
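As a quick reminder of how this path is switched on: AQE sits behind the spark.sql.adaptive.enabled flag (disabled by default in Spark 3.0). The snippet below is a minimal sketch, not taken from the article's code; the app name, the local master and the toy aggregation are made up for illustration.

// Minimal sketch: enabling AQE (Spark 3.0; app name and query are illustrative only)
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("aqe-demo")                           // hypothetical app name
  .config("spark.sql.adaptive.enabled", "true")  // causes InsertAdaptiveSparkPlan to be applied
  .getOrCreate()

// Any query that shuffles (e.g. an aggregation) is now planned adaptively.
val df = spark.range(0, 1000000).groupBy((col("id") % 10).as("k")).count()
df.collect() // the action triggers getFinalPhysicalPlan() before the plan is executed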
// QueryExecution class
lazy val executedPlan: SparkPlan = {
  executePhase(QueryPlanningTracker.PLANNING) {
    QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
  }
}

protected def preparations: Seq[Rule[SparkPlan]] = {
  QueryExecution.preparations(sparkSession,
    Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession, this))))
}

// QueryExecution companion object
private[execution] def preparations(
    sparkSession: SparkSession,
    adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None): Seq[Rule[SparkPlan]] = {
  // `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
  // as the original plan is hidden behind `AdaptiveSparkPlanExec`.
  adaptiveExecutionRule.toSeq ++
    Seq(
      PlanDynamicPruningFilters(sparkSession),
      PlanSubqueries(sparkSession),
      EnsureRequirements(sparkSession.sessionState.conf),
      ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
        sparkSession.sessionState.columnarRules),
      CollapseCodegenStages(sparkSession.sessionState.conf),
      ReuseExchange(sparkSession.sessionState.conf),
      ReuseSubquery(sparkSession.sessionState.conf)
    )
}
// InsertAdaptiveSparkPlan
override def apply(plan: SparkPlan): SparkPlan = applyInternal(plan, false)

private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan = plan match {
  // ...some checking
  case _ if shouldApplyAQE(plan, isSubquery) =>
    if (supportAdaptive(plan)) {
      try {
        // Plan sub-queries recursively and pass in the shared stage cache for exchange reuse.
        // Fall back to non-AQE mode if AQE is not supported in any of the sub-queries.
        val subqueryMap = buildSubqueryMap(plan)
        val planSubqueriesRule = PlanAdaptiveSubqueries(subqueryMap)
        val preprocessingRules = Seq(
          planSubqueriesRule)
        // Run pre-processing rules.
        val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preprocessingRules)
        logDebug(s"Adaptive execution enabled for plan: $plan")
        AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, preprocessingRules, isSubquery)
      } catch {
        case SubqueryAdaptiveNotSupportedException(subquery) =>
          logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
            s"but is not supported for sub-query: $subquery.")
          plan
      }
    } else {
      logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
        s"but is not supported for query: $plan.")
      plan
    }
  case _ => plan
}
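One simple way to verify that a query took the AQE branch above, rather than one of the fallback branches, is to check the type of the executed plan. A small sketch, reusing the df built in the earlier snippet:

// Sketch: check whether the plan was wrapped by AQE (reuses the `df` from above)
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

val wrapped = df.queryExecution.executedPlan.isInstanceOf[AdaptiveSparkPlanExec]
println(s"Plan wrapped by AdaptiveSparkPlanExec: $wrapped")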
How AE Submits Stages Incrementally and Re-optimizes the Plan
The core loop lives in AdaptiveSparkPlanExec.getFinalPhysicalPlan():
// AdaptiveSparkPlanExec
private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
  // isFinalPlan is false on the first call; once this method has completed, the stages no
  // longer change and subsequent calls return the final plan directly.
  if (isFinalPlan) return currentPhysicalPlan

  // In case of this adaptive plan being executed out of `withActive` scoped functions, e.g.,
  // `plan.queryExecution.rdd`, we need to set active session here as new plan nodes can be
  // created in the middle of the execution.
  context.session.withActive {
    val executionId = getExecutionId
    var currentLogicalPlan = currentPhysicalPlan.logicalLink.get
    var result = createQueryStages(currentPhysicalPlan)
    val events = new LinkedBlockingQueue[StageMaterializationEvent]()
    val errors = new mutable.ArrayBuffer[Throwable]()
    var stagesToReplace = Seq.empty[QueryStageExec]
    while (!result.allChildStagesMaterialized) {
      currentPhysicalPlan = result.newPlan
      // Which stages run next is decided by createQueryStages(plan: SparkPlan).
      if (result.newStages.nonEmpty) {
        stagesToReplace = result.newStages ++ stagesToReplace
        // onUpdatePlan pushes the new plan to the UI through the listener bus.
        executionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan)))

        // Start materialization of all new stages and fail fast if any stages failed eagerly
        result.newStages.foreach { stage =>
          try {
            // materialize() submits the stage as a separate job and returns a
            // SimpleFutureAction that delivers the result. Call chain:
            //   QueryStageExec.materialize() -> doMaterialize() ->
            //   ShuffleExchangeExec.mapOutputStatisticsFuture ->
            //   SparkContext.submitMapStage(shuffleDependency)
            stage.materialize().onComplete { res =>
              if (res.isSuccess) {
                events.offer(StageSuccess(stage, res.get))
              } else {
                events.offer(StageFailure(stage, res.failed.get))
              }
            }(AdaptiveSparkPlanExec.executionContext)
          } catch {
            case e: Throwable =>
              cleanUpAndThrowException(Seq(e), Some(stage.id))
          }
        }
      }

      // Wait on the next completed stage, which indicates new stats are available and probably
      // new stages can be created. There might be other stages that finish at around the same
      // time, so we process those stages too in order to reduce re-planning.
      // Block here until at least one stage has finished.
      val nextMsg = events.take()
      val rem = new util.ArrayList[StageMaterializationEvent]()
      events.drainTo(rem)
      (Seq(nextMsg) ++ rem.asScala).foreach {
        case StageSuccess(stage, res) =>
          stage.resultOption = Some(res)
        case StageFailure(stage, ex) =>
          errors.append(ex)
      }

      // In case of errors, we cancel all running stages and throw exception.
      if (errors.nonEmpty) {
        cleanUpAndThrowException(errors, None)
      }

      // Try re-optimizing and re-planning. Adopt the new plan if its cost is equal to or less
      // than that of the current plan; otherwise keep the current physical plan together with
      // the current logical plan since the physical plan's logical links point to the logical
      // plan it has originated from.
      // Meanwhile, we keep a list of the query stages that have been created since last plan
      // update, which stands for the "semantic gap" between the current logical and physical
      // plans. And each time before re-planning, we replace the corresponding nodes in the
      // current logical plan with logical query stages to make it semantically in sync with
      // the current physical plan. Once a new plan is adopted and both logical and physical
      // plans are updated, we can clear the query stage list because at this point the two plans
      // are semantically and physically in sync again.
      // Replace the finished stages in the logical plan with LogicalQueryStage nodes.
      val logicalPlan = replaceWithQueryStagesInLogicalPlan(currentLogicalPlan, stagesToReplace)
      // Run the logical optimizer and the planner again on the updated logical plan.
      val (newPhysicalPlan, newLogicalPlan) = reOptimize(logicalPlan)
      val origCost = costEvaluator.evaluateCost(currentPhysicalPlan)
      val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
      if (newCost < origCost ||
          (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {
        logOnLevel(s"Plan changed from $currentPhysicalPlan to $newPhysicalPlan")
        cleanUpTempTags(newPhysicalPlan)
        currentPhysicalPlan = newPhysicalPlan
        currentLogicalPlan = newLogicalPlan
        stagesToReplace = Seq.empty[QueryStageExec]
      }
      // Now that some stages have finished, we can try creating new stages.
      // On the next iteration, every finished stage has resultOption set, so its
      // allChildStagesMaterialized flag becomes true.
      result = createQueryStages(currentPhysicalPlan)
    }

    // Run the final plan when there's no more unfinished stages.
    // All upstream stages have finished; apply the stage optimizer rules with the collected
    // stats to determine the final physical plan.
    currentPhysicalPlan = applyPhysicalRules(result.newPlan, queryStageOptimizerRules)
    isFinalPlan = true
    executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
    currentPhysicalPlan
  }
}
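The effect of this loop only becomes visible after an action runs. A rough usage sketch, again reusing the earlier df; the exact explain output differs between Spark versions, so take the comments as an approximation:

// Sketch: observing adaptive re-planning (reuses the `df` from above)
df.explain()  // the root node is AdaptiveSparkPlan with isFinalPlan=false
df.collect()  // runs getFinalPhysicalPlan(): stages are submitted and the plan is re-optimized
df.explain()  // the root node should now report isFinalPlan=true (exact output varies by version)

Each stage.materialize() call for a shuffle stage eventually goes through SparkContext.submitMapStage and DAGScheduler.submitMapStage, shown below.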
// SparkContext
/**
 * Submit a map stage for execution. This is currently an internal API only, but might be
 * promoted to DeveloperApi in the future.
 */
private[spark] def submitMapStage[K, V, C](dependency: ShuffleDependency[K, V, C])
    : SimpleFutureAction[MapOutputStatistics] = {
  assertNotStopped()
  val callSite = getCallSite()
  var result: MapOutputStatistics = null
  val waiter = dagScheduler.submitMapStage(
    dependency,
    (r: MapOutputStatistics) => { result = r },
    callSite,
    localProperties.get)
  new SimpleFutureAction[MapOutputStatistics](waiter, result)
}
// DAGScheduler
def submitMapStage[K, V, C](
    dependency: ShuffleDependency[K, V, C],
    callback: MapOutputStatistics => Unit,
    callSite: CallSite,
    properties: Properties): JobWaiter[MapOutputStatistics] = {
  val rdd = dependency.rdd
  val jobId = nextJobId.getAndIncrement()
  if (rdd.partitions.length == 0) {
    throw new SparkException("Can't run submitMapStage on RDD with 0 partitions")
  }

  // We create a JobWaiter with only one "task", which will be marked as complete when the whole
  // map stage has completed, and will be passed the MapOutputStatistics for that stage.
  // This makes it easier to avoid race conditions between the user code and the map output
  // tracker that might result if we told the user the stage had finished, but then they queried
  // the map output tracker and some node failures had caused the output statistics to be lost.
  val waiter = new JobWaiter[MapOutputStatistics](
    this, jobId, 1,
    (_: Int, r: MapOutputStatistics) => callback(r))
  eventProcessLoop.post(MapStageSubmitted(
    jobId, dependency, callSite, waiter, Utils.cloneProperties(properties)))
  waiter
}
The optimizer rules that AdaptiveSparkPlanExec currently applies to the physical plan of each stage:
// AdaptiveSparkPlanExec
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
  ReuseAdaptiveSubquery(conf, context.subqueryCache),
  CoalesceShufflePartitions(context.session),
  // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
  // added by `CoalesceShufflePartitions`. So they must be executed after it.
  OptimizeSkewedJoin(conf),
  OptimizeLocalShuffleReader(conf),
  ApplyColumnarRulesAndInsertTransitions(conf, context.session.sessionState.columnarRules),
  CollapseCodegenStages(conf)
)
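Most of these rules can be toggled or tuned through SQLConf. A configuration sketch follows; the keys exist in Spark 3.0, but the values below are chosen for illustration rather than being the documented defaults:

// Configuration sketch for the stage optimizer rules (example values, not defaults)
spark.conf.set("spark.sql.adaptive.enabled", "true")
// CoalesceShufflePartitions
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")
// OptimizeLocalShuffleReader
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
// OptimizeSkewedJoin
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")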
How OptimizeSkewedJoin Works
In AE mode, by the time a stage is about to run, all of its upstream dependency stages have already finished, so their stats are available.
A shuffle partition is judged skewed when its output exceeds 5 times the median partition size and is also larger than 256 MB (both thresholds are configurable via spark.sql.adaptive.skewJoin.skewedPartitionFactor and spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes). A skewed partition is then split into N pieces according to targetSize, where
targetSize = max(64 MB, average size of the non-skewed partitions)
[Figure] Shuffle before skew optimization
[Figure] Shuffle after skew optimization
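To make the rule above concrete, here is a standalone sketch of the detection and splitting arithmetic. It is not the actual OptimizeSkewedJoin code; the 5x factor, the 256 MB threshold and the 64 MB advisory size are hard-coded for illustration:

// Standalone sketch of the skew detection and splitting rule described above.
// NOT the real OptimizeSkewedJoin implementation; thresholds are hard-coded for illustration.
object SkewSplitSketch {
  val skewFactor = 5
  val skewThreshold: Long = 256L * 1024 * 1024  // 256 MB
  val advisorySize: Long = 64L * 1024 * 1024    // 64 MB

  def median(sizes: Array[Long]): Long = {
    val sorted = sizes.sorted
    sorted(sorted.length / 2)
  }

  def isSkewed(size: Long, medianSize: Long): Boolean =
    size > medianSize * skewFactor && size > skewThreshold

  // targetSize = max(64 MB, average size of the non-skewed partitions)
  def targetSize(sizes: Array[Long], medianSize: Long): Long = {
    val nonSkewed = sizes.filterNot(isSkewed(_, medianSize))
    math.max(advisorySize, nonSkewed.sum / nonSkewed.length)
  }

  // Split a skewed partition of `size` bytes into pieces of at most `target` bytes each.
  def splitCount(size: Long, target: Long): Int =
    math.max(1, math.ceil(size.toDouble / target).toInt)

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024
    val sizes = Array(40L, 60L, 50L, 2048L, 55L).map(_ * mb)  // one 2 GB partition among small ones
    val med = median(sizes)
    val target = targetSize(sizes, med)
    sizes.zipWithIndex.filter { case (s, _) => isSkewed(s, med) }.foreach { case (s, i) =>
      println(s"partition $i is skewed (${s / mb} MB), split into ${splitCount(s, target)} pieces")
    }
  }
}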