Spark source code (V): How do DAGScheduler and TaskScheduler cooperate to submit tasks, and what is the relationship between application, job, stage, taskset, and task?
2022-07-01 13:18:00 【Interest1_ wyt】
In the previous article we covered how the executor is started and created. Here we continue from that point: after the executor starts, how does the driver cut up the RDD lineage and finally submit tasks to the executor?
To better explore the relationship between application and job, we add one line that executes the first() action operator to the demo we have been using. The whole demo code is now as follows:
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setAppName("WordCount"))
val rdd: RDD[String] = sc.makeRDD(List(
"spark hello", "hive", "hadoop hbase", "spark hadoop", "hbase"
))
// Flatten: split each line into words
val value: RDD[String] = rdd.flatMap(_.split(" "))
// Take the first element and print it (action operator)
println(value.first())
// Map each word to (word, 1)
val mapRDD: RDD[(String, Int)] = value.map((_, 1))
// Aggregate the counts by key with reduceByKey
val result: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
// Collect and print the final results (action operator)
result.collect().foreach(println)
}
The detailed implementation principles of each RDD operator and the difference between transformation and action operators are not the purpose of this source code reading, so I will skip the RDD-specific processing and follow the splitting and dispatching of tasks at a high level.
Anyone who has used Spark knows that flatMap, map and reduceByKey here are all transformation operators, so we won't dig into them. Let's focus on the processing logic of the action operators, because it is only when an action operator is executed that the dispatch and execution of tasks is triggered. So let's first set a breakpoint and track the execution logic of first():
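[In place of the screenshot, a paraphrase of RDD.first as it looks in Spark 3.x; details may vary by version.]
def first(): T = withScope {
  // take(1) submits a job over just enough partitions to fetch one element
  take(1) match {
    case Array(t) => t
    case _ => throw new UnsupportedOperationException("empty collection")
  }
}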

withScope can simply be understood as a wrapper around a code block; it does not involve the actual data processing logic, so let's continue into take():
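[In place of the screenshot, a simplified paraphrase of RDD.take; the heuristic that scales up how many partitions to scan per round is elided.]
def take(num: Int): Array[T] = withScope {
  if (num == 0) {
    new Array[T](0)
  } else {
    val buf = new ArrayBuffer[T]
    val totalParts = this.partitions.length
    var partsScanned = 0
    while (buf.size < num && partsScanned < totalParts) {
      // how many partitions to scan this round (the real code grows this number if earlier rounds returned too few results)
      val numPartsToTry = 1L
      val left = num - buf.size
      val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
      // the real submission: run a job on just the selected partitions
      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
      res.foreach(buf ++= _.take(num - buf.size))
      partsScanned += p.size
    }
    buf.toArray
  }
}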

The earlier steps in take() mainly work out which partitions to scan and how many results have been collected; the real job submission is the sc.runJob line. runJob has several overloads that successively wrap the parameters, so let's jump straight to the bottom-most one:
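[In place of the screenshot, roughly what the bottom-most SparkContext.runJob overload looks like in Spark 3.x.]
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  // the job is handed over to the DAGScheduler here
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}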

You can see that once the job is submitted, it is ultimately handed over to the DAGScheduler. Here we can already pin down the relationship between application and job: one action operator corresponds to one job submission, so how many jobs an application has depends mainly on how many times it calls action operators. Let's look at the DAGScheduler's processing logic:
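[In place of the screenshot, a paraphrase of DAGScheduler.runJob; the two lines to note are the submitJob call and the awaitReady call that blocks on the JobWaiter.]
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  // line to note #1: submit the job and get back a JobWaiter
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  // line to note #2: block indefinitely until the job's result comes back
  ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format(
        waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format(
        waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      throw exception
  }
}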

At this step the DAGScheduler calls submitJob to continue submitting the job, and the two lines noted above show that it then blocks indefinitely on the result the job returns. Let's move on to submitJob's processing logic:
[Many of the important pieces of logic are hard to follow as screenshots, so from here on the source code is pasted directly instead.]
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
// Compare against the number of partitions to make sure we are not launching a task on a partition that does not exist
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
throw new IllegalArgumentException(
"Attempting to access a non-existent partition: " + p + ". " +
"Total number of partitions: " + maxPartitions)
}
// Get the unique identifier of the current job (job IDs start from 0 and increase by 1 for each new job)
val jobId = nextJobId.getAndIncrement()
// If there are no partitions to process
if (partitions.isEmpty) {
// Clone the related properties
val clonedProperties = Utils.cloneProperties(properties)
if (sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) == null) {
clonedProperties.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, callSite.shortForm)
}
val time = clock.getTimeMillis()
// Directly post the job start and end events
listenerBus.post(
SparkListenerJobStart(jobId, time, Seq.empty, clonedProperties))
listenerBus.post(
SparkListenerJobEnd(jobId, time, JobSucceeded))
// Return the job result wrapper; since there are no partitions to process, the task count is simply 0
return new JobWaiter[U](this, jobId, 0, resultHandler)
}
// Assert that the partition list really is non-empty
assert(partitions.nonEmpty)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
// Post the JobSubmitted event
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
Utils.cloneProperties(properties)))
waiter
}
You can see that job IDs start from 0 and increase by 1 for each new job. In addition, when the set of partitions to process is empty, Spark still posts the job start and end event notifications as usual, but it does not hand any tasks to the TaskScheduler, and the JobWaiter it returns records a task count of 0.
Next let's focus on the processing logic for the case where the partitions are not empty: the method posts a JobSubmitted event back to the DAGScheduler itself through eventProcessLoop. The event receiving and handling logic is as follows:
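[In place of the screenshot, the relevant branch of DAGSchedulerEventProcessLoop.doOnReceive; the other events are elided.]
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    // the JobSubmitted event is dispatched straight back into the DAGScheduler
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  // ... MapStageSubmitted, StageCancelled, ExecutorLost and the other events are omitted ...
}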

Let's go into the handleJobSubmitted method itself and take a look:
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties): Unit = {
// Key point 1: create the ResultStage from the current (final) RDD
var finalStage: ResultStage = null
try {
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: BarrierJobSlotsNumberCheckFailed =>
val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
(_: Int, value: Int) => value + 1)
logWarning(s"Barrier stage in job $jobId requires ${e.requiredConcurrentTasks} slots, " +
s"but only ${e.maxConcurrentTasks} are available. " +
s"Will retry up to ${maxFailureNumTasksCheck - numCheckFailures + 1} more times")
if (numCheckFailures <= maxFailureNumTasksCheck) {
messageScheduler.schedule(
new Runnable {
override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
partitions, callSite, listener, properties))
},
timeIntervalNumTasksCheck,
TimeUnit.SECONDS
)
return
} else {
barrierJobIdToNumTasksCheckFailures.remove(jobId)
listener.jobFailed(e)
return
}
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
barrierJobIdToNumTasksCheckFailures.remove(jobId)
// Wrap the job into an ActiveJob object
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val jobSubmissionTime = clock.getTimeMillis()
// Establish mutual references between the job and the ResultStage
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
// Post the job start event via the listener bus
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
// Key point 2: submit the stage
submitStage(finalStage)
}The above code is more important , There are two points , One is stage The creation of , One stage Submission of , Let's take a look at :
Key point 1: create the ResultStage from the current RDD
This code looks simple and might not seem like the focus, but it is in fact the core of understanding how the DAGScheduler cuts a job into individual stages. First let's look at the type information of ResultStage; you can see it is one of the implementation classes of Stage:
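[In place of the screenshot, roughly what the class signatures look like; newer Spark versions add a resourceProfileId parameter, which is omitted here and bodies are elided.]
private[spark] class ResultStage(
    id: Int,
    rdd: RDD[_],
    val func: (TaskContext, Iterator[_]) => _,
    val partitions: Array[Int],
    parents: List[Stage],
    firstJobId: Int,
    callSite: CallSite)
  extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite) { /* ... */ }

private[scheduler] abstract class Stage(
    val id: Int,
    val rdd: RDD[_],
    val numTasks: Int,
    val parents: List[Stage],   // the parent stages this stage depends on
    val firstJobId: Int,
    val callSite: CallSite) { /* ... */ }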

Stage has two implementation classes, ResultStage and ShuffleMapStage, and from Stage's constructor parameters we can see that stages are linked by lineage: each stage records the parent stages it depends on. To make the subsequent source code easier to follow, let's first introduce the logic of stage cutting.
The stage cutting algorithm: the RDD on which the action operator was invoked is treated as the last RDD, and a ResultStage is created with it. We then walk backwards from this RDD to its parent RDDs. A parent connected by a narrow dependency is folded into the current stage; a parent connected by a wide dependency marks the boundary of a new stage and becomes the RDD from which the new stage's backward walk starts. From there, narrow-dependency parents are folded into the new stage, and every further wide dependency repeats the stage-creation process. One thing to note: the stage built around the RDD of the action operator is a ResultStage, which computes the final result, while every stage created because of a wide dependency is a ShuffleMapStage, which involves a shuffle.
From this we can also see the relationship between job and stage: a job has at least one stage, and the exact number of stages depends on how many wide dependencies appear across the whole job's lineage.
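To make this concrete, here is the demo's lineage annotated with where the stage boundary falls for the collect() job (an illustration only; the actual stage IDs assigned at runtime also depend on the earlier first() job):
// ShuffleMapStage: everything up to the shuffle write introduced by reduceByKey
val rdd    = sc.makeRDD(List("spark hello", "hive", "hadoop hbase", "spark hadoop", "hbase"))
val value  = rdd.flatMap(_.split(" "))   // narrow dependency -> same stage
val mapRDD = value.map((_, 1))           // narrow dependency -> same stage
// ResultStage: starts from the ShuffledRDD produced by reduceByKey (wide dependency)
val result = mapRDD.reduceByKey(_ + _)
result.collect()                         // action -> this job is cut into 2 stages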
Now let's take a brief look at the createResultStage method:
private def createResultStage(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithNumSlots(rdd)
checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
// Get (or create) the parent stages from the rdd's lineage
val parents = getOrCreateParentStages(rdd, jobId)
// Get the stage's unique identifier; it also starts from 0 and increases by 1 each time
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
// Record the stage and job bookkeeping information
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}It's a simple piece of code , The only thing that's a little complicated is through rdd Get his father stage, This is not the purpose of our reading , So I won't go into details , Let's move on to the next point .
Key point 2: submit the stage
With the earlier introduction to how stages are divided, this part won't be so confusing. Let's look at the source code first:
private def submitStage(stage: Stage): Unit = {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug(s"submitStage($stage (name=${stage.name};" +
s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
// Key point 1: get the parent stages this stage depends on
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
// Key point 2: submit the tasks of this stage
submitMissingTasks(stage, jobId.get)
} else {
// If the stage has missing parent stages, repeat the current process for them, until a stage with no missing parents is found and its tasks are submitted
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}
The overall logic of this method is: check whether the stage has missing parent stages; if it does, repeat the same process for each parent, until a stage with no missing parents is found and only then submit its tasks. The logic itself is not complicated, but it carries a lot of content. The first key point, getMissingParentStages, contains the stage partition algorithm; the second key point, submitMissingTasks, covers converting a stage into tasks and submitting them. Let's look at each:
Key point 1: get the parent stages the stage depends on
This mainly walks the RDD lineage and uses wide versus narrow dependencies to decide the stage boundaries, i.e. whether a new stage needs to be opened. The source code is as follows:
private def getMissingParentStages(stage: Stage): List[Stage] = {
val missing = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new ListBuffer[RDD[_]]
// Add the current rdd to the to-visit list so the backward traversal can start from it
waitingForVisit += stage.rdd
// Note: this is a nested method; it is called repeatedly by the while loop below
def visit(rdd: RDD[_]): Unit = {
// Avoid visiting the same rdd twice (repeated visits could easily cause an endless loop)
if (!visited(rdd)) {
visited += rdd
val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
if (rddHasUncachedPartitions) {
// Walk the rdd's lineage to find its parent dependencies: a wide dependency creates a new stage, whereas a narrow dependency only adds the parent rdd to the to-visit list
for (dep <- rdd.dependencies) {
dep match {
// Wide dependency: create (or get) a ShuffleMapStage for the shuffle dependency's rdd
case shufDep: ShuffleDependency[_, _, _] =>
val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
missing += mapStage
}
// Narrow dependency: just add the parent rdd to the to-visit list
case narrowDep: NarrowDependency[_] =>
waitingForVisit.prepend(narrowDep.rdd)
}
}
}
}
}
while (waitingForVisit.nonEmpty) {
// Visit the next unprocessed rdd and take it off the to-visit list; note that remove(0) returns the removed element
visit(waitingForVisit.remove(0))
}
missing.toList
}
The logic here is roughly the same as the stage partition algorithm introduced earlier. In short, walk the rdd lineage to find the parent dependencies; a wide dependency creates a new stage, whereas a narrow dependency only adds the parent to the to-visit list.
Key point 2: submit the tasks of the stage
There is a lot going on in this method, including some handling of stage parameters and attributes that we won't discuss in detail; only the points related to the purpose of this reading are marked. Let's look at the source code first:
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
logDebug("submitMissingTasks(" + stage + ")")
stage match {
case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable =>
mapOutputTracker.unregisterAllMapOutput(sms.shuffleDep.shuffleId)
case _ =>
}
// Key point 1: compute the set of partition indices that still need to be computed
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
val properties = jobIdToActiveJob(jobId).properties
runningStages += stage
stage match {
case s: ShuffleMapStage =>
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
case s: ResultStage =>
outputCommitCoordinator.stageStart(
stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
}
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
} catch {
case NonFatal(e) =>
stage.makeNewStageAttempt(partitionsToCompute.size)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
if (partitionsToCompute.nonEmpty) {
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
}
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
var taskBinary: Broadcast[Array[Byte]] = null
var partitions: Array[Partition] = null
try {
var taskBinaryBytes: Array[Byte] = null
RDDCheckpointData.synchronized {
taskBinaryBytes = stage match {
case stage: ShuffleMapStage =>
JavaUtils.bufferToArray(
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
case stage: ResultStage =>
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}
partitions = stage.rdd.partitions
}
if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024) {
logWarning(s"Broadcasting large task binary with size " +
s"${Utils.bytesToString(taskBinaryBytes.length)}")
}
taskBinary = sc.broadcast(taskBinaryBytes)
} catch {
case e: NotSerializableException =>
abortStage(stage, "Task not serializable: " + e.toString, Some(e))
runningStages -= stage
return
case e: Throwable =>
abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
// Key point 2: build the set of tasks according to the stage type and the partitions to compute
val tasks: Seq[Task[_]] = try {
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
case stage: ShuffleMapStage =>
stage.pendingPartitions.clear()
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = partitions(id)
stage.pendingPartitions += id
new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
}
case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, id, properties, serializedTaskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
stage.rdd.isBarrier())
}
}
} catch {
case NonFatal(e) =>
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
if (tasks.nonEmpty) {
logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
// Key point 3: submit the task set to the TaskScheduler
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
} else {
markStageAsFinished(stage, None)
stage match {
case stage: ShuffleMapStage =>
logDebug(s"Stage ${stage} is actually done; " +
s"(available: ${stage.isAvailable}," +
s"available outputs: ${stage.numAvailableOutputs}," +
s"partitions: ${stage.numPartitions})")
markMapStageJobsAsFinished(stage)
case stage : ResultStage =>
logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
}
submitWaitingChildStages(stage)
}
}
You can see that although there is a lot of code, we only need to focus on three points: the calculation of the partitions to compute, the generation of the task set, and the submission of the tasks. Let's take a look:
Key points 1 and 2: compute the partitions and create the task set
The partitions to compute are obtained mainly from the RDD. There is not much to say about the creation of the task collection: depending on the stage type, it creates a series of tasks that share the same processing logic and differ only in which partition they handle.
At this point we can also see the relationship between stage and taskset: one stage corresponds to one taskset, and one taskset contains multiple tasks; the number of tasks equals the number of partitions the stage needs to process.
Key point 3: submit the task set to the TaskScheduler
Here taskScheduler is an abstract interface whose only implementation class is TaskSchedulerImpl, so let's look at its submission logic:
override def submitTasks(taskSet: TaskSet): Unit = {
// Get the tasks contained in the task set
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
// Create task set manager
val manager = createTaskSetManager(taskSet, maxTaskFailures)
// Get the stage id corresponding to this taskset
val stage = taskSet.stageId
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets.foreach { case (_, ts) =>
ts.isZombie = true
}
// Record the manager for this stage attempt and add it to the scheduling pool
stageTaskSets(taskSet.stageAttemptId) = manager
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
if (!isLocal && !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run(): Unit = {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient resources")
} else {
this.cancel()
}
}
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
// Key: call the SchedulerBackend's reviveOffers method to further process the submitted task set
backend.reviveOffers()
}
Now let's take a look at the reviveOffers method. It has two implementations, one in LocalSchedulerBackend and one in CoarseGrainedSchedulerBackend; our StandaloneSchedulerBackend simply extends the latter, so let's go to CoarseGrainedSchedulerBackend to see the concrete implementation:
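[In place of the screenshot, CoarseGrainedSchedulerBackend.reviveOffers is roughly just this one send.]
override def reviveOffers(): Unit = {
  // send the ReviveOffers message to the backend's own driver endpoint
  driverEndpoint.send(ReviveOffers)
}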

You can see that it sends out a ReviveOffers event, and the only receiver of this event is the backend itself (its DriverEndpoint). Let's move on to where the event is received:
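[In place of the screenshot, roughly the receiving side inside the DriverEndpoint.]
override def receive: PartialFunction[Any, Unit] = {
  case ReviveOffers =>
    // the ReviveOffers message simply triggers makeOffers
    makeOffers()
  // ... StatusUpdate and the other messages are omitted ...
}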

Then let's look inside makeOffers:
private def makeOffers(): Unit = {
val taskDescs = withLock {
// Key point 1: select the eligible (active) executors
val activeExecutors = executorDataMap.filterKeys(isExecutorActive)
val workOffers = activeExecutors.map {
case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
Some(executorData.executorAddress.hostPort),
executorData.resourcesInfo.map { case (rName, rInfo) =>
(rName, rInfo.availableAddrs.toBuffer)
})
}.toIndexedSeq
scheduler.resourceOffers(workOffers)
}
if (taskDescs.nonEmpty) {
// Key point 2: launch the tasks
launchTasks(taskDescs)
}
}
There are two main points here. The first one is actually quite simple; it is marked only to echo the question left open in an earlier article of this series (Spark source code (III): how Spark schedules the driver and executors, and how executors register with the driver). I won't go into how that question relates to this code here; those who are interested can look back at that article, which covers it in detail.
Key point 2: launch the tasks
Let's look at the source code first:
private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
// Iterate over the tasks
for (task <- tasks.flatten) {
val serializedTask = TaskDescription.encode(task)
// Check whether the serialized task exceeds the maximum allowed RPC message size
if (serializedTask.limit() >= maxRpcMessageSize) {
Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
try {
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +
s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values."
msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
taskSetMgr.abort(msg)
} catch {
case e: Exception => logError("Exception in error callback", e)
}
}
}
else {
// Get the target executor's information
val executorData = executorDataMap(task.executorId)
// Allocate the executor's resources to the task; they are released after the task finishes
executorData.freeCores -= scheduler.CPUS_PER_TASK
task.resources.foreach { case (rName, rInfo) =>
assert(executorData.resourcesInfo.contains(rName))
executorData.resourcesInfo(rName).acquire(rInfo.addresses)
}
logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
s"${executorData.executorHost}.")
// Send the LaunchTask message to the executor
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
}
}
You can see that it first checks whether the serialized task exceeds the RPC message size limit. If it does, the task set is aborted with an error; if it is within the limit, the task is sent to the executor. Note that tasks are sent one at a time, not as a whole taskset.
At this point the goal of this source code reading has been achieved. Our demo actually has two action operators, first and collect, and this article only walked through the task distribution process of first; collect was not covered, mainly because the two paths are exactly the same: both submit a job through the runJob method. The reason for adding first was mainly to explore the relationship between application and job. As the monitoring page below shows, we have two action operators, and naturally there are two jobs.

Summary:
1. The relationships between application, job, stage, taskset, and task
application vs job: one application may submit multiple jobs, because every action operator calls the runJob method, i.e. each action corresponds to one submitted job.
job vs stage: one job may contain multiple stages; the split is based mainly on whether a shuffle occurs between RDDs, i.e. every wide dependency produces a new stage.
stage vs taskset: one stage corresponds to one taskset, and one task is created for each partition the stage needs to process.
taskset vs task: a taskset contains a group of tasks; every task has the same execution logic and differs only in which partition of the stage it handles.
2. The unique identifiers of jobs and stages both start from 0 and increase by 1 for each new one.
3. The stage cutting algorithm: the RDD on which the action operator was invoked is treated as the last RDD, and a ResultStage is created with it. We then walk backwards from this RDD to its parent RDDs: a parent connected by a narrow dependency is folded into the current stage, while a parent connected by a wide dependency marks the boundary of a new stage and becomes the RDD from which the new stage's backward walk starts, repeating the stage-creation process for every further wide dependency. Note that the stage built around the action's RDD is a ResultStage, which computes the final result, while every stage created because of a wide dependency is a ShuffleMapStage, which involves a shuffle.
4. Stage has two implementation classes, ResultStage and ShuffleMapStage; Task likewise has two implementations, ResultTask and ShuffleMapTask.