当前位置:网站首页>spark source code - task submission process - 3-ApplicationMaster

spark source code - task submission process - 3-ApplicationMaster

2022-08-05 06:12:00 zdaiqing

1.概述

​ 在【spark源码-任务提交流程之YarnClusterApplication】中分析到,Task execution process in the implementationYarnClusterApplication过程中,会将amContext parameter for packaging,Then application submitted toRM;It will beamClassParameters for the Settings;

​ 针对yarn-cluster模式,amClass = bin/java org.apache.spark.deploy.yarn.ApplicationMaster;The application submitted toRM后,RM会选择一台NM机器启动AM;

​ 下面将针对AMThe start of the analysis;

2.main 主入口

​ 全路径:org.apache.spark.deploy.yarn.ApplicationMaster;

​ 在ApplicationMaster启动后,做了如下3件事:

​ 1、解析参数;

​ 2、实例化AM

​ 3、执行run

object ApplicationMaster extends Logging {
    
  def main(args: Array[String]): Unit = {
    
    SignalUtils.registerLogger(log)
    //解析参数
    val amArgs = new ApplicationMasterArguments(args)
    //实例化AM
    master = new ApplicationMaster(amArgs)
    //执行run
    System.exit(master.run())
  }
}

2.1.解析AM参数进行封装

​ According to the parameters define the application entry point、jars、参数、Properties file information such as;

​ 同时设置executor默认数量为2;

class ApplicationMasterArguments(val args: Array[String]) {
    
  //应用程序 jar And any options included in the jar 由参数--jar定义
  var userJar: String = null
  //应用程序的入口点
  var userClass: String = null
  var primaryPyFile: String = null
  var primaryRFile: String = null
  //应用程序参数
  var userArgs: Seq[String] = Nil
  //Additional attributes of the file
  var propertiesFile: String = null

  parseArgs(args.toList)

  private def parseArgs(inputArgs: List[String]): Unit = {
    
    val userArgsBuffer = new ArrayBuffer[String]()

    var args = inputArgs

    while (!args.isEmpty) {
    
      // --num-workers, --worker-memory, and --worker-cores are deprecated since 1.0,
      // the properties with executor in their names are preferred.
      args match {
    
        case ("--jar") :: value :: tail =>
          userJar = value
          args = tail

        case ("--class") :: value :: tail =>
          userClass = value
          args = tail

        case ("--primary-py-file") :: value :: tail =>
          primaryPyFile = value
          args = tail

        case ("--primary-r-file") :: value :: tail =>
          primaryRFile = value
          args = tail

        case ("--arg") :: value :: tail =>
          userArgsBuffer += value
          args = tail

        case ("--properties-file") :: value :: tail =>
          propertiesFile = value
          args = tail

        case _ =>
          printUsageAndExit(1, args)
      }
    }

    if (primaryPyFile != null && primaryRFile != null) {
    
      // scalastyle:off println
      System.err.println("Cannot have primary-py-file and primary-r-file at the same time")
      // scalastyle:on println
      System.exit(-1)
    }

    userArgs = userArgsBuffer.toList
  }

  def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
    
  	//..............
  }
}
object ApplicationMasterArguments {
    
  val DEFAULT_NUMBER_EXECUTORS = 2
}

2.2.实例化AM

​ 在实例化AM过程中,The following events:

​ 实例化sparkConfAnd set the parameters in the properties file tosparkConf中;

​ 将sparkConfThe parameters set in the system properties;

​ 实例化securityMgr;

​ 实例化RM Client;

​ Localization of load client Settings file list

