
Workflow of the Spark Kernel Driver (Stage Division, Task Division, Task Scheduling)

2022-06-13 03:34:00 TRX1024

Contents

I. Several concepts in Spark

II. Core attributes of the context object SparkContext

III. RDD dependencies in Spark

1. What is a dependency? What is lineage? What are they for?

2. How an RDD saves its dependencies

3. Narrow dependencies & wide dependencies and their role

IV. Stage division and Stage scheduling

1. Basic steps of Stage division

2. Stage scheduling

V. Task division and Task scheduling

1. Basic steps of Task division

2. Task scheduling steps


In the previous article we learned the execution flow of submitting a Spark job to Yarn: Spark Kernel (Execution Principles) - Environment Preparation.

Now let's look at the Driver's workflow. The Driver thread mainly initializes the SparkContext object and prepares the context needed for execution. On the one hand, it maintains an RPC connection to the ApplicationMaster and requests resources through it; on the other hand, it schedules tasks according to the user's business logic and sends them to idle Executors.

When the ResourceManager returns Container resources to the ApplicationMaster, the ApplicationMaster tries to start an Executor process in the corresponding Container. Once the Executor is up, it registers itself back with the Driver. After successful registration it keeps a heartbeat with the Driver while waiting for the Driver to distribute tasks; when a distributed task finishes, it reports the task status back to the Driver.

I. Several concepts in Spark

A Spark application involves four concepts: SparkContext (Application), Job, Stage, and Task:

  1. Application: initializing a SparkContext generates an Application;
  2. Job: bounded by Action operators; each Action encountered triggers one Job;
  3. Stage: a subset of a Job, bounded by wide dependencies (i.e. Shuffle); a new Stage is cut at every Shuffle;
  4. Task: a subset of a Stage, measured by parallelism (the number of partitions); there are as many Tasks as there are partitions.

Note: each level of Application -> Job -> Stage -> Task is a 1-to-n relationship.
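
To make the hierarchy concrete, here is a minimal sketch, assuming an existing SparkContext sc and the same word-count data used later in this article; the comments mark where the Job/Stage/Task boundaries fall:

val words  = sc.textFile("datas/word.txt").flatMap(_.split(" "))
val counts = words.map(word => (word, 1)).reduceByKey(_ + _) // reduceByKey introduces a Shuffle

counts.count()   // Action 1 -> Job 1: the Shuffle splits it into 2 Stages, each with as many Tasks as partitions
counts.collect() // Action 2 -> Job 2: a second Job over the same lineage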

Generally speaking, Spark task scheduling happens at two levels: Stage-level scheduling and Task-level scheduling. The overall scheduling flow is shown in the figure below.

II. Core attributes of the context object SparkContext

Spark RDDs form the RDD lineage (dependency) graph, i.e. the DAG, through their Transformation operations. Finally, a call to an Action triggers a Job and schedules it for execution. Two schedulers are created during execution: DAGScheduler and TaskScheduler.

  1. DAGScheduler is responsible for Stage-level scheduling. It mainly cuts a Job into several Stages and packages each Stage into a TaskSet, which it hands to the TaskScheduler for scheduling.
  2. TaskScheduler is responsible for Task-level scheduling. It distributes the TaskSets handed over by the DAGScheduler to Executors according to the configured scheduling policy. During scheduling, the SchedulerBackend is responsible for providing available resources; SchedulerBackend has multiple implementations that connect to different resource management systems.

While initializing the SparkContext, the Driver initializes DAGScheduler, TaskScheduler, SchedulerBackend, and HeartbeatReceiver, and starts the SchedulerBackend and HeartbeatReceiver. The SchedulerBackend requests resources through the ApplicationMaster and keeps fetching suitable Tasks from the TaskScheduler to distribute to Executors.

HeartbeatReceiver is responsible for receiving Executors' heartbeat messages, monitoring whether Executors are alive, and notifying the TaskScheduler.
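
These scheduler components are Spark internals and are not called directly from user code, but part of their behaviour is exposed through configuration. A minimal sketch (the values shown are simply the documented defaults) of tuning the heartbeat that Executors send to the Driver's HeartbeatReceiver:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("HeartbeatDemo")
  .set("spark.executor.heartbeatInterval", "10s") // how often each Executor reports to the Driver
  .set("spark.network.timeout", "120s")           // should stay well above the heartbeat interval
val sc = new SparkContext(conf)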

III. RDD dependencies in Spark

Before diving into Stage division, let's first look at RDD dependencies.

1. What is a dependency? What is lineage? What are they for?

The relationship between two adjacent RDDs is called a dependency. For example, val rdd1 = rdd.map(_*2) means rdd1 depends on rdd, i.e. the new RDD depends on the old RDD. A chain of dependencies across several consecutive RDDs is called lineage.
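
As a small illustration of the two terms, here is a sketch assuming an existing SparkContext sc: rdd.dependencies shows only the direct parent of an RDD, while rdd.toDebugString prints the whole lineage.

val rdd  = sc.makeRDD(List(1, 2, 3, 4))
val rdd1 = rdd.map(_ * 2)
println(rdd1.dependencies)  // the direct dependency on rdd (a OneToOneDependency)
println(rdd1.toDebugString) // the full lineage back to the source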

Their role:

      

Look at this example. Suppose we have the dependency chain RDD1 -> RDD2 -> RDD3 -> RDD4, where RDD4 is created by reading data from the data source. At the code level the logic is:

val RDD4 = sc.textFile("sssssssss")
val RDD3 = RDD4.flatMap(_.split(" "))
val RDD2 = RDD3.map(word => (word, 1))
val RDD1 = RDD2.reduceByKey(_ + _)

Suppose the program fails when it reaches val RDD1 = RDD2.reduceByKey(_ + _); that Task will then be restarted. Here is the problem: how does RDD1 know what its previous step was? If it has to start from scratch, where is the start? We know an RDD does not store data; it only stores its data structure and computation logic. So if a Task fails, does the whole job have to be rerun?

Therefore, to provide fault tolerance, an RDD needs to preserve its dependencies, so that when something goes wrong the data can be re-read from the source and recomputed along the lineage. This is exactly the role of dependencies.

2. How an RDD saves its dependencies

First, look at a schematic diagram:

How to read it:

  1. RDD4 depends on the file datas/word.txt via the textFile operator; RDD4 saves this dependency information.
  2. RDD3 is obtained from RDD4 via flatMap, so it depends on RDD4; it saves its own dependency as well as RDD4's dependency.
  3. RDD2 is obtained from RDD3 via map, so it depends on RDD3; it saves its own dependency as well as RDD3's and RDD4's dependencies.
  4. And so on.

So how do we view these dependencies in a program?

Let's test it with a small demo; the current RDD's lineage information can be printed via rdd.toDebugString:

import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_dep {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("TRXTest")
    val sc = new SparkContext(sparkConf) // environment object
    val lines = sc.textFile("datas/word.txt")
    // print the lineage
    println("Dependencies of RDD4:")
    println(lines.toDebugString)
    println("**********************")
    val words = lines.flatMap(_.split(" "))
    println("Dependencies of RDD3:")
    println(words.toDebugString)
    println("**********************")
    val wordToOne = words.map(word => (word, 1))
    println("Dependencies of RDD2:")
    println(wordToOne.toDebugString)
    println("**********************")
    val wordToSum = wordToOne.reduceByKey(_ + _)
    println("Dependencies of RDD1:")
    println(wordToSum.toDebugString)
    println("**********************")
    val array = wordToSum.collect() // collect the result
    array.foreach(println)
    sc.stop()
  }
}

Output:

Dependencies of RDD4:
(1) datas/word.txt MapPartitionsRDD[1] at textFile at Spark01_RDD_dep.scala:15 []
 |  datas/word.txt HadoopRDD[0] at textFile at Spark01_RDD_dep.scala:15 []
**********************
Dependencies of RDD3:
(1) MapPartitionsRDD[2] at flatMap at Spark01_RDD_dep.scala:20 []
 |  datas/word.txt MapPartitionsRDD[1] at textFile at Spark01_RDD_dep.scala:15 []
 |  datas/word.txt HadoopRDD[0] at textFile at Spark01_RDD_dep.scala:15 []
**********************
Dependencies of RDD2:
(1) MapPartitionsRDD[3] at map at Spark01_RDD_dep.scala:24 []
 |  MapPartitionsRDD[2] at flatMap at Spark01_RDD_dep.scala:20 []
 |  datas/word.txt MapPartitionsRDD[1] at textFile at Spark01_RDD_dep.scala:15 []
 |  datas/word.txt HadoopRDD[0] at textFile at Spark01_RDD_dep.scala:15 []
**********************
Dependencies of RDD1:
(1) ShuffledRDD[4] at reduceByKey at Spark01_RDD_dep.scala:28 []
 +-(1) MapPartitionsRDD[3] at map at Spark01_RDD_dep.scala:24 []
    |  MapPartitionsRDD[2] at flatMap at Spark01_RDD_dep.scala:20 []
    |  datas/word.txt MapPartitionsRDD[1] at textFile at Spark01_RDD_dep.scala:15 []
    |  datas/word.txt HadoopRDD[0] at textFile at Spark01_RDD_dep.scala:15 []
**********************

The output is consistent with the diagram drawn above.

3. Narrow dependencies & wide dependencies and their role

From the demo output above you can see two kinds of RDD in the dependencies: MapPartitionsRDD and ShuffledRDD. What does that mean?

The MapPartitionsRDD-type dependencies come from:

val RDD4 = sc.textFile("sssssssss")
val RDD3 = RDD4.flatMap(_.split(" "))
val RDD2 = RDD3.map(word => (word, 1))

You can see that, assuming each RDD has only one partition, there is no Shuffle operation in these transformations; each RDD depends on exactly one RDD. We call this a OneToOne dependency (narrow dependency). That is, a narrow dependency means each partition of the parent (upstream) RDD is used by at most one partition of the child (downstream) RDD. A vivid metaphor for a narrow dependency is an only child.

Correspondingly, val RDD1 = RDD2.reduceByKey(_ + _) involves a Shuffle operation, so RDD1 depends on data from multiple partitions of the upstream RDD. This kind of dependency is called a Shuffle dependency (wide dependency): the same partition of the parent (upstream) RDD is depended on by multiple partitions of the child (downstream) RDD, which causes a Shuffle. A vivid metaphor for a wide dependency is a family with many children.
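
The dependency type can also be checked directly in code. A minimal sketch, reusing wordToOne and wordToSum from the demo above: a narrow dependency appears as a OneToOneDependency, a wide one as a ShuffleDependency.

println(wordToOne.dependencies) // List(org.apache.spark.OneToOneDependency@...)
println(wordToSum.dependencies) // List(org.apache.spark.ShuffleDependency@...)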

So what is the difference between narrow and wide dependencies in terms of their role? That is exactly what we will learn next: Stage division.

IV. Stage division and Stage scheduling

Spark task scheduling starts with cutting the DAG, which is mainly done by the DAGScheduler. When an Action operation is encountered, it triggers the computation of a Job and submits it to the DAGScheduler.

1. Basic steps of Stage division

  1. First, create a ResultStage.
  2. Based on the RDD passed in (the last one, call it the current RDD), find its dependencies.
  3. Whenever a Shuffle dependency is encountered, create a ShuffleMapStage.
  4. Then point the current RDD to the RDD it depends on.
  5. In fact, before each creation, it checks whether the current RDD's dependency is a Shuffle dependency.
  6. If it is, another stage is created further upstream.
  7. Therefore: the number of stages in Spark = the number of shuffle dependencies + 1.
  8. Each stage is independent.

Take the Stage division of a simple WordCount as an example:

  1. The Job is triggered by saveAsTextFile, so first a ResultStage is created.
  2. Starting from RDD-3 (call it the current RDD), backtrack through the dependencies until RDD-0, which has no dependency.
  3. RDD-3 depends on RDD-2, and it is a wide (Shuffle) dependency.
  4. So a Stage boundary is drawn between RDD-2 and RDD-3, and RDD-3 goes into the last Stage, i.e. the ResultStage.
  5. RDD-2 depends on RDD-1, and RDD-1 depends on RDD-0; these are narrow dependencies.
  6. So RDD-0, RDD-1, and RDD-2 go into the same Stage, i.e. a ShuffleMapStage.
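
The rule "number of stages = number of shuffle dependencies + 1" can be sanity-checked with a simplified sketch. This is not Spark's actual DAGScheduler code, just a walk over the lineage of wordToSum, the final RDD of the WordCount demo above:

import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD

def countShuffles(rdd: RDD[_]): Int = rdd.dependencies.map {
  case s: ShuffleDependency[_, _, _] => 1 + countShuffles(s.rdd) // a new stage is cut here
  case narrow                        => countShuffles(narrow.rdd)
}.sum

println(countShuffles(wordToSum) + 1) // 2: one ShuffleMapStage + one ResultStage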

2. Stage scheduling

During execution, whether a Stage can be submitted depends on whether its parent Stage(s) have finished; only after the parent Stage completes can the current Stage be submitted. If a Stage has no parent Stage, submission starts from that Stage.

When a Stage is submitted, its Task information (partition information and the computation) is serialized and packaged into a TaskSet, which is handed to the TaskScheduler; one partition corresponds to one Task. The TaskScheduler also monitors the running state of the Stage. Only when an Executor is lost or a Task fails because of a Fetch failure does the failed Stage need to be resubmitted to re-schedule the failed tasks; other kinds of Task failures are simply retried during the TaskScheduler's scheduling process.

Relatively speaking, the DAGScheduler's job is simpler: it only divides the DAG at the Stage level, submits Stages, and monitors the related state information. The TaskScheduler is comparatively more complex.

V. Task division and Task scheduling

1. Basic steps of Task division

  1. After the Stages are divided, Stage submission starts, beginning with the ResultStage.
  2. Before submitting, it checks whether the current Stage has a parent Stage.
  3. If it does, the parent Stage is submitted first and handed to the TaskScheduler.
  4. If not, Task creation starts.
  5. The number of Tasks in a Stage is the number of partitions of the last RDD in that Stage.
  6. So the total number of Tasks is the sum of the partition counts of the last RDD of each Stage.
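
A minimal sketch of that rule, reusing RDDs from the WordCount demo above; each Stage's Task count is the partition count of its last RDD:

val shuffleMapStageTasks = wordToOne.getNumPartitions // last RDD before the Shuffle
val resultStageTasks     = wordToSum.getNumPartitions // last RDD of the Job
println(shuffleMapStageTasks + resultStageTasks)      // total Tasks of this Job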

2. Task scheduling steps

How are the divided Tasks sent to Executors for execution?

 

  1. The divided Tasks form a TaskSet.
  2. The TaskSet is then wrapped into a TaskSetManager.
  3. A scheduler is constructed (FIFO scheduler by default; Yarn's ResourceManager also has its own schedulers).
  4. The scheduler has an attribute called rootPool, which can be understood as a task pool.
  5. The constructor puts the TaskSetManager into the rootPool.
  6. Afterwards the rootPool is traversed to schedule tasks:
    1. A Task contains the computation logic and the data (more precisely, the storage location of the data).
    2. A Task has an attribute: the locality level.
    3. The locations of the computation and the data (the computation, i.e. the task, is on the Driver at this point, while the data is on an Executor) can relate to each other at different levels; this level is called the locality level.
    4. The problem this level solves: suppose there is one Driver, one Task, and two Executors, A and B.
      1. If the data a Task needs is on A, the Task runs most efficiently when it is sent to A; otherwise B has to copy the data from A.
      2. So how do we decide where to send the Task? By the Task's locality level.
      3. How is a Task's locality level determined?
        1. Call getPreferredLocations() to get the preferred locations of the partition.
        2. Since one partition corresponds to one Task, the partition's preferred locations are the Task's preferred locations.
        3. Based on each Task's preferred locations, its locality level is determined.
    5. Locality levels (from high to low; see the configuration sketch after this list):
      1. Process-local: the data and the computation are in the same Executor process.
      2. Node-local: the data and the computation are on the same node, but the task and the data are no longer in the same Executor process; data has to be transferred between processes.
      3. Rack-local: the data and the computation are on two different nodes in the same rack; data has to be transferred between nodes over the network.
      4. No preference: for the task it makes no difference where the data comes from.
      5. Any: the task and the data can be anywhere in the cluster, not even in the same rack; performance is worst.
    6. After traversal and filtering, the Tasks are finally obtained and launched (def launchTask):
      1. Find the executorEndpoint (endpoint) the Task needs to be sent to, and send it a message.
      2. How is it sent? Via launchTask().
      3. The Task is serialized and sent.
A TaskSetManager is responsible for monitoring and managing the Tasks of the same Stage; the TaskScheduler schedules tasks in units of TaskSetManagers.
