
Review the executor from the perspective of application submission

2022-06-22 16:57:00 ZH519080

This article reviews the Executor from the perspective of Application submission: when the Executor is started, and how the Executor returns its results to the Application.

When the Executor is started

After SparkContext starts, it directly calls the createTaskScheduler method. Once createTaskScheduler completes, it calls the start method of TaskScheduler; what actually runs is TaskSchedulerImpl's start method, which in turn executes the start method of SparkDeploySchedulerBackend (renamed StandaloneSchedulerBackend in Spark 2.0 and later). SparkDeploySchedulerBackend's start method registers a command with the Master, and the Master in turn tells a Worker to start the concrete Executor. The command encapsulates the instructions; in particular, the Executor's process entry class is CoarseGrainedExecutorBackend.

The start method then calls new() to create an AppClient, which has an inner class named ClientEndpoint. When ClientEndpoint is created, the Command specifies that the entry class for the Executors launched for the current application is CoarseGrainedExecutorBackend. ClientEndpoint inherits from ThreadSafeRpcEndpoint and communicates with the Master through the RPC mechanism. In ClientEndpoint's onStart method, a RegisterApplication request is sent to the Master via the registerWithMaster method. After receiving the request message, the Master first completes the registration through the registerApplication method, then calls the schedule method and starts the Executor on a Worker (see the Master-Driver and Worker-Executor anatomy chapter for details).
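The registration handshake described above can be reduced to a self-contained sketch. This is an illustrative toy model, not Spark source: ToyMaster and ToyClientEndpoint are invented stand-ins for the real classes, and real Spark exchanges these messages asynchronously over RPC rather than as direct method calls.

```scala
// Illustrative toy model (not Spark source): the AppClient -> Master
// registration handshake as direct method calls instead of asynchronous RPC.
case class ApplicationDescription(name: String, command: String)
case class RegisterApplication(desc: ApplicationDescription)
case class RegisteredApplication(appId: String)

class ToyMaster {
  private var nextId = 0
  // Mirrors Master.receive: create + register the app, then reply to the driver.
  def receive(msg: RegisterApplication): RegisteredApplication = {
    nextId += 1 // createApplication / registerApplication / persistence happen here
    RegisteredApplication(s"app-$nextId") // schedule() would now launch executors
  }
}

class ToyClientEndpoint(master: ToyMaster) {
  // Mirrors ClientEndpoint.registerWithMaster: send RegisterApplication, keep the id.
  def registerWithMaster(desc: ApplicationDescription): String =
    master.receive(RegisterApplication(desc)).appId
}

val client = new ToyClientEndpoint(new ToyMaster)
val appId = client.registerWithMaster(ApplicationDescription(
  "demo", "org.apache.spark.executor.CoarseGrainedExecutorBackend"))
println(appId) // prints app-1
```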

override def receive: PartialFunction[Any, Unit] = {
  ......
  case RegisterApplication(description, driver) =>
    if (state == RecoveryState.STANDBY) {
      // ignore, don't send response
    } else {
      val app = createApplication(description, driver)
      registerApplication(app)
      persistenceEngine.addApplication(app)
      driver.send(RegisteredApplication(app.id, self))
      schedule()
    }
  ......
}

When the Master matches the received RegisterApplication request, it first checks whether it is in STANDBY (standby) state. If not, it is ALIVE: it calls the createApplication method to create an ApplicationInfo, registers that information through the registerApplication method, persists it through persistenceEngine.addApplication, and on completion sends a registration-success message back to the AppClient via driver.send.

The ApplicationDescription object and the RpcEndpointRef are passed into the createApplication method, which returns an ApplicationInfo. Source of the Master class's createApplication method:

private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef):
    ApplicationInfo = {
  val now = System.currentTimeMillis()
  val date = new Date(now)
  val appId = newApplicationId(date)
  new ApplicationInfo(now, appId, desc, date, driver, defaultCores)
}

The resulting ApplicationInfo object is then passed to the registerApplication method, which completes the application's registration. Source of the registerApplication method:

private def registerApplication(app: ApplicationInfo): Unit = {
  val appAddress = app.driver.address // Driver address, used for Master-Driver communication
  if (addressToApp.contains(appAddress)) {
    return // this Driver address exists, so the Driver is already registered; return directly
  }
  applicationMetricsSystem.registerSource(app.appSource) // register with the metrics system
  apps += app // apps is a HashSet, which guarantees no duplicates
  idToApp(app.id) = app // idToApp is a HashMap mapping the app id to the app
  endpointToApp(app.driver) = app // endpointToApp is a HashMap mapping the Driver endpoint to the app
  addressToApp(appAddress) = app
  waitingApps += app
}

After registration completes, the schedule method is called. It has two jobs: 1) scheduling Drivers, i.e. sending the Drivers in the waitingDrivers array to qualifying Workers to run; 2) starting Executors for applications on the Worker nodes. Every new Driver registration, application registration, or resource change triggers a call to schedule. The method allocates the currently available resources to the applications waiting to be scheduled and, when the conditions are met, starts Executors on Worker nodes by calling the startExecutorsOnWorkers method (see the Master-Driver and Worker-Executor anatomy chapter for details).
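The first of schedule's two jobs (driver placement) can be illustrated with a minimal, self-contained sketch. ToyDriver, ToyWorker, and toySchedule below are invented toy types, not Spark's; the real method also shuffles the workers for load balancing before placing drivers.

```scala
// Toy model (not Spark source): place each waiting driver on the first
// worker that still has enough free cores, mimicking step 1 of schedule().
case class ToyDriver(id: String, requiredCores: Int)
class ToyWorker(val id: String, var freeCores: Int)

def toySchedule(waitingDrivers: List[ToyDriver],
                workers: List[ToyWorker]): List[(String, String)] =
  waitingDrivers.flatMap { d =>
    workers.find(_.freeCores >= d.requiredCores).map { w =>
      w.freeCores -= d.requiredCores // reserve the cores on that worker
      (d.id, w.id)                   // record the placement
    }
  }
  // step 2, startExecutorsOnWorkers(), would now use the cores that remain

val workers = List(new ToyWorker("worker-1", 4), new ToyWorker("worker-2", 4))
val placed = toySchedule(
  List(ToyDriver("driver-1", 3), ToyDriver("driver-2", 3)), workers)
println(placed) // prints List((driver-1,worker-1), (driver-2,worker-2))
```

Note how the second driver skips worker-1: after the first placement only 1 core is free there, so it lands on worker-2.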

startExecutorsOnWorkers, the method that starts Executors, works by calling scheduleExecutorsOnWorkers. That method supports two strategies for launching Executors: spreading an application's Executors out across as many Workers as possible, or consolidating them onto as few Workers as possible. Source of the Master's scheduleExecutorsOnWorkers method:

private def scheduleExecutorsOnWorkers(
    app: ApplicationInfo,
    usableWorkers: Array[WorkerInfo],
    spreadOutApps: Boolean): Array[Int] = {
  val coresPerExecutor = app.desc.coresPerExecutor
  val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
  val oneExecutorPerWorker = coresPerExecutor.isEmpty
  val memoryPerExecutor = app.desc.memoryPerExecutorMB
  val numUsable = usableWorkers.length
  val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
  val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
  var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
  /** Return whether the specified worker can launch an executor for this app. */
  def canLaunchExecutor(pos: Int): Boolean = {
    val keepScheduling = coresToAssign >= minCoresPerExecutor
    val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
    val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
    if (launchingNewExecutor) {
      val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
      val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
      val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
      keepScheduling && enoughCores && enoughMemory && underLimit
    } else {
      // adding cores to an existing executor: no need to re-check memory or limits
      keepScheduling && enoughCores
    }
  }
  ......

scheduleExecutorsOnWorkers only allocates resources to the application in a logical sense; it does not actually allocate resources on the Worker nodes. Resources are actually allocated on the Worker nodes when the allocateWorkerResourceToExecutors method is called.
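The elided part of scheduleExecutorsOnWorkers is a round-robin loop over the usable workers. Its two strategies can be simulated with a self-contained sketch: assignCores below is an invented simplification that tracks only cores (ignoring memory and the executor limit), not Spark source.

```scala
// Toy simulation (not Spark source) of the spread-out vs. consolidation
// strategies: returns how many cores each worker would be assigned.
def assignCores(workerFreeCores: Array[Int],
                coresWanted: Int,
                coresPerExecutor: Int,
                spreadOut: Boolean): Array[Int] = {
  val assigned = Array.fill(workerFreeCores.length)(0)
  var toAssign = math.min(coresWanted, workerFreeCores.sum)

  // A worker can take another executor if cores remain to hand out
  // and it still has coresPerExecutor free after earlier grants.
  def canLaunch(pos: Int): Boolean =
    toAssign >= coresPerExecutor &&
      workerFreeCores(pos) - assigned(pos) >= coresPerExecutor

  var free = workerFreeCores.indices.filter(canLaunch)
  while (free.nonEmpty) {
    free.foreach { pos =>
      var keep = true
      while (keep && canLaunch(pos)) {
        toAssign -= coresPerExecutor
        assigned(pos) += coresPerExecutor
        if (spreadOut) keep = false // spread-out: one grant, then next worker
      }                             // otherwise: fill this worker first
    }
    free = free.filter(canLaunch)
  }
  assigned
}

// Three workers with 8 free cores each; the app wants 12 cores, 4 per executor.
println(assignCores(Array(8, 8, 8), 12, 4, spreadOut = true).mkString(","))  // prints 4,4,4
println(assignCores(Array(8, 8, 8), 12, 4, spreadOut = false).mkString(",")) // prints 8,4,0
```

With the same inputs, spread-out places one executor per worker, while consolidation packs the first workers full before touching the next.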

private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    ......
  val exec = app.addExecutor(worker, coresToAssign)
  launchExecutor(worker, exec)
  app.state = ApplicationState.RUNNING
}

Source of the launchExecutor method:

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}

When the Worker receives the LaunchExecutor message (sent via worker.endpoint.send above), it processes it accordingly: on matching LaunchExecutor, it builds an ExecutorRunner object and calls ExecutorRunner's start method. ExecutorRunner's start invokes the fetchAndRunExecutor method, in which CommandUtils.buildProcessBuilder(appDesc.command...) is passed the entry class "org.apache.spark.executor.CoarseGrainedExecutorBackend". When the ExecutorRunner starts on the Worker node, it launches the CoarseGrainedExecutorBackend process. In CoarseGrainedExecutorBackend's onStart method, a RegisterExecutor registration request is sent to the Driver. When the Driver side receives the registration request, it registers the Executor. Source of CoarseGrainedSchedulerBackend's receiveAndReply method:

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
    if (executorDataMap.contains(executorId)) { ......}

The Driver sends a RegisteredExecutor message to CoarseGrainedExecutorBackend. After receiving the RegisteredExecutor message, CoarseGrainedExecutorBackend creates a new Executor, and from then on acts as the messenger through which that Executor communicates with the Driver. Source of the receive method with which CoarseGrainedExecutorBackend handles the RegisteredExecutor message:

override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor(hostname) =>
    logInfo("Successfully registered with driver")
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
    ......}

From CoarseGrainedExecutorBackend's receive method we can see that, after receiving the RegisteredExecutor message, it creates an org.apache.spark.executor.Executor object, at which point the creation of the Executor is complete.
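The whole executor-side handshake can be condensed into a self-contained sketch. ToyDriverBackend and ToyExecutorBackend are invented stand-ins for CoarseGrainedSchedulerBackend and CoarseGrainedExecutorBackend; the real exchange happens asynchronously over RPC, and the duplicate-ID check mirrors the executorDataMap.contains test shown above.

```scala
// Toy model (not Spark source) of the RegisterExecutor / RegisteredExecutor handshake.
case class RegisterExecutor(executorId: String, cores: Int)
sealed trait Reply
case class RegisteredExecutor(hostname: String) extends Reply
case class RegisterExecutorFailed(reason: String) extends Reply

class ToyDriverBackend {
  private val executorDataMap = scala.collection.mutable.Map.empty[String, Int]
  // Mirrors CoarseGrainedSchedulerBackend.receiveAndReply: reject duplicates,
  // otherwise record the executor and confirm registration.
  def receiveAndReply(msg: RegisterExecutor): Reply =
    if (executorDataMap.contains(msg.executorId)) {
      RegisterExecutorFailed("Duplicate executor ID: " + msg.executorId)
    } else {
      executorDataMap(msg.executorId) = msg.cores
      RegisteredExecutor("localhost")
    }
}

class ToyExecutorBackend(driver: ToyDriverBackend, executorId: String) {
  var executor: Option[String] = None // stands in for org.apache.spark.executor.Executor
  // Mirrors onStart + receive: register with the driver, then create the Executor.
  def onStart(): Unit =
    driver.receiveAndReply(RegisterExecutor(executorId, cores = 2)) match {
      case RegisteredExecutor(host)       => executor = Some(s"Executor($executorId@$host)")
      case RegisterExecutorFailed(reason) => sys.error(reason)
    }
}

val backend = new ToyExecutorBackend(new ToyDriverBackend, "0")
backend.onStart()
println(backend.executor) // prints Some(Executor(0@localhost))
```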