private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
    

  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
  // optimal as more containers are available. Might need to handle this better.

  private val isClusterMode = args.userClass != null
	//实例化spark config
  private val sparkConf = new SparkConf()
  if (args.propertiesFile != null) {
    
    //The parameters in the configuration properties file cache tosparkConf中:通过hashMap实现缓存
    Utils.getPropertiesFromFile(args.propertiesFile).foreach {
     case (k, v) =>
      sparkConf.set(k, v)
    }
  }

  //根据sparkConf实例化securityMgr
  private val securityMgr = new SecurityManager(sparkConf实例化)

  private var metricsSystem: Option[MetricsSystem] = None

  // 将sparkConfThe parameter is set to the system properties
  sparkConf.getAll.foreach {
     case (k, v) =>
    sys.props(k) = v
  }

  //根据sparkConf构建yarnConf
  private val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))

  //实例化类加载器
  private val userClassLoader = {
    
    val classpath = Client.getUserClasspath(sparkConf)
    val urls = classpath.map {
     entry =>
      new URL("file:" + new File(entry.getPath()).getAbsolutePath())
    }

    if (isClusterMode) {
    
      if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
    
        new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
      } else {
    
        new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
      }
    } else {
    
      new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
    }
  }

  //The token updater
  private val credentialRenewer: Option[AMCredentialRenewer] = sparkConf.get(KEYTAB).map {
     _ =>
    new AMCredentialRenewer(sparkConf, yarnConf)
  }

  //使用UGIThe user as the currentApplicationMasterThe operation of the use
  private val ugi = credentialRenewer match {
    
    case Some(cr) =>
      // Set the context class loader so that the token renewer has access to jars distributed
      // by the user.
      val currentLoader = Thread.currentThread().getContextClassLoader()
      Thread.currentThread().setContextClassLoader(userClassLoader)
      try {
    
        cr.start()
      } finally {
    
        Thread.currentThread().setContextClassLoader(currentLoader)
      }

    case _ =>
      SparkHadoopUtil.get.createSparkUser()
  }

  //实例化RM
  private val client = doAsUser {
     new YarnRMClient() }

  //默认为executor数量的两倍(如果启用动态分配,则为最大executor数量的两倍),最小为3
  private val maxNumExecutorFailures = {
    
    val effectiveNumExecutors =
      if (Utils.isDynamicAllocationEnabled(sparkConf)) {
    
        sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS)
      } else {
    
        sparkConf.get(EXECUTOR_INSTANCES).getOrElse(0)
      }
    // By default, effectiveNumExecutors is Int.MaxValue if dynamic allocation is enabled. We need
    // avoid the integer overflow here.
    val defaultMaxNumExecutorFailures = math.max(3,
      if (effectiveNumExecutors > Int.MaxValue / 2) Int.MaxValue else (2 * effectiveNumExecutors))

    sparkConf.get(MAX_EXECUTOR_FAILURES).getOrElse(defaultMaxNumExecutorFailures)
  }

  @volatile private var exitCode = 0
  @volatile private var unregistered = false
  @volatile private var finished = false
  @volatile private var finalStatus = getDefaultFinalStatus
  @volatile private var finalMsg: String = ""
  @volatile private var userClassThread: Thread = _

  @volatile private var reporterThread: Thread = _
  @volatile private var allocator: YarnAllocator = _

  // A flag to check whether user has initialized spark context
  @volatile private var registered = false

  // Lock for controlling the allocator (heartbeat) thread.
  private val allocatorLock = new Object()

  // 心跳间隔
  private val heartbeatInterval = {
    
    // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
    val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
    math.max(0, math.min(expiryInterval / 2, sparkConf.get(RM_HEARTBEAT_INTERVAL)))
  }

  // Allocator polling before initial wait interval,To allow the execution procedure is requested to rise faster
  private val initialAllocationInterval = math.min(heartbeatInterval,
    sparkConf.get(INITIAL_HEARTBEAT_INTERVAL))

  // Allocator polling before the next wait interval
  private var nextAllocationInterval = initialAllocationInterval

  private var rpcEnv: RpcEnv = null

  // 在集群模式下,用于告诉AM用户的SparkContext已经初始化.
  private val sparkContextPromise = Promise[SparkContext]()

  // Localization of load client Settings file list.Actuator is used when it's start,And loading here,So that these configurations in cluster mode won't pollutionWeb UIThe environment of the page
  private val localResources = doAsUser {
    
    logInfo("Preparing Local resources")
    val resources = HashMap[String, LocalResource]()

    def setupDistributedCache(
        file: String,
        rtype: LocalResourceType,
        timestamp: String,
        size: String,
        vis: String): Unit = {
    
      val uri = new URI(file)
      val amJarRsrc = Records.newRecord(classOf[LocalResource])
      amJarRsrc.setType(rtype)
      amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
      amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
      amJarRsrc.setTimestamp(timestamp.toLong)
      amJarRsrc.setSize(size.toLong)

      val fileName = Option(uri.getFragment()).getOrElse(new Path(uri).getName())
      resources(fileName) = amJarRsrc
    }

    val distFiles = sparkConf.get(CACHED_FILES)
    val fileSizes = sparkConf.get(CACHED_FILES_SIZES)
    val timeStamps = sparkConf.get(CACHED_FILES_TIMESTAMPS)
    val visibilities = sparkConf.get(CACHED_FILES_VISIBILITIES)
    val resTypes = sparkConf.get(CACHED_FILES_TYPES)

    for (i <- 0 to distFiles.size - 1) {
    
      val resType = LocalResourceType.valueOf(resTypes(i))
      setupDistributedCache(distFiles(i), resType, timeStamps(i).toString, fileSizes(i).toString,
      visibilities(i))
    }

    // Distribute the conf archive to executors.
    sparkConf.get(CACHED_CONF_ARCHIVE).foreach {
     path =>
      val uri = new URI(path)
      val fs = FileSystem.get(uri, yarnConf)
      val status = fs.getFileStatus(new Path(uri))
      // SPARK-16080: Make sure to use the correct name for the destination when distributing the
      // conf archive to executors.
      val destUri = new URI(uri.getScheme(), uri.getRawSchemeSpecificPart(),
        Client.LOCALIZED_CONF_DIR)
      setupDistributedCache(destUri.toString(), LocalResourceType.ARCHIVE,
        status.getModificationTime().toString, status.getLen.toString,
        LocalResourceVisibility.PRIVATE.name())
    }

    // Clean up the configuration so it doesn't show up in the Web UI (since it's really noisy).
    CACHE_CONFIGS.foreach {
     e =>
      sparkConf.remove(e)
      sys.props.remove(e.key)
    }

    resources.toMap
  }
}

