Workflow of the Spark kernel Driver (Stage division, Task division, Task scheduling)
2022-06-13 03:34:00 【TRX1024】
Contents
I. Several concepts in Spark
II. Core attributes of the context object SparkContext
III. RDD dependencies in Spark
1. What are dependencies and lineage, and what are they for?
2. How an RDD saves its dependencies
3. Narrow and wide dependencies and their role
IV. Stage division and Stage scheduling
1. Basic steps of Stage division
V. Task division and Task scheduling
1. Basic steps of Task division
In the previous article we learned how a Spark job submitted to YARN is executed: Spark kernel (execution principle) — environment preparation.
Now let's look at the Driver workflow. The Driver thread mainly initializes the SparkContext object and prepares the context needed for execution. On the one hand it maintains an RPC connection with the ApplicationMaster and applies for resources through the ApplicationMaster; on the other hand it starts scheduling tasks according to the user's business logic and sends tasks to idle Executors.
When the ResourceManager returns Container resources to the ApplicationMaster, the ApplicationMaster tries to start an Executor process in the corresponding Container. Once the Executor is up, it registers itself back with the Driver (reverse registration). After successful registration it keeps a heartbeat with the Driver, waits for the Driver to distribute tasks, and reports task status back to the Driver when the distributed tasks finish.
I. Several concepts in Spark
A Spark application involves four concepts: Application, Job, Stage, and Task:
- Application: initializing a SparkContext generates an Application;
- Job: bounded by Action methods; each Action method encountered triggers a Job;
- Stage: a subset of a Job, bounded by RDD wide dependencies (i.e. Shuffle); a new division is made whenever a Shuffle is encountered;
- Task: a subset of a Stage, measured by parallelism (the number of partitions); however many partitions there are, that is how many Tasks there are.
Note: each level of Application -> Job -> Stage -> Task is a 1-to-n relationship. (A minimal code sketch of this hierarchy follows.)
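The sketch below (a minimal example, assuming local mode; the object name, app name and numbers are made up for illustration) shows the hierarchy in action: one Application, one Job per Action, one Stage per Job here because there is no shuffle, and one Task per partition of the final RDD.

import org.apache.spark.{SparkConf, SparkContext}

object AppJobStageTaskDemo {
  def main(args: Array[String]): Unit = {
    // One SparkContext => one Application
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("AppJobStageTaskDemo"))

    val rdd     = sc.parallelize(1 to 100, 4) // 4 partitions => 4 Tasks per Stage
    val doubled = rdd.map(_ * 2)              // transformation only, no Job yet

    println(doubled.count())                  // Action #1 => Job 0 (1 Stage, 4 Tasks)
    println(doubled.reduce(_ + _))            // Action #2 => Job 1 (1 Stage, 4 Tasks)

    sc.stop()
  }
}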
Overall, Spark task scheduling is split into two levels: Stage-level scheduling and Task-level scheduling. The overall scheduling process is shown in the figure below:
II. Core attributes of the context object SparkContext
Spark RDDs form the RDD lineage (dependency) graph, i.e. the DAG, through their Transformation operations; an Action call finally triggers a Job and schedules it for execution. Two schedulers are created during execution: the DAGScheduler and the TaskScheduler.
- DAGScheduler is responsible for Stage-level scheduling. It mainly cuts a Job into several Stages and packs each Stage into a TaskSet that is handed to the TaskScheduler for scheduling.
- TaskScheduler is responsible for Task-level scheduling. It distributes the TaskSets handed over by the DAGScheduler to Executors according to the specified scheduling policy. During scheduling, the SchedulerBackend is responsible for providing available resources; SchedulerBackend has multiple implementations that connect to different resource management systems.
While initializing the SparkContext, the Driver initializes the DAGScheduler, TaskScheduler, SchedulerBackend and HeartbeatReceiver, and starts the SchedulerBackend and HeartbeatReceiver.
The SchedulerBackend applies for resources through the ApplicationMaster and keeps fetching suitable Tasks from the TaskScheduler to distribute to Executors for execution.
The HeartbeatReceiver is responsible for receiving heartbeat information from Executors, monitoring whether Executors are alive, and notifying the TaskScheduler.
III. RDD dependencies in Spark
Before looking at how Stages are divided, let's first understand RDD dependencies.
1. What are dependencies and lineage, and what are they for?
The relationship between two adjacent RDDs is called a dependency. For example, val rdd1 = rdd.map(_*2) means that rdd1 depends on rdd, i.e. the new RDD depends on the old RDD. The chain of dependencies across several consecutive RDDs is called lineage.
What are they for?
Look at this example. There is a dependency chain RDD1 -> RDD2 -> RDD3 -> RDD4, where RDD4 is created by reading data from the data source. At the code level the logic is:
val RDD4 = sc.textFile("sssssssss")
val RDD3 = RDD4.flatMap(_.split(" "))
val RDD2 = RDD3.map((_, 1))
val RDD1 = RDD2.reduceByKey(_ + _)
Suppose the program fails while running val RDD1 = RDD2.reduceByKey(_ + _); that Task then has to be restarted. Here is the question: how does RDD1 know what its previous step was? If it has to start from the beginning, where is the beginning? We know that an RDD does not store data; it only stores the data structure and the computation logic. If a task fails, does that mean the whole job has to run again?
Therefore, to provide fault tolerance, an RDD needs to preserve its dependencies, so that if something goes wrong it can re-read the data from the source and recompute along the lineage. This is exactly the role of dependencies.
2. How an RDD saves its dependencies
Let's first look at a schematic diagram:
Reading the diagram:
- RDD4 depends on the file datas/word.txt via the textFile operator; RDD4 saves this dependency information
- RDD3 is obtained from RDD4 by flatMap, so it depends on RDD4; it saves its own dependency as well as RDD4's dependency
- RDD2 is obtained from RDD3 by map, so it depends on RDD3; it saves its own dependency as well as RDD3's and RDD4's dependencies
- And so on
So how do we view these dependencies in a program?
Let's test it with a small demo: rdd.toDebugString prints the lineage information of the current RDD:
def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local").setAppName("TRXTest")
  val sc = new SparkContext(sparkConf) // environment object
  val lines = sc.textFile("datas/word.txt")
  // Print the lineage after each transformation
  println("Dependencies of RDD4:")
  println(lines.toDebugString)
  println("**********************")
  val words = lines.flatMap(_.split(" "))
  println("Dependencies of RDD3:")
  println(words.toDebugString)
  println("**********************")
  val wordToOne = words.map(word => (word, 1))
  println("Dependencies of RDD2:")
  println(wordToOne.toDebugString)
  println("**********************")
  val wordToSum = wordToOne.reduceByKey(_ + _)
  println("Dependencies of RDD1:")
  println(wordToSum.toDebugString)
  println("**********************")
  val array = wordToSum.collect() // collect the results to the driver
  array.foreach(println)
  sc.stop()
}
Running results:
Dependencies of RDD4:
(1) datas/word.txt MapPartitionsRDD[1] at textFile at Spark01_RDD_dep.scala:15 []
| datas/word.txt HadoopRDD[0] at textFile at Spark01_RDD_dep.scala:15 []
**********************
Dependencies of RDD3:
(1) MapPartitionsRDD[2] at flatMap at Spark01_RDD_dep.scala:20 []
| datas/word.txt MapPartitionsRDD[1] at textFile at Spark01_RDD_dep.scala:15 []
| datas/word.txt HadoopRDD[0] at textFile at Spark01_RDD_dep.scala:15 []
**********************
Dependencies of RDD2:
(1) MapPartitionsRDD[3] at map at Spark01_RDD_dep.scala:24 []
| MapPartitionsRDD[2] at flatMap at Spark01_RDD_dep.scala:20 []
| datas/word.txt MapPartitionsRDD[1] at textFile at Spark01_RDD_dep.scala:15 []
| datas/word.txt HadoopRDD[0] at textFile at Spark01_RDD_dep.scala:15 []
**********************
Dependencies of RDD1:
(1) ShuffledRDD[4] at reduceByKey at Spark01_RDD_dep.scala:28 []
+-(1) MapPartitionsRDD[3] at map at Spark01_RDD_dep.scala:24 []
| MapPartitionsRDD[2] at flatMap at Spark01_RDD_dep.scala:20 []
| datas/word.txt MapPartitionsRDD[1] at textFile at Spark01_RDD_dep.scala:15 []
| datas/word.txt HadoopRDD[0] at textFile at Spark01_RDD_dep.scala:15 []
**********************
The output is consistent with the diagram above.
3. Narrow and wide dependencies and their role
From the demo output above you can see two kinds of RDDs in the lineage: MapPartitionsRDD and ShuffledRDD. What does that mean?
The MapPartitionsRDD entries come from transformations like
val RDD4 = sc.textFile("sssssssss")
val RDD3 = RDD4.flatMap(_.split(" "))
val RDD2 = RDD3.map((_, 1))
Notice that, assuming each RDD has only one partition, none of these transformations involves a Shuffle, and each RDD depends on exactly one upstream RDD. We call this a OneToOne dependency (narrow dependency): each partition of a parent (upstream) RDD is used by at most one partition of a child (downstream) RDD. A vivid metaphor for a narrow dependency is an only child.
Correspondingly, val RDD1 = RDD2.reduceByKey(_ + _) involves a Shuffle operation: a partition of RDD1 depends on data from multiple partitions of the upstream RDD. This kind of dependency is called a Shuffle dependency (wide dependency): one partition of a parent (upstream) RDD is depended on by multiple partitions of child (downstream) RDDs, which causes a Shuffle. The metaphor for a wide dependency is multiple children.
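To make the distinction concrete, here is a small sketch (reusing the word-count pipeline from the demo above; only the object name is new) that inspects the direct dependency objects: narrow dependencies show up as OneToOneDependency, wide ones as ShuffleDependency.

import org.apache.spark.{OneToOneDependency, ShuffleDependency, SparkConf, SparkContext}

object DependencyKindDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("DependencyKindDemo"))

    val lines     = sc.textFile("datas/word.txt")
    val words     = lines.flatMap(_.split(" "))
    val wordToOne = words.map((_, 1))
    val wordToSum = wordToOne.reduceByKey(_ + _)

    // map/flatMap: each child partition reads exactly one parent partition -> narrow
    println(wordToOne.dependencies.head.isInstanceOf[OneToOneDependency[_]])      // true
    // reduceByKey: each child partition reads data from many parent partitions -> wide
    println(wordToSum.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]]) // true

    sc.stop()
  }
}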
So what difference do narrow and wide dependencies actually make? That is what we will look at next: Stage division.
IV. Stage division and Stage scheduling
Spark task scheduling starts with cutting the DAG, which is mainly done by the DAGScheduler. When an Action operation is encountered, it triggers the computation of a Job, which is submitted to the DAGScheduler.
1. Basic steps of Stage division
- First, a ResultStage is created
- Starting from the RDD passed in (the last one, referred to as the current RDD), its dependencies are examined
- Whenever a Shuffle dependency is encountered, a ShuffleMapStage is created
- Then the current RDD pointer moves to the RDD it depends on
- In fact, before each creation, the scheduler checks whether the current RDD's dependency is a Shuffle dependency
- If it is, another stage is created further upstream
- Therefore: the number of stages in Spark = the number of shuffle dependencies + 1 (a simplified sketch follows this list)
- Each stage is independent
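As a rough illustration of the "shuffle dependencies + 1" rule, the following simplified sketch (this is not Spark's actual DAGScheduler code) walks the lineage of the final RDD and counts the ShuffleDependency objects it meets:

import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD

// Simplified sketch only: the real DAGScheduler builds ShuffleMapStages and a ResultStage;
// here we just count the shuffle dependencies reachable from the final RDD.
def countStages(finalRdd: RDD[_]): Int = {
  def countShuffles(rdd: RDD[_]): Int =
    rdd.dependencies.map {
      case shuffle: ShuffleDependency[_, _, _] => 1 + countShuffles(shuffle.rdd)
      case narrow                              => countShuffles(narrow.rdd)
    }.sum
  countShuffles(finalRdd) + 1 // number of stages = number of shuffle dependencies + 1
}

// For the word-count job above: countStages(wordToSum) == 2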
Take the Stage division of a simple WordCount as an example (a code sketch of the job follows this list):
- The Job is triggered by saveAsTextFile, so a ResultStage is created first
- Backtracking starts from RDD-3 (referred to as the current RDD) and follows dependencies back to RDD-0, which has no dependency
- RDD-3 depends on RDD-2, and this is a wide (Shuffle) dependency
- So a Stage boundary is drawn between RDD-2 and RDD-3, and RDD-3 is divided into the last Stage, the ResultStage
- RDD-2 depends on RDD-1, and RDD-1 depends on RDD-0; these are narrow dependencies
- So RDD-0, RDD-1 and RDD-2 are divided into the same Stage, a ShuffleMapStage
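For reference, a minimal version of the WordCount job described above could look like this (assuming sc is the SparkContext created earlier; input and output paths are illustrative). The reduceByKey shuffle is the only stage boundary, so this Job has two stages:

sc.textFile("datas/word.txt")   // RDD-0
  .flatMap(_.split(" "))        // RDD-1  (narrow dependency on RDD-0)
  .map((_, 1))                  // RDD-2  (narrow dependency on RDD-1)
  .reduceByKey(_ + _)           // RDD-3  (shuffle => stage boundary)
  .saveAsTextFile("output")     // Action => triggers the Job, ResultStage created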
2. Stage scheduling
During program execution, whether a Stage can be submitted depends on whether its parent Stage has finished executing. A Stage can only be submitted after its parent Stage completes; if a Stage has no parent Stage, submission starts from that Stage.
When a Stage is submitted, its Task information (partition information and methods) is serialized and packed into a TaskSet, which is handed to the TaskScheduler; one partition corresponds to one Task. On the other hand, the DAGScheduler monitors the running state of the Stage: only when an Executor is lost or a Task fails because of a Fetch failure does the failed Stage need to be resubmitted to schedule the failed tasks again; other kinds of Task failures are retried by the TaskScheduler during its scheduling.
Relatively speaking, the DAGScheduler's job is simpler: it only divides the DAG at the Stage level, submits Stages, and monitors the related status information. The TaskScheduler is comparatively more complex.
V. Task division and Task scheduling
1. Basic steps of Task division
- After the Stages are divided, Stage submission starts, beginning from the ResultStage
- Before submitting, the scheduler checks whether the current Stage has a parent Stage
- If it does, the parent Stage is submitted first and handed to the TaskScheduler
- If not, Task creation starts
- How many Tasks a Stage has in total depends on the number of partitions of the last RDD in that Stage
- So the total number of Tasks equals the sum of the partition counts of the last RDD of each Stage (a short sketch follows this list)
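A quick sketch of the partition-count rule (assuming sc is an existing SparkContext; the partition numbers are made up for the example):

val wordToSum = sc.textFile("datas/word.txt", 2) // minPartitions = 2 => the ShuffleMapStage has 2 Tasks here
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _, 3)                         // shuffle into 3 partitions

println(wordToSum.getNumPartitions)              // 3 => the ResultStage has 3 Tasks
// Total tasks for one run of this Job: 2 (ShuffleMapStage) + 3 (ResultStage) = 5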
2. Task scheduling steps
How are the divided Tasks sent to Executors for execution?
- The divided Tasks form a TaskSet
- The TaskSet is then encapsulated into a TaskSetManager
- A scheduler is constructed (a FIFO scheduler by default; YARN's ResourceManager has its own schedulers as well)
- The scheduler has an attribute called rootPool, which can be understood as a task pool
- The constructor puts the TaskSetManager into the rootPool
- Later, the rootPool is traversed to schedule the tasks
- A Task contains the computation logic and the data (more precisely, the storage location of the data)
- A Task has an attribute: the locality level
- The computation (the task, which at this point is on the Driver) and the data (on an Executor) may be located at different places; this relationship is described by the locality level
- The problem this level addresses: suppose there is one Driver, one Task, and two Executors A and B
- If the data the Task needs is on A, the task runs most efficiently when it is sent to A; otherwise B has to copy the data from A
- So how is it decided where a Task is sent? By the task's locality level
- How is a Task's locality level determined?
- By calling getPreferredLocations() to obtain the preferred locations of the partition
- Since one partition corresponds to one Task, the partition's preferred locations are the Task's preferred locations
- Based on each Task's preferred locations, the Task's locality level is determined
- Locality levels (from high to low; a configuration sketch follows at the end of this section):
  - Process local (PROCESS_LOCAL): data and computation are in the same Executor process.
  - Node local (NODE_LOCAL): data and computation are on the same node, but the task and the data are no longer in the same Executor process, so data has to be transferred between processes.
  - Rack local (RACK_LOCAL): data and computation are on two different nodes in the same rack, so data has to be transferred between nodes over the network.
  - No preference (NO_PREF): for the task it makes no difference where the data is fetched from.
  - Any (ANY): the task and the data can be anywhere in the cluster, not even in the same rack; worst performance.
- After traversing and filtering, the final tasks are obtained and launched (def launchTask)
- The executorEndpoint (endpoint) the task needs to be sent to is located, and a message is sent to it
- How is it sent? Via launchTask()
- The task is serialized and sent
The TaskSetManager is responsible for monitoring and managing the Tasks of the same Stage; the TaskScheduler schedules tasks with the TaskSetManager as the unit.
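Related to the locality levels above: the TaskScheduler waits a short while for a slot at a better locality level before degrading to a worse one. These waits are configurable; the keys below are standard Spark settings (their default is 3s), and the values shown are only examples.

import org.apache.spark.SparkConf

// Sketch: tuning how long the scheduler waits before falling back to a worse locality level.
val conf = new SparkConf()
  .setAppName("LocalityWaitDemo")
  .set("spark.locality.wait", "3s")          // global fallback wait
  .set("spark.locality.wait.process", "3s")  // PROCESS_LOCAL -> NODE_LOCAL
  .set("spark.locality.wait.node", "3s")     // NODE_LOCAL    -> RACK_LOCAL
  .set("spark.locality.wait.rack", "3s")     // RACK_LOCAL    -> ANY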