Spark source code (5): How do DAGScheduler and TaskScheduler cooperate to submit tasks, and what is the relationship between application, job, stage, taskset and task?
2022-07-01 13:00:00 【Interest1_wyt】
In the previous article we finished walking through how executors are created and started. Here we continue from that point: after the executors are up, how does the driver split the RDD chain and finally submit tasks to the executors?
To better observe the relationship between an application and its jobs, we add one line that calls the first() action to the demo we have been using all along. The full demo now looks like this:
def main(args: Array[String]): Unit = {
  val sc = new SparkContext(new SparkConf().setAppName("WordCount"))
  val rdd: RDD[String] = sc.makeRDD(List(
    "spark hello", "hive", "hadoop hbase", "spark hadoop", "hbase"
  ))
  // Flatten: split each line into words
  val value: RDD[String] = rdd.flatMap(_.split(" "))
  // Pick the first element and print it (action operator)
  println(value.first())
  // Map each word to (key, 1)
  val mapRDD: RDD[(String, Int)] = value.map((_, 1))
  // reduceByKey aggregates by key
  val result: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
  // Print all final results (action operator)
  result.collect().foreach(println)
}
How each RDD operator is implemented internally, and the difference between transformation operators and action operators, is not the goal of this source-code reading, so we skip the details of RDD processing and follow the big picture: how work is broken down and dispatched for execution.
Anyone who has used Spark knows that flatMap, map and reduceByKey are all transformation operators, so we will not dig into them. We focus instead on the handling of action operators, because tasks are only dispatched and executed when an action operator runs. So let's first set a breakpoint and trace the execution of first(), as follows:

withScope can simply be understood as a wrapper around a code block; it does not touch the actual data processing logic, so let's move on into the take method:

The earlier steps in take mainly compute which partitions to scan and how many results to fetch; the real job submission is the highlighted runJob line. runJob has several overloads that successively wrap the parameters, so we jump straight to the lowest-level one; a small hand-written sketch of the call chain so far is given below:
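Since the screenshots are not reproduced here, the following is a rough, hand-written sketch of that call chain (hedged and simplified; the real take() in RDD.scala grows the number of partitions it scans round by round). It can be dropped into the demo's main right after value is defined:
// first() is essentially take(1).head, and each round of take(num) is one
// sc.runJob call restricted to a few partition ids. The line below imitates
// the first round by hand: run the job only on partition 0 of `value`.
val firstRound: Array[Array[String]] =
  sc.runJob(value, (it: Iterator[String]) => it.take(1).toArray, Seq(0))
println(firstRound.flatten.headOption.getOrElse("<empty>"))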

We can see that once the job is submitted, it is ultimately handed to the DAGScheduler for processing. At this point we can also pin down the relationship between an application and its jobs: one action operator corresponds to one job submission, so the number of jobs in an application depends on how many times action operators are invoked. Next let's look at the DAGScheduler's handling logic:

At this step the DAGScheduler calls submitJob to carry the job submission forward. I also marked two extra lines, mainly to show that the DAGScheduler blocks indefinitely until the job result comes back. Before moving on to submitJob's handling logic, here is a small toy sketch of that blocking pattern:
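This is not Spark's code, just a self-contained toy model of the JobWaiter idea (names like ToyJobWaiter are made up for illustration): submitJob hands back a waiter immediately, and the runJob caller parks on it until a scheduler thread completes the job.
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

object JobWaiterToyModel {
  // Toy stand-in for Spark's JobWaiter: a promise the "scheduler" completes later.
  final class ToyJobWaiter[U] {
    private val promise = Promise[U]()
    def jobFinished(result: U): Unit = promise.success(result)
    def awaitResult(): U = Await.result(promise.future, Duration.Inf)
  }

  def main(args: Array[String]): Unit = {
    val waiter = new ToyJobWaiter[Int]
    // Pretend this thread is the scheduler event loop finishing the job later.
    new Thread(() => { Thread.sleep(500); waiter.jobFinished(42) }).start()
    // The "runJob" caller blocks here, which is what the marked lines show.
    println(s"job result = ${waiter.awaitResult()}")
  }
}
With that picture in mind, let's get back to submitJob: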
[Much of the important logic is hard to show with screenshots, so from here on the source code is inserted directly rather than screenshotted.]
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check against the partition count to make sure we are not launching
  // a task on a partition that does not exist
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }
  // Get the unique id of this job (job ids start at 0 and increase by 1 per job)
  val jobId = nextJobId.getAndIncrement()
  // If there are no partitions to process
  if (partitions.isEmpty) {
    // Clone the related properties
    val clonedProperties = Utils.cloneProperties(properties)
    if (sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) == null) {
      clonedProperties.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, callSite.shortForm)
    }
    val time = clock.getTimeMillis()
    // Post the job start and end events right away
    listenerBus.post(
      SparkListenerJobStart(jobId, time, Seq.empty, clonedProperties))
    listenerBus.post(
      SparkListenerJobEnd(jobId, time, JobSucceeded))
    // Return the wrapper for the job result; since there are no partitions
    // to process, its task count is simply 0
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }
  // Assert that the partition list really is non-empty
  assert(partitions.nonEmpty)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
  // Post a JobSubmitted event
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    Utils.cloneProperties(properties)))
  waiter
}
We can see that job ids start from 0 and increase by 1 for every new job. Also, when the set of partitions to process is empty, Spark still posts the job start and end events as usual, but it never dispatches anything to the TaskScheduler; the JobWaiter it returns directly records a task count of 0.
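As a hedged aside, an easy way to hit that empty-partitions fast path yourself is to run an action on an RDD with zero partitions, for example (assuming a SparkContext named sc, as in the demo):
// sc.emptyRDD has zero partitions, so collect() submits a job whose partition
// list is empty: the JobStart/JobEnd events are posted immediately and no
// TaskSet ever reaches the TaskScheduler (sketch; behaviour as described above).
val nothing: Array[Int] = sc.emptyRDD[Int].collect()
println(nothing.length) // 0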
Now let's focus on the non-empty-partitions path. It posts a JobSubmitted event to the DAGScheduler itself through eventProcessLoop, and the handling of that event looks like this:

Let's look inside the concrete handleJobSubmitted method:
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties): Unit = {
  // Key point 1: create a ResultStage from the final RDD
  var finalStage: ResultStage = null
  try {
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    case e: BarrierJobSlotsNumberCheckFailed =>
      val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
        (_: Int, value: Int) => value + 1)
      logWarning(s"Barrier stage in job $jobId requires ${e.requiredConcurrentTasks} slots, " +
        s"but only ${e.maxConcurrentTasks} are available. " +
        s"Will retry up to ${maxFailureNumTasksCheck - numCheckFailures + 1} more times")
      if (numCheckFailures <= maxFailureNumTasksCheck) {
        messageScheduler.schedule(
          new Runnable {
            override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
              partitions, callSite, listener, properties))
          },
          timeIntervalNumTasksCheck,
          TimeUnit.SECONDS
        )
        return
      } else {
        barrierJobIdToNumTasksCheckFailures.remove(jobId)
        listener.jobFailed(e)
        return
      }
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }
  barrierJobIdToNumTasksCheckFailures.remove(jobId)
  // Wrap everything into an ActiveJob object
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))
  val jobSubmissionTime = clock.getTimeMillis()
  // Set up the mutual references between the job and the ResultStage
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  // Post the job start event on the listener bus
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  // Key point 2: submit the stage
  submitStage(finalStage)
}
This block of code matters a lot. It has two key points, stage creation and stage submission, and we will look at them in turn:
Key point 1: create a ResultStage from the final RDD
On the face of it this code is simple and should not count as a key point. In fact, it is the core of understanding the algorithm by which the DAGScheduler cuts a job into stages. First let's look at the type information of ResultStage; it is one of the implementations of Stage:

Stage has exactly two implementations, ResultStage and ShuffleMapStage, and from Stage's constructor parameters we can tell that stages carry lineage as well: each stage records the parent stages it depends on. To make the following source code easier to follow, let's first lay out how stages are cut.
Stage-splitting algorithm: take the RDD on which the action operator was invoked as the last RDD, and build a ResultStage from it. Then walk backwards along the RDD lineage to its parent RDDs. If a parent is reached through a narrow dependency, pull it into the current stage; if it is reached through a wide dependency, the RDD on the wide-dependency side marks the boundary of a new stage and is the first RDD we reach inside that new stage. From there the backward walk continues in the same way: narrow dependencies stay in the stage, and every wide dependency repeats the stage-creation step. One thing to note is that the stage built from the RDD the action was invoked on is a ResultStage, which computes the final result, whereas every stage created because of a wide dependency is a ShuffleMapStage, which ends with a shuffle.
From this we can also see the relationship between jobs and stages: a job has at least one stage, and the exact number of stages depends on how many wide dependencies appear along the job's lineage.
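To see these cuts concretely on the demo, one simple experiment (a hedged suggestion, using the standard RDD.toDebugString API) is to print the lineage of result; the indentation step in the output marks the shuffle boundary, i.e. exactly where the ShuffleMapStage/ResultStage split described above happens:
// Run inside the demo's main after `result` is defined.
// toDebugString renders the lineage; the indented "+-" level that appears at the
// ShuffledRDD produced by reduceByKey is the wide-dependency cut, so the
// collect() job ends up with one ShuffleMapStage (makeRDD -> flatMap -> map)
// plus one ResultStage (reduceByKey -> collect), while the first() job, which
// only crosses narrow dependencies, is a single ResultStage.
println(result.toDebugString)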
Next let's take a quick look at the createResultStage method:
private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  checkBarrierStageWithDynamicAllocation(rdd)
  checkBarrierStageWithNumSlots(rdd)
  checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
  // Derive the parent stages from the RDD
  val parents = getOrCreateParentStages(rdd, jobId)
  // Get the unique stage id; it also starts at 0 and increases by 1 each time
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  // Record the stage-to-job bookkeeping
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
This code is straightforward; the only slightly involved part is deriving the parent stages from the RDD, which is not the goal of this reading, so we will not go deeper into it here and move on to the next key point.
Key point 2: submit the stage
With the stage-splitting theory laid out above, reading this part is much less confusing. As usual, the source first:
private def submitStage(stage: Stage): Unit = {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug(s"submitStage($stage (name=${stage.name};" +
      s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      // Key point 1: find the parent stages this stage depends on
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        // Key point 2: submit the stage's tasks
        submitMissingTasks(stage, jobId.get)
      } else {
        // If the stage has parent stages, repeat this process on them,
        // and only submit tasks once there are no missing parents left
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
The logic of this method is roughly: check whether the stage has parent stages; if it does, recurse into them, and only submit tasks once a stage has no missing parents. The logic is not complicated, but it carries a lot of substance: key point 1, getMissingParentStages, contains the stage-splitting algorithm, and key point 2, submitMissingTasks, covers the conversion of a stage into tasks and their submission. Let's look at each in turn:
Key point 1: find the parent stages this stage depends on
This part determines the extent of a stage, and whether a new stage needs to be opened, based on the RDD lineage and on wide versus narrow dependencies. The source is as follows:
private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new ListBuffer[RDD[_]]
  // Put the stage's own RDD into the to-visit list so the backward walk can start
  waitingForVisit += stage.rdd
  // Note: this is a nested method; it is invoked over and over by the while loop below
  def visit(rdd: RDD[_]): Unit = {
    // Avoid visiting the same RDD twice (repeated visits could easily loop forever)
    if (!visited(rdd)) {
      visited += rdd
      val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
      if (rddHasUncachedPartitions) {
        // Walk the RDD lineage to its parent dependencies: a wide dependency creates
        // a new stage, while a narrow dependency is simply added to the to-visit list
        for (dep <- rdd.dependencies) {
          dep match {
            // Wide dependency: build a ShuffleMapStage from the RDD on the map side of the shuffle
            case shufDep: ShuffleDependency[_, _, _] =>
              val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
              if (!mapStage.isAvailable) {
                missing += mapStage
              }
            // Narrow dependency: just add its RDD to the to-visit list
            case narrowDep: NarrowDependency[_] =>
              waitingForVisit.prepend(narrowDep.rdd)
          }
        }
      }
    }
  }
  while (waitingForVisit.nonEmpty) {
    // Visit the next unprocessed RDD and take it off the to-visit list;
    // note that ListBuffer.remove(0) returns the element being removed
    visit(waitingForVisit.remove(0))
  }
  missing.toList
}
The logic here matches the stage-splitting algorithm we described earlier: walk the RDD lineage to the parent dependencies; a wide dependency creates a new stage, and a narrow dependency is merely pushed onto the to-visit list.
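If you want to verify the wide/narrow classification that the loop above relies on, a small hedged experiment on the demo RDDs (again assuming the demo's mapRDD and result are in scope) is to inspect RDD.dependencies directly:
import org.apache.spark.{NarrowDependency, ShuffleDependency}

// map produces a narrow (one-to-one) dependency, so visit() just keeps walking
// backwards; reduceByKey produces a ShuffleDependency, so visit() calls
// getOrCreateShuffleMapStage and records a new parent stage.
mapRDD.dependencies.foreach {
  case _: ShuffleDependency[_, _, _] => println("mapRDD: wide dependency")
  case _: NarrowDependency[_]        => println("mapRDD: narrow dependency")
}
result.dependencies.foreach {
  case _: ShuffleDependency[_, _, _] => println("result: wide dependency")
  case _: NarrowDependency[_]        => println("result: narrow dependency")
}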
Key point 2: submit the stage's tasks
There is quite a lot of code here, and some of the handling of stage attempts and properties is not discussed in detail; we only annotate the points relevant to the goal of this reading. The source first:
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
  logDebug("submitMissingTasks(" + stage + ")")
  stage match {
    case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable =>
      mapOutputTracker.unregisterAllMapOutput(sms.shuffleDep.shuffleId)
    case _ =>
  }
  // Key point 1: compute the indices of the partitions that still need to be computed
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
  val properties = jobIdToActiveJob(jobId).properties
  runningStages += stage
  stage match {
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
    case s: ResultStage =>
      outputCommitCoordinator.stageStart(
        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }
  val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
    stage match {
      case s: ShuffleMapStage =>
        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
      case s: ResultStage =>
        partitionsToCompute.map { id =>
          val p = s.partitions(id)
          (id, getPreferredLocs(stage.rdd, p))
        }.toMap
    }
  } catch {
    case NonFatal(e) =>
      stage.makeNewStageAttempt(partitionsToCompute.size)
      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }
  stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
  if (partitionsToCompute.nonEmpty) {
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  }
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
  var taskBinary: Broadcast[Array[Byte]] = null
  var partitions: Array[Partition] = null
  try {
    var taskBinaryBytes: Array[Byte] = null
    RDDCheckpointData.synchronized {
      taskBinaryBytes = stage match {
        case stage: ShuffleMapStage =>
          JavaUtils.bufferToArray(
            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
        case stage: ResultStage =>
          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
      }
      partitions = stage.rdd.partitions
    }
    if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024) {
      logWarning(s"Broadcasting large task binary with size " +
        s"${Utils.bytesToString(taskBinaryBytes.length)}")
    }
    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    case e: NotSerializableException =>
      abortStage(stage, "Task not serializable: " + e.toString, Some(e))
      runningStages -= stage
      return
    case e: Throwable =>
      abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }
  // Key point 2: create the tasks according to the stage type and the partitions to compute
  val tasks: Seq[Task[_]] = try {
    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
    stage match {
      case stage: ShuffleMapStage =>
        stage.pendingPartitions.clear()
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = partitions(id)
          stage.pendingPartitions += id
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
        }
      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, id, properties, serializedTaskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
            stage.rdd.isBarrier())
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
      runningStages -= stage
      return
  }
  if (tasks.nonEmpty) {
    logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
      s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
    // Key point 3: submit the TaskSet to the TaskScheduler
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
  } else {
    markStageAsFinished(stage, None)
    stage match {
      case stage: ShuffleMapStage =>
        logDebug(s"Stage ${stage} is actually done; " +
          s"(available: ${stage.isAvailable}," +
          s"available outputs: ${stage.numAvailableOutputs}," +
          s"partitions: ${stage.numPartitions})")
        markMapStageJobsAsFinished(stage)
      case stage : ResultStage =>
        logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
    }
    submitWaitingChildStages(stage)
  }
}
Although there is a lot of code here, we only need to pay attention to three points: computing the partitions, building the collection of tasks, and submitting the tasks. Let's look at them one by one:
Key points 1 and 2: compute the partitions and create the task collection from them
The partitions to compute are essentially obtained from the RDD, and there is not much to say about building the task collection: depending on the type of stage, it creates a series of tasks with identical processing logic, differing only in the partition each one handles.
At this point we can also state the relationship between a stage and a taskset: one stage corresponds to one TaskSet, a TaskSet contains multiple tasks, and the number of tasks matches the number of partitions the stage has to process.
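As a hedged illustration of that last sentence, the ResultStage of the demo's collect() job should produce exactly result.getNumPartitions tasks (getNumPartitions is the standard RDD API; the actual value depends on defaultParallelism, since makeRDD was called without an explicit slice count):
// Run inside the demo's main: the number printed here is the number of
// ResultTasks the DAGScheduler will put into the TaskSet for the final stage,
// and the Spark UI's stage page should show the same task count.
println(s"partitions of result = ${result.getNumPartitions}")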
Key point 3: submit the TaskSet to the TaskScheduler
Here taskScheduler is an abstract interface with a single implementation, TaskSchedulerImpl. Let's look at its submission logic:
override def submitTasks(taskSet: TaskSet): Unit = {
  // Get the tasks in this TaskSet
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    // Create a TaskSetManager for this TaskSet
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    // Get the stage this TaskSet belongs to
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets.foreach { case (_, ts) =>
      ts.isZombie = true
    }
    // Associate the TaskSetManager with this stage attempt and register it with the scheduler
    stageTaskSets(taskSet.stageAttemptId) = manager
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run(): Unit = {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  // Key point: call SchedulerBackend.reviveOffers to take the submitted TaskSet further
  backend.reviveOffers()
}
Next let's look at reviveOffers. It has two implementations, one in LocalSchedulerBackend and one in CoarseGrainedSchedulerBackend; our StandaloneSchedulerBackend extends the latter, so we go to CoarseGrainedSchedulerBackend for the concrete implementation:

We can see that it sends out a ReviveOffers event, whose only receiver is this very class itself. Let's follow the event into its handler:

Going one level deeper:
private def makeOffers(): Unit = {
  val taskDescs = withLock {
    // Key point 1: pick out the executors that are still active
    val activeExecutors = executorDataMap.filterKeys(isExecutorActive)
    val workOffers = activeExecutors.map {
      case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
          Some(executorData.executorAddress.hostPort),
          executorData.resourcesInfo.map { case (rName, rInfo) =>
            (rName, rInfo.availableAddrs.toBuffer)
          })
    }.toIndexedSeq
    scheduler.resourceOffers(workOffers)
  }
  if (taskDescs.nonEmpty) {
    // Key point 2: launch the tasks
    launchTasks(taskDescs)
  }
}
There are two key points here. The first is actually quite simple; it is marked only to echo an open question from an earlier article in this series (Spark source code (3): how Spark schedules driver and executor work, and how executors register with the driver). We will not spell out how that question relates to this code; if you are interested, the earlier article covers it in detail.
Key point 2: launch the tasks
Again, the source first:
private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
  // Iterate over the tasks
  for (task <- tasks.flatten) {
    val serializedTask = TaskDescription.encode(task)
    // Check whether the serialized task exceeds the maximum RPC message size
    if (serializedTask.limit() >= maxRpcMessageSize) {
      Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +
            s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      // Look up the target executor's bookkeeping data
      val executorData = executorDataMap(task.executorId)
      // Reserve resources on the executor for this task; they are released when the task finishes
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      task.resources.foreach { case (rName, rInfo) =>
        assert(executorData.resourcesInfo.contains(rName))
        executorData.resourcesInfo(rName).acquire(rInfo.addresses)
      }
      logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
        s"${executorData.executorHost}.")
      // Send the LaunchTask message to the executor
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
As we can see, it first checks whether the serialized task exceeds the RPC message size limit. If it does, the TaskSet is aborted with an error message; if it is within bounds, the task is sent to the executor. Note that tasks are sent one by one here, not as a whole TaskSet at once.
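As a hedged side note, the limit checked above comes from the spark.rpc.message.maxSize setting (RPC_MESSAGE_MAX_SIZE in the code, specified in MiB). If you ever hit that abort, raising it looks roughly like the snippet below, though using broadcast variables for large closures is usually the better fix:
import org.apache.spark.SparkConf

// Raise the max RPC message size to 256 MiB (the value is in MiB; the exact
// default in your Spark version is an assumption to verify, commonly 128).
val conf = new SparkConf()
  .setAppName("WordCount")
  .set("spark.rpc.message.maxSize", "256")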
With that, the goal of this source-code reading is reached. The demo actually has two action operators, first and collect, and we only walked through the task dispatch of first; collect is not covered separately because its dispatch path is exactly the same, both submitting a job through runJob. The reason for adding first at all was to explore the application-to-job relationship: as the monitoring page below shows, with two action operators we naturally get two jobs.

Summary:
1. Relationships between application, job, stage, taskset and task
Application to job: one application may submit multiple jobs, because every action operator calls runJob, i.e. each action corresponds to one job submission.
Job to stage: one job may contain multiple stages; the split depends on whether the RDD lineage contains shuffles, i.e. every wide dependency causes a new stage to be created.
Stage to taskset: one stage corresponds to one TaskSet, with one task created for each partition of the stage.
Taskset to task: a TaskSet contains a group of tasks; every task runs the same logic, just against a different partition of the stage.
2. Job ids and stage ids both start counting from 0 and increase by 1 for each new one.
3. Stage-splitting algorithm: take the RDD on which the action operator was invoked as the last RDD and build a ResultStage from it, then walk backwards along the RDD lineage to its parent RDDs. A parent reached through a narrow dependency is pulled into the current stage; a parent reached through a wide dependency marks the boundary of a new stage and is the first RDD we reach inside that new stage, from which the backward walk continues in the same way: narrow dependencies stay in the stage, and every wide dependency repeats the stage-creation step. Note that the stage built from the action's RDD is a ResultStage, which computes the final result, while every stage created because of a wide dependency is a ShuffleMapStage, which ends with a shuffle.
4. Stage has two implementations, ResultStage and ShuffleMapStage, and correspondingly there are two kinds of tasks, ResultTask and ShuffleMapTask.
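Tying the summary back to the demo, you can print the job/stage/task breakdown of the application yourself with the public SparkStatusTracker API (a hedged illustration; task counts depend on defaultParallelism because makeRDD was called without a slice count, and job 0 = first() should show one stage while job 1 = collect() should show two):
// Run at the end of the demo's main, after both actions have completed.
val tracker = sc.statusTracker
for {
  jobId <- tracker.getJobIdsForGroup(null)   // jobs not assigned to any job group
  jobInfo <- tracker.getJobInfo(jobId)
} {
  val stageSummary = jobInfo.stageIds().map { stageId =>
    val numTasks = tracker.getStageInfo(stageId).map(_.numTasks()).getOrElse(-1)
    s"stage $stageId ($numTasks tasks)"
  }.mkString(", ")
  println(s"job $jobId -> $stageSummary")
}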