2.3.执行AM的run方法

​ In view of the cluster pattern,设置系统属性、构建spark调用上下文、调用runDriver方法

private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
    
	final def run(): Int = {
    
    doAsUser {
    
      //执行runImpl实现方法
      runImpl()
    }
    exitCode
  }

  private def runImpl(): Unit = {
    
    try {
    
      val appAttemptId = client.getAttemptId()

      var attemptID: Option[String] = None

      //Set up the cluster model of system properties
      if (isClusterMode) {
    
        // Set the web ui port to be ephemeral for yarn so we don't conflict with
        // other spark processes running on the same box
        System.setProperty("spark.ui.port", "0")

        // Set the master and deploy mode property to match the requested mode.
        System.setProperty("spark.master", "yarn")
        System.setProperty("spark.submit.deployMode", "cluster")

        // Set this internal configuration if it is running on cluster mode, this
        // configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.
        System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())

        attemptID = Option(appAttemptId.getAttemptId.toString)
      }

      //在HDFS和Yarn上建立Spark调用上下文.Context will be made by the incoming parameters structure
      new CallerContext(
        "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
        Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext()

      logInfo("ApplicationAttemptId: " + appAttemptId)

      // This shutdown hook should run *after* the SparkContext is shut down.
      val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
      ShutdownHookManager.addShutdownHook(priority) {
     () =>
        val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts

        if (!finished) {
    
          // The default state of ApplicationMaster is failed if it is invoked by shut down hook.
          // This behavior is different compared to 1.x version.
          // If user application is exited ahead of time by calling System.exit(N), here mark
          // this application as failed with EXIT_EARLY. For a good shutdown, user shouldn't call
          // System.exit(0) to terminate the application.
          finish(finalStatus,
            ApplicationMaster.EXIT_EARLY,
            "Shutdown hook called before final status was reported.")
        }

        if (!unregistered) {
    
          // we only want to unregister if we don't want the RM to retry
          if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
    
            unregister(finalStatus, finalMsg)
            cleanupStagingDir()
          }
        }
      }

      if (isClusterMode) {
    
        //yarn-cluster模式,执行runDriver
        runDriver()
      } else {
    
        runExecutorLauncher()
      }
    } catch {
    
      case e: Exception =>
        // catch everything else if not specifically handled
        logError("Uncaught exception: ", e)
        finish(FinalApplicationStatus.FAILED,
          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
          "Uncaught exception: " + StringUtils.stringifyException(e))
    } finally {
    
      try {
    
        metricsSystem.foreach {
     ms =>
          ms.report()
          ms.stop()
        }
      } catch {
    
        case e: Exception =>
          logWarning("Exception during stopping of the metric system: ", e)
      }
    }
  }
  
  private def doAsUser[T](fn: => T): T = {
    
    ugi.doAs(new PrivilegedExceptionAction[T]() {
    
      override def run: T = fn
    })
  }
}

2.3.1.runDriver

