As analyzed in the previous article, [Spark source code – task submission flow: YarnClusterApplication], when YarnClusterApplication runs during job submission, it packages the launch-context parameters for the ApplicationMaster (AM) and then submits the application to the ResourceManager (RM). As part of this, the `amClass` parameter is set.

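In yarn-cluster mode, the AM launch command is `bin/java org.apache.spark.deploy.yarn.ApplicationMaster` (that is, amClass = `org.apache.spark.deploy.yarn.ApplicationMaster`). After the application is submitted to the RM, the RM picks a NodeManager (NM) host and launches the AM there.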

The following sections analyze how the AM starts up.

2. The `main` entry point

Fully qualified class name: `org.apache.spark.deploy.yarn.ApplicationMaster`.

After the ApplicationMaster starts, it does the following three things:

1. Parses the command-line arguments;

2. Instantiates the AM;

3. Calls `run`.

// Entry point of the ApplicationMaster process.
// NOTE(review): excerpt — the remainder of main and the closing braces are elided here.
object ApplicationMaster extends Logging {
  def main(args: Array[String]): Unit = {
    // Step 1: parse the command-line arguments.
    val amArgs = new ApplicationMasterArguments(args)
    // Step 2: instantiate the AM. `master` is presumably a field of this object declared
    // outside the excerpt — TODO confirm.
    master = new ApplicationMaster(amArgs)


Parse the arguments that define the application's entry point (the user class), jars, user arguments, properties file, and similar information.

The default number of executors is also set to 2.

// Command-line arguments of the ApplicationMaster: user jar, user class, primary Python/R file,
// user arguments and properties file.
// NOTE(review): excerpt — several lines and closing braces are elided.
class ApplicationMasterArguments(val args: Array[String]) {
  // Application jar (and any options included in it); set by --jar.
  var userJar: String = null
  // User main class; set by --class.
  var userClass: String = null
  // Primary Python file; set by --primary-py-file.
  var primaryPyFile: String = null
  // Primary R file; set by --primary-r-file.
  var primaryRFile: String = null
  // User arguments, collected from repeated --arg flags.
  var userArgs: Seq[String] = Nil
  // Properties file with additional configuration; set by --properties-file.
  var propertiesFile: String = null


  // Consumes the argument list two elements at a time (flag :: value :: rest).
  // Any unrecognized input calls printUsageAndExit(1, args).
  private def parseArgs(inputArgs: List[String]): Unit = {
    val userArgsBuffer = new ArrayBuffer[String]()

    var args = inputArgs

    while (!args.isEmpty) {
      // --num-workers, --worker-memory, and --worker-cores are deprecated since 1.0,
      // the properties with executor in their names are preferred.
      args match {
        case ("--jar") :: value :: tail =>
          userJar = value
          args = tail

        case ("--class") :: value :: tail =>
          userClass = value
          args = tail

        case ("--primary-py-file") :: value :: tail =>
          primaryPyFile = value
          args = tail

        case ("--primary-r-file") :: value :: tail =>
          primaryRFile = value
          args = tail

        case ("--arg") :: value :: tail =>
          userArgsBuffer += value
          args = tail

        case ("--properties-file") :: value :: tail =>
          propertiesFile = value
          args = tail

        case _ =>
          printUsageAndExit(1, args)

    // A primary Python file and a primary R file are mutually exclusive.
    if (primaryPyFile != null && primaryRFile != null) {
      // scalastyle:off println
      System.err.println("Cannot have primary-py-file and primary-r-file at the same time")
      // scalastyle:on println

    userArgs = userArgsBuffer.toList

  def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
object ApplicationMasterArguments {


While the AM is being instantiated, the following happens:

Instantiate `sparkConf` and load the parameters from the properties file into it;

Copy the parameters in `sparkConf` into the JVM system properties;

Instantiate `securityMgr` (the SecurityManager);

Instantiate the RM client;

Load the list of localized resource files that the client set up.

// ApplicationMaster construction: builds sparkConf and the security manager, copies the
// configuration into system properties, creates the user class loader, the RM client,
// and loads the localized resource list.
// NOTE(review): excerpt — many body lines and closing braces are elided.
private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {

  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
  // optimal as more containers are available. Might need to handle this better.

  // Cluster mode is identified by the presence of a user class (--class).
  private val isClusterMode = args.userClass != null
	// Instantiate the Spark configuration.
  private val sparkConf = new SparkConf()
  if (args.propertiesFile != null) {
    // Copy every key/value pair from the properties file into sparkConf.
    Utils.getPropertiesFromFile(args.propertiesFile).foreach {
     case (k, v) =>
      sparkConf.set(k, v)

  // NOTE(review): "实例化" (Chinese for "instantiate") inside the argument list below looks like an
  // extraction artifact merged in from a comment; presumably the call is
  // `new SecurityManager(sparkConf)` — confirm against the Spark source.
  private val securityMgr = new SecurityManager(sparkConf实例化)

  private var metricsSystem: Option[MetricsSystem] = None

  // Copy every sparkConf entry into the JVM system properties.
  sparkConf.getAll.foreach {
     case (k, v) =>
    sys.props(k) = v

  private val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))

  // Class loader for the user classpath. In cluster mode it is child-first when
  // Client.isUserClassPathFirst(..., isDriver = true) is true.
  private val userClassLoader = {
    val classpath = Client.getUserClasspath(sparkConf)
    val urls = classpath.map {
     entry =>
      new URL("file:" + new File(entry.getPath()).getAbsolutePath())

    if (isClusterMode) {
      if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
        new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
      } else {
        new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
    } else {
      new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)

  // Credential renewer; created only when KEYTAB is configured.
  private val credentialRenewer: Option[AMCredentialRenewer] = sparkConf.get(KEYTAB).map {
     _ =>
    new AMCredentialRenewer(sparkConf, yarnConf)

  // Hadoop user (UGI) whose identity is used to run the AM's operations.
  private val ugi = credentialRenewer match {
    case Some(cr) =>
      // Set the context class loader so that the token renewer has access to jars distributed
      // by the user.
      val currentLoader = Thread.currentThread().getContextClassLoader()
      try {
      } finally {

    case _ =>

  // Client used to talk to the ResourceManager, created as the UGI user.
  private val client = doAsUser {
     new YarnRMClient() }

  private val maxNumExecutorFailures = {
    val effectiveNumExecutors =
      if (Utils.isDynamicAllocationEnabled(sparkConf)) {
      } else {
    // By default, effectiveNumExecutors is Int.MaxValue if dynamic allocation is enabled. We need
    // avoid the integer overflow here.
    val defaultMaxNumExecutorFailures = math.max(3,
      if (effectiveNumExecutors > Int.MaxValue / 2) Int.MaxValue else (2 * effectiveNumExecutors))


  @volatile private var exitCode = 0
  @volatile private var unregistered = false
  @volatile private var finished = false
  @volatile private var finalStatus = getDefaultFinalStatus
  @volatile private var finalMsg: String = ""
  @volatile private var userClassThread: Thread = _

  @volatile private var reporterThread: Thread = _
  @volatile private var allocator: YarnAllocator = _

  // A flag to check whether user has initialized spark context
  @volatile private var registered = false

  // Lock for controlling the allocator (heartbeat) thread.
  private val allocatorLock = new Object()

  // Heartbeat interval: the configured RM_HEARTBEAT_INTERVAL, but never more than half of the
  // RM's AM expiry interval (default 120000 ms) and never negative.
  private val heartbeatInterval = {
    // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
    val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
    math.max(0, math.min(expiryInterval / 2, sparkConf.get(RM_HEARTBEAT_INTERVAL)))

  // Initial allocator polling interval (no longer than heartbeatInterval), so that executor
  // requests ramp up faster.
  private val initialAllocationInterval = math.min(heartbeatInterval,

  // Wait interval before the next allocator poll.
  private var nextAllocationInterval = initialAllocationInterval

  private var rpcEnv: RpcEnv = null

  // In cluster mode, used to tell the AM that the user's SparkContext has been initialized.
  private val sparkContextPromise = Promise[SparkContext]()

  // Load the list of localized resources set up by the client. Executors use it when they start;
  // it is loaded here so that, in cluster mode, these settings don't clutter the web UI's
  // environment page.
  private val localResources = doAsUser {
    logInfo("Preparing Local resources")
    val resources = HashMap[String, LocalResource]()

    // Builds a LocalResource record for `file` and stores it in `resources`, keyed by the URI
    // fragment if present, otherwise by the file name.
    def setupDistributedCache(
        file: String,
        rtype: LocalResourceType,
        timestamp: String,
        size: String,
        vis: String): Unit = {
      val uri = new URI(file)
      val amJarRsrc = Records.newRecord(classOf[LocalResource])

      val fileName = Option(uri.getFragment()).getOrElse(new Path(uri).getName())
      resources(fileName) = amJarRsrc

    // Parallel lists describing the cached files, indexed together below.
    val distFiles = sparkConf.get(CACHED_FILES)
    val fileSizes = sparkConf.get(CACHED_FILES_SIZES)
    val timeStamps = sparkConf.get(CACHED_FILES_TIMESTAMPS)
    val visibilities = sparkConf.get(CACHED_FILES_VISIBILITIES)
    val resTypes = sparkConf.get(CACHED_FILES_TYPES)

    for (i <- 0 to distFiles.size - 1) {
      val resType = LocalResourceType.valueOf(resTypes(i))
      setupDistributedCache(distFiles(i), resType, timeStamps(i).toString, fileSizes(i).toString,

    // Distribute the conf archive to executors.
    sparkConf.get(CACHED_CONF_ARCHIVE).foreach {
     path =>
      val uri = new URI(path)
      val fs = FileSystem.get(uri, yarnConf)
      val status = fs.getFileStatus(new Path(uri))
      // SPARK-16080: Make sure to use the correct name for the destination when distributing the
      // conf archive to executors.
      val destUri = new URI(uri.getScheme(), uri.getRawSchemeSpecificPart(),
      setupDistributedCache(destUri.toString(), LocalResourceType.ARCHIVE,
        status.getModificationTime().toString, status.getLen.toString,

    // Clean up the configuration so it doesn't show up in the Web UI (since it's really noisy).
    CACHE_CONFIGS.foreach {
     e =>



In cluster mode, `run` sets system properties, builds the Spark caller context, and calls `runDriver`.

// ApplicationMaster.run / runImpl: set cluster-mode system properties, install the caller
// context and a shutdown hook, then run the driver or executor-launcher path.
// NOTE(review): excerpt — several lines and closing braces are elided.
private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
	final def run(): Int = {
    doAsUser {

  private def runImpl(): Unit = {
    try {
      val appAttemptId = client.getAttemptId()

      var attemptID: Option[String] = None

      // Set the cluster-mode system properties.
      if (isClusterMode) {
        // Set the web ui port to be ephemeral for yarn so we don't conflict with
        // other spark processes running on the same box
        System.setProperty("spark.ui.port", "0")

        // Set the master and deploy mode property to match the requested mode.
        System.setProperty("spark.master", "yarn")
        System.setProperty("spark.submit.deployMode", "cluster")

        // Set this internal configuration if it is running on cluster mode, this
        // configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.
        System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())

        attemptID = Option(appAttemptId.getAttemptId.toString)

      // Set up the Spark caller context (for HDFS and YARN), built from the
      // APP_CALLER_CONTEXT setting, the application id and the attempt id.
      new CallerContext(
        "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
        Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext()

      logInfo("ApplicationAttemptId: " + appAttemptId)

      // This shutdown hook should run *after* the SparkContext is shut down.
      val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
      ShutdownHookManager.addShutdownHook(priority) {
     () =>
        val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts

        if (!finished) {
          // The default state of ApplicationMaster is failed if it is invoked by shut down hook.
          // This behavior is different compared to 1.x version.
          // If user application is exited ahead of time by calling System.exit(N), here mark
          // this application as failed with EXIT_EARLY. For a good shutdown, user shouldn't call
          // System.exit(0) to terminate the application.
            "Shutdown hook called before final status was reported.")

        if (!unregistered) {
          // we only want to unregister if we don't want the RM to retry
          if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
            unregister(finalStatus, finalMsg)

      // NOTE(review): branch bodies elided; per the surrounding article, cluster mode calls
      // runDriver() here.
      if (isClusterMode) {
      } else {
    } catch {
      case e: Exception =>
        // catch everything else if not specifically handled
        logError("Uncaught exception: ", e)
          "Uncaught exception: " + StringUtils.stringifyException(e))
    } finally {
      // Stop the metrics system if one was created; failures are only logged as warnings.
      try {
        metricsSystem.foreach {
     ms =>
      } catch {
        case e: Exception =>
          logWarning("Exception during stopping of the metric system: ", e)
  // Runs `fn` as the AM's UGI user via ugi.doAs.
  private def doAsUser[T](fn: => T): T = {
    ugi.doAs(new PrivilegedExceptionAction[T]() {
      override def run: T = fn


// ApplicationMaster.runDriver (cluster mode): start the user class in the driver thread,
// wait up to AM_MAX_WAIT_TIME ms for its SparkContext, register the AM with the RM, and
// create the container allocator.
// NOTE(review): excerpt — several lines and closing braces are elided.
private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
  private def runDriver(): Unit = {
    // Start the user's main class in a separate (driver) thread.
    userClassThread = startUserApplication()

    // This a bit hacky, but we need to wait until the spark.driver.port property has
    // been set by the Thread executing the user class.
    logInfo("Waiting for spark context initialization...")
    val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
    try {
      val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
        Duration(totalWaitTime, TimeUnit.MILLISECONDS))
      if (sc != null) {
        rpcEnv = sc.env.rpcEnv

        // Register with the RM using the driver's host/port and, if present, its web UI URL.
        val userConf = sc.getConf
        val host = userConf.get("spark.driver.host")
        val port = userConf.get("spark.driver.port").toInt
        registerAM(host, port, userConf, sc.ui.map(_.webUrl))

        // Look up the driver endpoint, then create the allocator that requests containers.
        val driverRef = rpcEnv.setupEndpointRef(
          RpcAddress(host, port),
        createAllocator(driverRef, userConf)
      } else {
        // Sanity check; should never happen in normal operation, since sc should only be null
        // if the user app did not create a SparkContext.
        throw new IllegalStateException("User did not initialize spark context!")
    } catch {
      case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
          s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
           "Please check earlier log output for errors. Failing the application.")
          "Timed out waiting for SparkContext.")
    } finally {
}

Starting the driver thread

"driver" is the name of the thread that runs the user class's code;

The user class is the one whose class name is passed to spark-submit with `--class`;

The user class's `main` method is obtained through reflection;

A thread is created to execute that `main` method;

The driver thread is a child thread of the AM process.

// ApplicationMaster.startUserApplication: load the user class, look up its static
// main(Array[String]) by reflection, and run it in a new thread that reports the final status.
// NOTE(review): excerpt — several lines and closing braces are elided.
private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
	private def startUserApplication(): Thread = {
    logInfo("Starting the user application in a separate Thread")

    var userArgs = args.userArgs
    if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
      // When running pyspark, the app is run using PythonRunner. The second argument is the list
      // of files to add to PYTHONPATH, which Client.scala already handles, so it's empty.
      userArgs = Seq(args.primaryPyFile, "") ++ userArgs
    if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
      // TODO(davies): add R dependencies here

    // Load the user class and look up its main(Array[String]) method via reflection.
    val mainMethod = userClassLoader.loadClass(args.userClass)
      .getMethod("main", classOf[Array[String]])

    // The driver thread: invokes the static main method and records the outcome with finish().
    val userThread = new Thread {
      override def run() {
        try {
          if (!Modifier.isStatic(mainMethod.getModifiers)) {
            logError(s"Could not find static main method in object ${
            finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
          } else {
            mainMethod.invoke(null, userArgs.toArray)
            finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
            logDebug("Done running user class")
        } catch {
          case e: InvocationTargetException =>
            e.getCause match {
              case _: InterruptedException =>
                // Reporter thread can interrupt to stop user class
              case SparkUserAppException(exitCode) =>
                val msg = s"User application exited with status $exitCode"
                finish(FinalApplicationStatus.FAILED, exitCode, msg)
              case cause: Throwable =>
                logError("User class threw exception: " + cause, cause)
                  "User class threw exception: " + StringUtils.stringifyException(cause))
        } finally {
          // Notify the thread waiting for the SparkContext, in case the application did not
          // instantiate one. This will do nothing when the user code instantiates a SparkContext
          // (with the correct master), or when the user code throws an exception (due to the
          // tryFailure above).
    // Set the thread name to "driver": the driver thread is a child thread of the AM process
    // and runs the user class's main method.
    // Start the thread: its run() method invokes the user-defined main method.

The AM registers itself with the RM via RPC;

Registration completes once the AM's host, port, and tracking web URL have been sent to the RM and the RM has responded.

// ApplicationMaster.registerAM: compute the history-server address for this attempt and
// register the AM with the RM through YarnRMClient, then mark the AM as registered.
// NOTE(review): excerpt — closing braces are elided.
private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
	private def registerAM(
      host: String,	// driver host / IP
      port: Int,		// driver port
      _sparkConf: SparkConf,
      uiAddress: Option[String]): Unit = {
    val appId = client.getAttemptId().getApplicationId().toString()
    val attemptId = client.getAttemptId().getAttemptId().toString()
    val historyAddress = ApplicationMaster
      .getHistoryServerAddress(_sparkConf, yarnConf, appId, attemptId)

    client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)
    registered = true

// YarnRMClient.register: create the AM-RM client and register the AM with the RM,
// passing the driver host/port and a tracking URL.
// NOTE(review): excerpt — several lines and closing braces are elided.
private[spark] class YarnRMClient extends Logging {
  def register(
      driverHost: String,
      driverPort: Int,
      conf: YarnConfiguration,
      sparkConf: SparkConf,
      uiAddress: Option[String],
      uiHistoryAddress: String): Unit = {
    // Create the AM-RM communication client.
    amClient = AMRMClient.createAMRMClient()
    this.uiHistoryAddress = uiHistoryAddress

    // Tracking URL: the driver web UI if present; otherwise the history-server address when
    // ALLOW_HISTORY_SERVER_TRACKING_URL is enabled; otherwise empty.
    val trackingUrl = uiAddress.getOrElse {
      if (sparkConf.get(ALLOW_HISTORY_SERVER_TRACKING_URL)) uiHistoryAddress else ""

    logInfo("Registering the ApplicationMaster")
    synchronized {
      amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)
      registered = true

// AMRMClientImpl registration path (Hadoop YARN client).
// NOTE(review): excerpt — several lines and closing braces are elided.
public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
  // Register the AM: store its host, port and tracking URL on this client, validate them
  // (host must be non-null, port >= -1), then perform the actual registration.
  public RegisterApplicationMasterResponse registerApplicationMaster(String appHostName, int appHostPort, String appTrackingUrl) throws YarnException, IOException {
        this.appHostName = appHostName;
        this.appHostPort = appHostPort;
        this.appTrackingUrl = appTrackingUrl;
        Preconditions.checkArgument(appHostName != null, "The host name should not be null");
        Preconditions.checkArgument(appHostPort >= -1, "Port number of the host should be any integers larger than or equal to -1");
    		// With the fields set, call the actual registration logic.
        return this.registerApplicationMaster();

    // Sends the registration request to the RM and resets lastResponseId to 0.
    private RegisterApplicationMasterResponse registerApplicationMaster() throws YarnException, IOException {
      	// Package the host, port and tracking URL into a registration request.
        RegisterApplicationMasterRequest request = RegisterApplicationMasterRequest.newInstance(this.appHostName, this.appHostPort, this.appTrackingUrl);
        RegisterApplicationMasterResponse response = this.rmClient.registerApplicationMaster(request);
        synchronized(this) {
            this.lastResponseId = 0;
            if (!response.getNMTokensFromPreviousAttempts().isEmpty()) {

            return response;
}

RegisterApplicationMasterRequest: wraps the registration request

It mainly carries the host, port, and tracking web URL.

// Factory for the AM registration request record.
// NOTE(review): excerpt — the setter calls described by the comments below appear to be elided.
public abstract class RegisterApplicationMasterRequest {
  public static RegisterApplicationMasterRequest newInstance(String host, int port, String trackingUrl) {
        RegisterApplicationMasterRequest request = (RegisterApplicationMasterRequest)Records.newRecord(RegisterApplicationMasterRequest.class);
        // host: the node the ApplicationMaster runs on
        // port: the RPC port the ApplicationMaster exposes
        // trackingUrl: web URL the ApplicationMaster provides, where users can check the application's status
        return request;
}

RegisterApplicationMasterResponse: wraps the registration response

The most important information in the response is the maximum resources a single container may request, and the application access control lists (ACLs).

// Protobuf-backed AM registration response (fields only; the rest is elided in this excerpt).
public class RegisterApplicationMasterResponsePBImpl extends RegisterApplicationMasterResponse {
    Builder builder = null;
    boolean viaProto = false;
  	// Maximum resources a single container may request.
    private Resource maximumResourceCapability;
    // Application access control lists (ACLs), keyed by access type.
    private Map<ApplicationAccessType, String> applicationACLS = null;
    private List<Container> containersFromPreviousAttempts = null;
    private List<NMToken> nmTokens = null;
    private EnumSet<SchedulerResourceTypes> schedulerResourceTypes = null;

// ApplicationMaster.createAllocator: log the executor launch context once, create the
// YarnAllocator, set up the "YarnAM" endpoint, register allocator metrics, and start the
// reporter thread.
// NOTE(review): excerpt — several lines and closing braces are elided.
private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
	private def createAllocator(driverRef: RpcEndpointRef, _sparkConf: SparkConf): Unit = {
    val appId = client.getAttemptId().getApplicationId().toString()
    val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,

    // Before we initialize the allocator, let's log the information about how executors will
    // be run up front, to avoid printing this out for every single executor being launched.
    // Use placeholders for information that changes such as executor IDs.
    logInfo {
      val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
      val executorCores = _sparkConf.get(EXECUTOR_CORES)
      val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>",
        "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)

    allocator = client.createAllocator(


    // Initialize the AM endpoint *after* the allocator has been initialized. This ensures
    // that when the driver sends an initial executor request (e.g. after an AM restart),
    // the allocator is ready to service requests.
    rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))

    // Create the "applicationMaster" metrics system and register the allocator's metrics
    // under YARN_METRICS_NAMESPACE (default: the application id).
    // NOTE(review): the original comment here said "the allocator allocates resources"; the
    // allocation call itself seems to be elided from this excerpt.
    val ms = MetricsSystem.createMetricsSystem("applicationMaster", sparkConf, securityMgr)
    val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
    ms.registerSource(new ApplicationMasterSource(prefix, allocator))
    // do not register static sources in this case as per SPARK-25277
    metricsSystem = Some(ms)
    reporterThread = launchReporterThread()
// YarnRMClient.createAllocator: build the YarnAllocator that requests containers through the
// AM-RM client. Requires that the AM is already registered.
// NOTE(review): excerpt — closing braces are elided.
private[spark] class YarnRMClient extends Logging {
  def createAllocator(
      conf: YarnConfiguration,
      sparkConf: SparkConf,
      driverUrl: String,
      driverRef: RpcEndpointRef,
      securityMgr: SecurityManager,
      localResources: Map[String, LocalResource]): YarnAllocator = {
    require(registered, "Must register AM before creating allocator.")
    new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
      localResources, new SparkRackResolver())
}

Requesting and allocating resources
// YarnAllocator.allocateResources: one allocation round against the RM — update the container
// requests, call allocate, then handle newly allocated and completed containers.
// NOTE(review): excerpt — several lines and closing braces are elided.
private[yarn] class YarnAllocator(
    driverUrl: String,
    driverRef: RpcEndpointRef,
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    amClient: AMRMClient[ContainerRequest],
    appAttemptId: ApplicationAttemptId,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource],
    resolver: SparkRackResolver,
    clock: Clock = new SystemClock)
  extends Logging {
    def allocateResources(): Unit = synchronized {
    // Update the container requests: based on the number of running executors and the total
    // requested, update the number of containers requested from the ResourceManager.

    // Progress value passed to the RM with the allocate call.
    val progressIndicator = 0.1f
    // Ask the ResourceManager for containers and get the allocation response.
    val allocateResponse = amClient.allocate(progressIndicator)
    val allocatedContainers = allocateResponse.getAllocatedContainers()
    // Blacklisted-node tracking.

    // If any containers were allocated, handle them.
    if (allocatedContainers.size > 0) {
      logDebug(("Allocated containers: %d. Current executor count: %d. " +
        "Launching executor count: %d. Cluster resources: %s.")

    // Statuses of containers that have completed (possibly including failed ones).
    val completedContainers = allocateResponse.getCompletedContainersStatuses()
    if (completedContainers.size > 0) {
      logDebug("Completed %d containers".format(completedContainers.size))
      // Process the completed containers.
      logDebug("Finished processing %d completed containers. Current running executor count: %d."
        .format(completedContainers.size, runningExecutors.size))
}

Updating container requests

Based on the number of running executors and the total number of executors requested, update the number of containers requested from the ResourceManager.

Pending container requests are split into three groups, based on the number of tasks that prefer each host:

requests whose locality preference still matches (local requests);

requests whose locality preference no longer matches (stale requests);

requests with no locality preference (any-host requests).

Requests in the two non-matching groups (stale and any-host) may be cancelled and re-issued. The container placement strategy then recomputes locality preferences, so that as many tasks as possible run locally.

The emphasis is on container locality: improve it as far as possible, and cancel requests that can no longer satisfy it.

When the number of existing executors exceeds the number needed, the surplus container requests are removed from the pending requests.

// YarnAllocator.updateResourceRequests: reconcile pending container requests with the target
// executor count, re-issuing requests with locality preferences when more are needed and
// cancelling surplus requests when fewer are needed.
// NOTE(review): excerpt — several lines and closing braces are elided.
private[yarn] class YarnAllocator(
    driverUrl: String,
    driverRef: RpcEndpointRef,
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    amClient: AMRMClient[ContainerRequest],
    appAttemptId: ApplicationAttemptId,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource],
    resolver: SparkRackResolver,
    clock: Clock = new SystemClock)
  extends Logging {
	def updateResourceRequests(): Unit = {
    // Pending container request sequences
    val pendingAllocate = getPendingAllocate
    val numPendingAllocate = pendingAllocate.size
    // Executors still missing: target minus pending, starting and running.
    val missing = targetNumExecutors - numPendingAllocate -
      numExecutorsStarting.get - runningExecutors.size
    logDebug(s"Updating resource requests, target: $targetNumExecutors, " +
      s"pending: $numPendingAllocate, running: ${
      runningExecutors.size}, " +
      s"executorsStarting: ${

    // Classify the pending container requests:
    // localRequests: requests whose locality preference still matches
    // staleRequests: requests whose locality preference no longer matches
    // anyHostRequests: requests with no locality preference
    val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
      hostToLocalTaskCounts, pendingAllocate)

    if (missing > 0) {
      logInfo(s"Will request $missing executor container(s), each with " +
      resource.getVirtualCores} core(s) and " +
      resource.getMemory} MB memory (including $memoryOverhead MB of overhead)")

      // Cancel stale requests (their locality is no longer needed).
      staleRequests.foreach {
     stale =>
      val cancelledContainers = staleRequests.size
      if (cancelledContainers > 0) {
        logInfo(s"Canceled $cancelledContainers container request(s) (locality no longer needed)")

      // Containers available for new requests: the missing ones plus the cancelled stale ones.
      val availableContainers = missing + cancelledContainers

      // Potential containers: also count the any-host requests, which may be replaced by
      // localized requests to improve locality.
      val potentialContainers = availableContainers + anyHostRequests.size

      // Recompute node locality and rack locality (other nodes on the same rack) for each container.
      val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
        potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,
          allocatedHostToContainersMap, localRequests)

      // Only preferences with node lists become localized requests.
      val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
      containerLocalityPreferences.foreach {
        case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
          newLocalityRequests += createContainerRequest(resource, nodes, racks)
        case _ =>

      // Enough available containers for all new localized requests: fill the remainder with
      // requests that have no locality preference.
      if (availableContainers >= newLocalityRequests.size) {
        for (i <- 0 until (availableContainers - newLocalityRequests.size)) {
          newLocalityRequests += createContainerRequest(resource, null, null)
      // Not enough: cancel that many any-host requests so they can be resubmitted with
      // locality (any-host requests may otherwise be placed on nodes/racks elsewhere).
      else {
        val numToCancel = newLocalityRequests.size - availableContainers
        anyHostRequests.slice(0, numToCancel).foreach {
     nonLocal =>
        if (numToCancel > 0) {
          logInfo(s"Canceled $numToCancel unlocalized container requests to resubmit with locality")

      newLocalityRequests.foreach {
     request =>

      if (log.isInfoEnabled()) {
        val (localized, anyHost) = newLocalityRequests.partition(_.getNodes() != null)
        if (anyHost.nonEmpty) {
          logInfo(s"Submitted ${
      anyHost.size} unlocalized container requests.")
        localized.foreach {
     request =>
          logInfo(s"Submitted container request for host ${
    // Pending + starting + running executors exceed the target: cancel the surplus from the
    // pending requests.
    else if (numPendingAllocate > 0 && missing < 0) {
      val numToCancel = math.min(numPendingAllocate, -missing)
      logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
        s"total $targetNumExecutors executors.")
      // cancel pending allocate requests by taking locality preference into account
      val cancelRequests = (staleRequests ++ anyHostRequests ++ localRequests).take(numToCancel)
}

Handling allocated containers

Following the matching rules, usable containers are selected in three passes: same node, same rack, and any other rack (off-rack);

Executors are then launched in the selected containers.

// YarnAllocator.handleAllocatedContainers: match allocated containers to pending requests in
// three passes (same host, same rack, any host), release the leftovers, and log how many
// containers will be used.
// NOTE(review): excerpt — several lines and closing braces are elided.
private[yarn] class YarnAllocator(
    driverUrl: String,
    driverRef: RpcEndpointRef,
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    amClient: AMRMClient[ContainerRequest],
    appAttemptId: ApplicationAttemptId,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource],
    resolver: SparkRackResolver,
    clock: Clock = new SystemClock)
  extends Logging {
  def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
    val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)

    // Pass 1: match each container to a request for the same host.
    val remainingAfterHostMatches = new ArrayBuffer[Container]
    for (allocatedContainer <- allocatedContainers) {
      matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
        containersToUse, remainingAfterHostMatches)

    // Pass 2: match the remaining containers by rack; rack resolution runs in a separate
    // "spark-rack-resolver" thread, and any exception it throws is rethrown here.
    val remainingAfterRackMatches = new ArrayBuffer[Container]
    if (remainingAfterHostMatches.nonEmpty) {
      var exception: Option[Throwable] = None
      val thread = new Thread("spark-rack-resolver") {
        override def run(): Unit = {
          try {
            for (allocatedContainer <- remainingAfterHostMatches) {
              val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost)
              matchContainerToRequest(allocatedContainer, rack, containersToUse,
          } catch {
            case e: Throwable =>
              exception = Some(e)

      // NOTE(review): try body elided; presumably starts and joins `thread` — confirm.
      try {
      } catch {
        case e: InterruptedException =>
          throw e

      if (exception.isDefined) {
        throw exception.get

    // Pass 3: match whatever is left against requests for any host (off-rack).
    val remainingAfterOffRackMatches = new ArrayBuffer[Container]
    for (allocatedContainer <- remainingAfterRackMatches) {
      matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,

    // Containers still unmatched after the host, rack and off-rack passes are released.
    if (!remainingAfterOffRackMatches.isEmpty) {
      logDebug(s"Releasing ${
      remainingAfterOffRackMatches.size} unneeded containers that were " +
        s"allocated to us")
      for (container <- remainingAfterOffRackMatches) {

    logInfo("Received %d containers from YARN, launching executors on %d of them."
      .format(allocatedContainers.size, containersToUse.size))


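In summary, the AM does three things:

1. Creates and starts the driver thread, which executes the `main` method defined by the user class;

2. Registers the AM with the RM;

3. Requests resources from the RM, selects usable resources (containers) according to the matching rules, and launches executors in the allocated containers.

The driver thread is the thread inside the AM process that runs the user class's `main` method. It is started as soon as it has been defined, and its `run` method calls the user class's `main` method.

When the AM registers with the RM, it sends its host, port, and web URL. The RM replies with the maximum resources a single container may request and with the application access control lists.

Resource requests from the AM to the RM work as follows: the AM creates a resource allocator. The allocator requests resources from the RM, filters the allocated resources (containers), and launches executors in those containers.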