private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
    
  private def runDriver(): Unit = {
    
    addAmIpFilter(None)
    //启动用户程序,返回一个线程,即启动driver线程
    userClassThread = startUserApplication()

    // This a bit hacky, but we need to wait until the spark.driver.port property has
    // been set by the Thread executing the user class.
    logInfo("Waiting for spark context initialization...")
    val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
    try {
    
      val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
        Duration(totalWaitTime, TimeUnit.MILLISECONDS))
      if (sc != null) {
    
        rpcEnv = sc.env.rpcEnv

        val userConf = sc.getConf
        val host = userConf.get("spark.driver.host")
        val port = userConf.get("spark.driver.port").toInt
        //注册AM:AM向RM注册
        registerAM(host, port, userConf, sc.ui.map(_.webUrl))

        val driverRef = rpcEnv.setupEndpointRef(
          RpcAddress(host, port),
          YarnSchedulerBackend.ENDPOINT_NAME)
        //申请资源
        createAllocator(driverRef, userConf)
      } else {
    
        // Sanity check; should never happen in normal operation, since sc should only be null
        // if the user app did not create a SparkContext.
        throw new IllegalStateException("User did not initialize spark context!")
      }
      resumeDriver()
      userClassThread.join()
    } catch {
    
      case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
        logError(
          s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
           "Please check earlier log output for errors. Failing the application.")
        finish(FinalApplicationStatus.FAILED,
          ApplicationMaster.EXIT_SC_NOT_INITED,
          "Timed out waiting for SparkContext.")
    } finally {
    
      resumeDriver()
    }
  }
}

2.3.1.1.startUserApplication 启动一个driver线程

​ driverIs a user class code execution thread name;

​ 通过spark-submit提交参数–classSpecify the class path to determine user classes;

​ Through reflection to get the user classmain方法;

​ Create a thread to execute themain方法;

​ driver线程是amThe child thread process;

private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
    
	private def startUserApplication(): Thread = {
    
    logInfo("Starting the user application in a separate Thread")

    //解析用户参数
    var userArgs = args.userArgs
    if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
    
      // When running pyspark, the app is run using PythonRunner. The second argument is the list
      // of files to add to PYTHONPATH, which Client.scala already handles, so it's empty.
      userArgs = Seq(args.primaryPyFile, "") ++ userArgs
    }
    if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
    
      // TODO(davies): add R dependencies here
    }

    //By reflecting access to user classesmain方法
    val mainMethod = userClassLoader.loadClass(args.userClass)
      .getMethod("main", classOf[Array[String]])

    //创建新线程
    val userThread = new Thread {
    
      override def run() {
    
        try {
    
          if (!Modifier.isStatic(mainMethod.getModifiers)) {
    
            logError(s"Could not find static main method in object ${
      args.userClass}")
            finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
          } else {
    
            //通过反射实现main方法调用
            mainMethod.invoke(null, userArgs.toArray)
            finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
            logDebug("Done running user class")
          }
        } catch {
    
          case e: InvocationTargetException =>
            e.getCause match {
    
              case _: InterruptedException =>
                // Reporter thread can interrupt to stop user class
              case SparkUserAppException(exitCode) =>
                val msg = s"User application exited with status $exitCode"
                logError(msg)
                finish(FinalApplicationStatus.FAILED, exitCode, msg)
              case cause: Throwable =>
                logError("User class threw exception: " + cause, cause)
                finish(FinalApplicationStatus.FAILED,
                  ApplicationMaster.EXIT_EXCEPTION_USER_CLASS,
                  "User class threw exception: " + StringUtils.stringifyException(cause))
            }
            sparkContextPromise.tryFailure(e.getCause())
        } finally {
    
          // Notify the thread waiting for the SparkContext, in case the application did not
          // instantiate one. This will do nothing when the user code instantiates a SparkContext
          // (with the correct master), or when the user code throws an exception (due to the
          // tryFailure above).
          sparkContextPromise.trySuccess(null)
        }
      }
    }
    userThread.setContextClassLoader(userClassLoader)
    //设置线程名为driver:driver线程为AMUnder the process of a child thread,This thread to perform user classesmain方法
    userThread.setName("Driver")
    //启动线程:执行线程run方法,And calls to perform user definedmain方法
    userThread.start()
    //返回driver线程
    userThread
  }
}

2.3.1.2.向RM注册AM

​ 通过RPC方式向RM注册AM;

​ 通过将AM的host、port、Provide trackingweb url注册到RM并得到RM响应,完成注册流程;

private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
    
	private def registerAM(
      host: String,	//dirver的ip
      port: Int,		//driver线程的port
      _sparkConf: SparkConf,
      uiAddress: Option[String]): Unit = {
    
    val appId = client.getAttemptId().getApplicationId().toString()
    val attemptId = client.getAttemptId().getAttemptId().toString()
    val historyAddress = ApplicationMaster
      .getHistoryServerAddress(_sparkConf, yarnConf, appId, attemptId)

    //调用YarnRMClient的register方法进行注册
    client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)
    registered = true
  }
}

private[spark] class YarnRMClient extends Logging {
    
  def register(
      driverHost: String,
      driverPort: Int,
      conf: YarnConfiguration,
      sparkConf: SparkConf,
      uiAddress: Option[String],
      uiHistoryAddress: String): Unit = {
    
    //构建AM与RMCommunications client and start the
    amClient = AMRMClient.createAMRMClient()
    amClient.init(conf)
    amClient.start()
    this.uiHistoryAddress = uiHistoryAddress

    val trackingUrl = uiAddress.getOrElse {
    
      if (sparkConf.get(ALLOW_HISTORY_SERVER_TRACKING_URL)) uiHistoryAddress else ""
    }

    logInfo("Registering the ApplicationMaster")
    synchronized {
    
      //注册AM
      amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)
      registered = true
    }
  }
}

//AMRMClientImpl是AMRMClient的实现之一
public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
    
  //注册AM:将AM的host、port、appTrackingUrl信息绑定到AMRMClient中,Ready to register the environment
  public RegisterApplicationMasterResponse registerApplicationMaster(String appHostName, int appHostPort, String appTrackingUrl) throws YarnException, IOException {
    
        this.appHostName = appHostName;
        this.appHostPort = appHostPort;
        this.appTrackingUrl = appTrackingUrl;
        Preconditions.checkArgument(appHostName != null, "The host name should not be null");
        Preconditions.checkArgument(appHostPort >= -1, "Port number of the host should be any integers larger than or equal to -1");
    		//环境准备好后,Call the specific registration logic
        return this.registerApplicationMaster();
    }

  	//具体AM注册逻辑
    private RegisterApplicationMasterResponse registerApplicationMaster() throws YarnException, IOException {
    
      	//Registration information encapsulation
        RegisterApplicationMasterRequest request = RegisterApplicationMasterRequest.newInstance(this.appHostName, this.appHostPort, this.appTrackingUrl);
      	//RPC方式注册,封装响应信息
        RegisterApplicationMasterResponse response = this.rmClient.registerApplicationMaster(request);
        synchronized(this) {
    
            this.lastResponseId = 0;
            if (!response.getNMTokensFromPreviousAttempts().isEmpty()) {
    
                this.populateNMTokens(response.getNMTokensFromPreviousAttempts());
            }

            return response;
        }
    }
}
2.3.1.2.1 RegisterApplicationMasterRequest The registration request information encapsulation

​ 主要封装host、port、web url;

public abstract class RegisterApplicationMasterRequest {
    
  public static RegisterApplicationMasterRequest newInstance(String host, int port, String trackingUrl) {
    
        RegisterApplicationMasterRequest request = (RegisterApplicationMasterRequest)Records.newRecord(RegisterApplicationMasterRequest.class);
        //ApplicationMasterStart the nodehost
    		request.setHost(host);
        //ApplicationMasterThe launch of foreignrpc的端口号
    		request.setRpcPort(port);
        //ApplicationMasterProvide trackingweb url,用户可以通过该urlCheck the application execution status
    		request.setTrackingUrl(trackingUrl);
        return request;
    }
}
2.3.1.2.2 egisterApplicationMasterResponse Registered response information encapsulation

​ In response to important information:You may apply for the biggest singleContainerUtilization of resources、应用程序访问控制列表;

public class RegisterApplicationMasterResponsePBImpl extends RegisterApplicationMasterResponse {
    
    Builder builder = null;
    boolean viaProto = false;
  	//You may apply for the biggest singleContainerUtilization of resources
    private Resource maximumResourceCapability;
  	//应用程序访问控制列表
    private Map<ApplicationAccessType, String> applicationACLS = null;
    private List<Container> containersFromPreviousAttempts = null;
    private List<NMToken> nmTokens = null;
    private EnumSet<SchedulerResourceTypes> schedulerResourceTypes = null;
}

2.3.1.3.AM向RM申请资源

private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
    
	private def createAllocator(driverRef: RpcEndpointRef, _sparkConf: SparkConf): Unit = {
    
    val appId = client.getAttemptId().getApplicationId().toString()
    val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString

    // Before we initialize the allocator, let's log the information about how executors will
    // be run up front, to avoid printing this out for every single executor being launched.
    // Use placeholders for information that changes such as executor IDs.
    logInfo {
    
      val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
      val executorCores = _sparkConf.get(EXECUTOR_CORES)
      val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>",
        "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)
      dummyRunner.launchContextDebugInfo()
    }

    //创建分配器
    allocator = client.createAllocator(
      yarnConf,
      _sparkConf,
      driverUrl,
      driverRef,
      securityMgr,
      localResources)

    credentialRenewer.foreach(_.setDriverRef(driverRef))

    // Initialize the AM endpoint *after* the allocator has been initialized. This ensures
    // that when the driver sends an initial executor request (e.g. after an AM restart),
    // the allocator is ready to service requests.
    rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))

    //Distributor allocate resources
    allocator.allocateResources()
    val ms = MetricsSystem.createMetricsSystem("applicationMaster", sparkConf, securityMgr)
    val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
    ms.registerSource(new ApplicationMasterSource(prefix, allocator))
    // do not register static sources in this case as per SPARK-25277
    ms.start(false)
    metricsSystem = Some(ms)
    reporterThread = launchReporterThread()
  }
}
2.3.1.3.1.createAllocator创建分配器
private[spark] class YarnRMClient extends Logging {
    
  def createAllocator(
      conf: YarnConfiguration,
      sparkConf: SparkConf,
      driverUrl: String,
      driverRef: RpcEndpointRef,
      securityMgr: SecurityManager,
      localResources: Map[String, LocalResource]): YarnAllocator = {
    
    require(registered, "Must register AM before creating allocator.")
    //通过实例化YarnAllocator,由YarnAllocator向RM申请container资源
    new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
      localResources, new SparkRackResolver())
  }
}
2.3.1.3.2.allocateResourcesDistributor application、分配资源
private[yarn] class YarnAllocator(
    driverUrl: String,
    driverRef: RpcEndpointRef,
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    amClient: AMRMClient[ContainerRequest],
    appAttemptId: ApplicationAttemptId,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource],
    resolver: SparkRackResolver,
    clock: Clock = new SystemClock)
  extends Logging {
    
    
    def allocateResources(): Unit = synchronized {
    
    //Update the container request list:According to the current runningexecutorQuantity and to requestexecutor总数,To update to theResourceManager请求的containers数.
    updateResourceRequests()

    val progressIndicator = 0.1f
    // 向ResourceManager申请containers资源,And return allocation response
    val allocateResponse = amClient.allocate(progressIndicator)
		//获取分配的containers列表
    val allocatedContainers = allocateResponse.getAllocatedContainers()
    //Blacklist node tracking
    allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)

    //如果分配的containers数大于0,Deal with thesecontainers
    if (allocatedContainers.size > 0) {
    
      logDebug(("Allocated containers: %d. Current executor count: %d. " +
        "Launching executor count: %d. Cluster resources: %s.")
        .format(
          allocatedContainers.size,
          runningExecutors.size,
          numExecutorsStarting.get,
          allocateResponse.getAvailableResources))
			//处理从ResourceManager获取的containers,并在containers中启动executor
      handleAllocatedContainers(allocatedContainers.asScala)
    }

    //Access has been usecontainers列表,Also may be wrongcontainers
    val completedContainers = allocateResponse.getCompletedContainersStatuses()
    if (completedContainers.size > 0) {
    
      logDebug("Completed %d containers".format(completedContainers.size))
      //Processing usedcontainers
      processCompletedContainers(completedContainers.asScala)
      logDebug("Finished processing %d completed containers. Current running executor count: %d."
        .format(completedContainers.size, runningExecutors.size))
    }
  }
}
2.3.1.3.2.1.updateResourceRequests Update the container request

​ According to the current runningexecutorQuantity and to requestexecutor总数,To update to theResourceManager请求的containers数.

​ 根据每个节点的task,Suspends the container request list to group:

​ Local can match the request list;

​ The local list does not match the request;

​ Non-local requests list;

​ For those who match the request of the local list two requests,Will be cancelled and re-launch the request,然后,根据containerPlacement strategy to recalculate the local,To maximize the local task execution;

​ Focus on container for local,Try to improve the container for local,Don't can't satisfy the request of the local,将其取消;

​ 当已存在的executorNumber more than the need ofexecutor数量时,Removed from the pending container request number of container for more;

private[yarn] class YarnAllocator(
    driverUrl: String,
    driverRef: RpcEndpointRef,
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    amClient: AMRMClient[ContainerRequest],
    appAttemptId: ApplicationAttemptId,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource],
    resolver: SparkRackResolver,
    clock: Clock = new SystemClock)
  extends Logging {
    
    
	def updateResourceRequests(): Unit = {
    
    // Pending container request sequences
    val pendingAllocate = getPendingAllocate
    val numPendingAllocate = pendingAllocate.size
    //Calculate the missingexecutor数量
    val missing = targetNumExecutors - numPendingAllocate -
      numExecutorsStarting.get - runningExecutors.size
    logDebug(s"Updating resource requests, target: $targetNumExecutors, " +
      s"pending: $numPendingAllocate, running: ${
      runningExecutors.size}, " +
      s"executorsStarting: ${
      numExecutorsStarting.get}")

    // Pending container for classification:
    // localRequests: Local can match the request list
    // staleRequests: The local list does not match the request
    // anyHostRequests: Non-local requests list;
    val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
      hostToLocalTaskCounts, pendingAllocate)

    if (missing > 0) {
    
      logInfo(s"Will request $missing executor container(s), each with " +
        s"${
      resource.getVirtualCores} core(s) and " +
        s"${
      resource.getMemory} MB memory (including $memoryOverhead MB of overhead)")

      // Cancel the local list does not match the request
      staleRequests.foreach {
     stale =>
        amClient.removeContainerRequest(stale)
      }
      val cancelledContainers = staleRequests.size
      if (cancelledContainers > 0) {
    
        logInfo(s"Canceled $cancelledContainers container request(s) (locality no longer needed)")
      }

      // Computing available container number
      val availableContainers = missing + cancelledContainers

      // Calculation of potential container:The non-local requests include,Try to increase the degree of localization;
      val potentialContainers = availableContainers + anyHostRequests.size

      //重新计算每个containerThe node of local sex(node locality)And frame local(rack locality,The current frame other nodes)
      val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
        potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,
          allocatedHostToContainersMap, localRequests)

      根据计算的containers本地性,重新实例化container请求
      val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
      containerLocalityPreferences.foreach {
    
        case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
          newLocalityRequests += createContainerRequest(resource, nodes, racks)
        case _ =>
      }

      //当前可用的containersCan meet all the newcontainer请求
      if (availableContainers >= newLocalityRequests.size) {
    
        for (i <- 0 until (availableContainers - newLocalityRequests.size)) {
    
          newLocalityRequests += createContainerRequest(resource, null, null)
        }
      } 
      //当前可用的containersCan't meet all the newcontainer请求,Can't satisfy the request of will in the other frame on the node placecontainer,So will cancel these requests,To obtain a better local
      else {
    
        val numToCancel = newLocalityRequests.size - availableContainers
        anyHostRequests.slice(0, numToCancel).foreach {
     nonLocal =>
          amClient.removeContainerRequest(nonLocal)
        }
        if (numToCancel > 0) {
    
          logInfo(s"Canceled $numToCancel unlocalized container requests to resubmit with locality")
        }
      }

      //重新添加container请求:将请求传递给RM
      newLocalityRequests.foreach {
     request =>
        amClient.addContainerRequest(request)
      }

      if (log.isInfoEnabled()) {
    
        val (localized, anyHost) = newLocalityRequests.partition(_.getNodes() != null)
        if (anyHost.nonEmpty) {
    
          logInfo(s"Submitted ${
      anyHost.size} unlocalized container requests.")
        }
        localized.foreach {
     request =>
          logInfo(s"Submitted container request for host ${
      hostStr(request)}.")
        }
      }
    } 
    //挂起的+启动的+运行中的executorNumber of diverse needsexecutor数量,From hangs in the request to cancel the request of more;
    else if (numPendingAllocate > 0 && missing < 0) {
    
      val numToCancel = math.min(numPendingAllocate, -missing)
      logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
        s"total $targetNumExecutors executors.")
      // cancel pending allocate requests by taking locality preference into account
      val cancelRequests = (staleRequests ++ anyHostRequests ++ localRequests).take(numToCancel)
      cancelRequests.foreach(amClient.removeContainerRequest)
    }
  }
}
2.3.1.3.2.2.handleAllocatedContainers Treatment allocation of resources

​ 根据匹配规则,在节点、机架、Other frame3A scenario select available containers;

​ In the available container to startexecutor

private[yarn] class YarnAllocator(
    driverUrl: String,
    driverRef: RpcEndpointRef,
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    amClient: AMRMClient[ContainerRequest],
    appAttemptId: ApplicationAttemptId,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource],
    resolver: SparkRackResolver,
    clock: Clock = new SystemClock)
  extends Logging {
    
  
  def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
    
    //可用容器列表
    val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)

    // 根据节点(host)Combined with the matching rules,Select available containers
    val remainingAfterHostMatches = new ArrayBuffer[Container]
    for (allocatedContainer <- allocatedContainers) {
    
      matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
        containersToUse, remainingAfterHostMatches)
    }

    // According to the frame combination matching rules,Select available containers:A separate thread to complete
    val remainingAfterRackMatches = new ArrayBuffer[Container]
    if (remainingAfterHostMatches.nonEmpty) {
    
      var exception: Option[Throwable] = None
      val thread = new Thread("spark-rack-resolver") {
    
        override def run(): Unit = {
    
          try {
    
            for (allocatedContainer <- remainingAfterHostMatches) {
    
              val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost)
              matchContainerToRequest(allocatedContainer, rack, containersToUse,
                remainingAfterRackMatches)
            }
          } catch {
    
            case e: Throwable =>
              exception = Some(e)
          }
        }
      }
      thread.setDaemon(true)
      thread.start()

      try {
    
        thread.join()
      } catch {
    
        case e: InterruptedException =>
          thread.interrupt()
          throw e
      }

      if (exception.isDefined) {
    
        throw exception.get
      }
    }

    // This node, and the frame of the container,再次匹配
    val remainingAfterOffRackMatches = new ArrayBuffer[Container]
    for (allocatedContainer <- remainingAfterRackMatches) {
    
      matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
        remainingAfterOffRackMatches)
    }

    //After this node、本机架、This node, and the frame 3After the matching filter in,Is not matching on,内部释放
    if (!remainingAfterOffRackMatches.isEmpty) {
    
      logDebug(s"Releasing ${
      remainingAfterOffRackMatches.size} unneeded containers that were " +
        s"allocated to us")
      for (container <- remainingAfterOffRackMatches) {
    
        internalReleaseContainer(container)
      }
    }
		//在分配的containers中启动executors
    runAllocatedContainers(containersToUse)

    logInfo("Received %d containers from YARN, launching executors on %d of them."
      .format(allocatedContainers.size, containersToUse.size))
  }
}

3.执行流程

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GAjslApe-1658742053147)(/Users/daiqing/Library/Application Support/typora-user-images/image-20220725173937925.png)]

4.总结

​ 在AM中,做了3件事情:

​ 1、创建driver线程并启动driver线程,执行用户类定义的main方法;

​ 2、向RM注册AM;

​ 3、AM向RM申请资源,根据资源(containers)匹配规则选择可用资源,And in the fragmentation of thecontainer中启动executor;

​ driver线程是一个AMIn the process of execution user classesmain方法的线程,在定义driver线程后,就将driver线程启动了,启动后执行的runMethod to complete the user class definitionmain方法的调用;

​ AM向RM注册时,将AM的host、port、web url传给RM,RMThe maximum may apply for a singleContainerUtilization of resources、Application access control list feedback;

​ AM向RM申请资源的过程:在AMCreate resources distributor,By the resource allocator toRM申请资源、筛选资源(container)、在container中启动executor;

5.参考资料

spark源码-任务提交流程之YarnClusterApplication

Spark内核之YARN Cluster模式源码详解(Submit详解)

yarn2.7源码分析之ApplicationMaster与ResourceManager.ApplicationMasterService的通信

Yarn的ApplicationMaster介绍

2,spark源码分析-ApplicationMaster启动

spark源码跟踪(十一)ApplicationMasterThe key thread

Spark源码——Spark on YARN Container资源申请分配、Executor的启动

Yarn源码剖析(四)-- AMRegistered with the resource scheduling applicationContainer及启动

Spark源码——Spark on YARN Executor执行Task的过程

原网站

版权声明
本文为[zdaiqing]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/217/202208050514371176.html