
Spark source code - task submission process - 4 - starting the executor in a container

2022-08-05 06:11:00 zdaiqing

1. Overview

In Spark source code - task submission process: ApplicationMaster, we saw that the AM does three things:

1. Creates and starts the driver thread, which executes the main method of the user class;

2. Registers the AM with the RM;

3. Requests resources from the RM, selects usable containers from the allocated resources according to matching rules, and starts executors in the allocated containers.

This article analyzes how an executor is started inside a container.

2. Entry point

This code is invoked when the AM requests resources from the RM.

The AM requests resources from the RM, which returns a list of containers. The allocator then invokes this code to filter the resources and launch executors. For details, see Spark source code - task submission process: ApplicationMaster, the part where the AM requests resources from the RM.

private[yarn] class YarnAllocator(
    driverUrl: String,
    driverRef: RpcEndpointRef,
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    amClient: AMRMClient[ContainerRequest],
    appAttemptId: ApplicationAttemptId,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource],
    resolver: SparkRackResolver,
    clock: Clock = new SystemClock)
  extends Logging {
    
  
  def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
    
    // List of usable containers
    val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)

    // ...... irrelevant code omitted

    // Start executors in the allocated containers
    runAllocatedContainers(containersToUse)

    // ...... irrelevant code omitted
  }
}
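
For context, handleAllocatedContainers is reached from the allocator's heartbeat loop. The following is an abridged sketch of YarnAllocator.allocateResources, based on Spark 2.x (details vary across versions):

  def allocateResources(): Unit = synchronized {
    updateResourceRequests()

    // Heartbeat to the RM; the response carries any newly allocated containers
    val allocateResponse = amClient.allocate(0.1f)
    val allocatedContainers = allocateResponse.getAllocatedContainers()

    if (allocatedContainers.size > 0) {
      // Filter the returned containers and start executors in them
      handleAllocatedContainers(allocatedContainers.asScala)
    }
  }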

3. runAllocatedContainers

This method starts executors in the allocated containers.

It iterates over the usable containers, starting one thread per container; the executor is launched inside that thread.

private[yarn] class YarnAllocator(
    driverUrl: String,
    driverRef: RpcEndpointRef,
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    amClient: AMRMClient[ContainerRequest],
    appAttemptId: ApplicationAttemptId,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource],
    resolver: SparkRackResolver,
    clock: Clock = new SystemClock)
  extends Logging {
    
  
  private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {

    // Iterate over the usable containers
    for (container <- containersToUse) {
      executorIdCounter += 1
      val executorHostname = container.getNodeId.getHost
      val containerId = container.getId
      val executorId = executorIdCounter.toString
      assert(container.getResource.getMemory >= resource.getMemory)
      logInfo(s"Launching container $containerId on host $executorHostname " +
        s"for executor with ID $executorId")

      def updateInternalState(): Unit = synchronized {
        runningExecutors.add(executorId)
        numExecutorsStarting.decrementAndGet()
        executorIdToContainer(executorId) = container
        containerIdToExecutorId(container.getId) = executorId

        val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
          new HashSet[ContainerId])
        containerSet += containerId
        allocatedContainerToHostMap.put(containerId, executorHostname)
      }

      // Check whether a new executor may be started: running executor count < target executor count
      if (runningExecutors.size() < targetNumExecutors) {
        // Record that one more executor is starting
        numExecutorsStarting.incrementAndGet()
        if (launchContainers) {
          // Submit a new task to the launcher thread pool; its run method executes on a pool thread
          launcherPool.execute(new Runnable {
            override def run(): Unit = {
              try {
                // Instantiate an ExecutorRunnable and invoke its run method,
                // which starts the executor
                new ExecutorRunnable(
                  Some(container),
                  conf,
                  sparkConf,
                  driverUrl,
                  executorId,
                  executorHostname,
                  executorMemory,
                  executorCores,
                  appAttemptId.getApplicationId.toString,
                  securityMgr,
                  localResources
                ).run()
                // Update internal bookkeeping state
                updateInternalState()
              } catch {
                case e: Throwable =>
                  numExecutorsStarting.decrementAndGet()
                  if (NonFatal(e)) {
                    logError(s"Failed to launch executor $executorId on container $containerId", e)
                    // Assigned container should be released immediately
                    // to avoid unnecessary resource occupation.
                    amClient.releaseAssignedContainer(containerId)
                  } else {
                    throw e
                  }
              }
            }
          })
        } else {
          // For test only
          updateInternalState()
        }
      } else {
        logInfo(("Skip launching executorRunnable as running executors count: %d " +
          "reached target executors count: %d.").format(
          runningExecutors.size, targetNumExecutors))
      }
    }
  }
}
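
Two details worth noting here. The targetNumExecutors check compares against the desired executor count (initially driven by spark.executor.instances, or by the dynamic-allocation target). And launcherPool is a daemon cached thread pool, so container launches do not block the allocator's heartbeat thread; in Spark 2.x it is defined in the same class roughly as follows (a sketch; the pool size comes from spark.yarn.containerLauncherMaxThreads, default 25):

  // Thread pool to which ExecutorRunnable launch tasks are submitted
  private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
    "ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS))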

4. ExecutorRunnable.run

This method creates, initializes, and starts an NMClient (the NodeManager client), and then calls startContainer to launch the container.

private[yarn] class ExecutorRunnable(
    container: Option[Container],
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    masterAddress: String,
    executorId: String,
    hostname: String,
    executorMemory: Int,
    executorCores: Int,
    appId: String,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource]) extends Logging {
    

  var rpc: YarnRPC = YarnRPC.create(conf)
  var nmClient: NMClient = _

  def run(): Unit = {
    logDebug("Starting Executor Container")
    // Create, initialize, and start the NMClient (NodeManager client)
    nmClient = NMClient.createNMClient()
    nmClient.init(conf)
    nmClient.start()
    // Start the container
    startContainer()
  }
}

4.1. startContainer: launching the container

This method mainly does the following three things:

1. Prepares the environment in which the container will start the executor;

2. Assembles the command that starts the executor;

3. Starts the executor via the NodeManager.

private[yarn] class ExecutorRunnable(
    container: Option[Container],
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    masterAddress: String,
    executorId: String,
    hostname: String,
    executorMemory: Int,
    executorCores: Int,
    appId: String,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource]) extends Logging {
    

  def startContainer(): java.util.Map[String, ByteBuffer] = {
    // Container launch context
    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
      .asInstanceOf[ContainerLaunchContext]
    // Prepare the environment
    val env = prepareEnvironment().asJava

    ctx.setLocalResources(localResources.asJava)
    ctx.setEnvironment(env)

    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
    val dob = new DataOutputBuffer()
    credentials.writeTokenStorageToStream(dob)
    ctx.setTokens(ByteBuffer.wrap(dob.getData()))

    // Assemble the launch command
    val commands = prepareCommand()

    ctx.setCommands(commands.asJava)
    ctx.setApplicationACLs(
      YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava)

    // If external shuffle service is enabled, register with the Yarn shuffle service already
    // started on the NodeManager and, if authentication is enabled, provide it with our secret
    // key for fetching shuffle files later
    if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) {
      val secretString = securityMgr.getSecretKey()
      val secretBytes =
        if (secretString != null) {
          // This conversion must match how the YarnShuffleService decodes our secret
          JavaUtils.stringToBytes(secretString)
        } else {
          // Authentication is not enabled, so just provide dummy metadata
          ByteBuffer.allocate(0)
        }
      ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes))
    }

    // Send the start request to the ContainerManager
    try {
      // The NodeManager starts the container
      nmClient.startContainer(container.get, ctx)
    } catch {
      case ex: Exception =>
        throw new SparkException(s"Exception while starting container ${container.get.getId}" +
          s" on host $hostname", ex)
    }
  }
}
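
A note on the SHUFFLE_SERVICE_ENABLED branch above: it is taken when spark.shuffle.service.enabled is true, and the "spark_shuffle" service-data key must match the auxiliary service name registered on each NodeManager. A minimal sketch of the corresponding configuration, assuming the standard YarnShuffleService deployment:

# spark-defaults.conf
spark.shuffle.service.enabled=true

# yarn-site.xml (shown as properties for brevity)
yarn.nodemanager.aux-services=spark_shuffle
yarn.nodemanager.aux-services.spark_shuffle.class=org.apache.spark.network.yarn.YarnShuffleService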

4.1.1. prepareEnvironment

This method collects the environment variables whose names start with SPARK, the executor environment variables from sparkConf, and the log-related variables into a HashMap that serves as the container environment.

private[yarn] class ExecutorRunnable(
    container: Option[Container],
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    masterAddress: String,
    executorId: String,
    hostname: String,
    executorMemory: Int,
    executorCores: Int,
    appId: String,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource]) extends Logging {
    

  private def prepareEnvironment(): HashMap[String, String] = {
    val env = new HashMap[String, String]()
    Client.populateClasspath(null, conf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))

    // Determine the HTTP scheme from the YARN HTTP policy
    val yarnHttpPolicy = conf.get(
      YarnConfiguration.YARN_HTTP_POLICY_KEY,
      YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
    )
    val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"

    // Copy environment variables whose names start with SPARK into the env map
    System.getenv().asScala.filterKeys(_.startsWith("SPARK"))
      .foreach { case (k, v) => env(k) = v }

    // Copy the executor environment variables from sparkConf into the env map
    sparkConf.getExecutorEnv.foreach { case (key, value) =>
      if (key == Environment.CLASSPATH.name()) {
        // If the key of env variable is CLASSPATH, we assume it is a path and append it.
        // This is kept for backward compatibility and consistency with hadoop
        YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
      } else {
        // For other env variables, simply overwrite the value.
        env(key) = value
      }
    }

    // Set the container log URLs into the env map
    container.foreach { c =>
      sys.env.get("SPARK_USER").foreach { user =>
        val containerId = ConverterUtils.toString(c.getId)
        val address = c.getNodeHttpAddress
        val baseUrl = s"$httpScheme$address/node/containerlogs/$containerId/$user"

        env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=-4096"
        env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=-4096"
      }
    }

    env
  }
}
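
With the default YARN HTTP policy, the resulting log URLs take roughly this form (illustrative only; host, port, container ID, and user are hypothetical):

SPARK_LOG_URL_STDOUT=http://node1:8042/node/containerlogs/container_1659xxxx_0001_01_000002/hadoop/stdout?start=-4096
SPARK_LOG_URL_STDERR=http://node1:8042/node/containerlogs/container_1659xxxx_0001_01_000002/hadoop/stderr?start=-4096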

4.1.2. prepareCommand: assembling the command

This method assembles the command that launches the executor in the container.

The command covers the executor memory settings, external library paths, the container temp directory, the log directory, the backend class, driver-related information, executor-related information, and so on.

From these parameters we can see that after the executor starts, the backend that executes tasks is org.apache.spark.executor.CoarseGrainedExecutorBackend, launched as a separate process via /bin/java; an illustrative assembled command is shown after the source below.

private[yarn] class ExecutorRunnable(
    container: Option[Container],
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    masterAddress: String,
    executorId: String,
    hostname: String,
    executorMemory: Int,
    executorCores: Int,
    appId: String,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource]) extends Logging {
    

  private def prepareCommand(): List[String] = {
    // Buffer for the JVM command-line arguments
    val javaOpts = ListBuffer[String]()

    // Add the executor memory parameter (-Xmx) to the command buffer
    val executorMemoryString = executorMemory + "m"
    javaOpts += "-Xmx" + executorMemoryString

    // Parse spark.executor.extraJavaOptions from sparkConf and add it to the command buffer
    sparkConf.get(EXECUTOR_JAVA_OPTIONS).foreach { opts =>
      val subsOpt = Utils.substituteAppNExecIds(opts, appId, executorId)
      javaOpts ++= Utils.splitCommandString(subsOpt).map(YarnSparkHadoopUtil.escapeForShell)
    }

    // Build the command prefix from spark.executor.extraLibraryPath (external library path)
    val prefixEnv = sparkConf.get(EXECUTOR_LIBRARY_PATH).map { libPath =>
      Client.createLibraryPathPrefix(libPath, sparkConf)
    }

    // Add the container temp directory
    javaOpts += "-Djava.io.tmpdir=" +
      new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)

    // Certain configs need to be passed here because they are needed before the Executor
    // registers with the Scheduler and transfers the spark configs. Since the Executor backend
    // uses RPC to connect to the scheduler, the RPC settings are needed as well as the
    // authentication settings.
    sparkConf.getAll
      .filter { case (k, v) => SparkConf.isExecutorStartupConf(k) }
      .foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }

    

    // Add the log directory to the command
    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)

    // Process the user classpath
    val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
      val absPath =
        if (new File(uri.getPath()).isAbsolute()) {
          Client.getClusterPath(sparkConf, uri.getPath())
        } else {
          Client.buildPath(Environment.PWD.$(), uri.getPath())
        }
      Seq("--user-class-path", "file:" + absPath)
    }.toSeq

    YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
      
    // Assemble the full command
    val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++
      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
        "--driver-url", masterAddress,
        "--executor-id", executorId,
        "--hostname", hostname,
        "--cores", executorCores.toString,
        "--app-id", appId) ++
      userClassPath ++
      Seq(
        s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
        s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")

    // TODO: it would be nicer to just make sure there are no null commands here
    commands.map(s => if (s == null) "null" else s).toList
  }
  
}
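
Assembled, the final command handed to YARN looks roughly like the following (illustrative only; the memory size, host names, IDs, and user class path are hypothetical; YARN expands {{JAVA_HOME}}, {{PWD}}, and <LOG_DIR> on the NodeManager):

{{JAVA_HOME}}/bin/java -server -Xmx4096m \
  -Djava.io.tmpdir={{PWD}}/tmp \
  -Dspark.yarn.app.container.log.dir=<LOG_DIR> \
  org.apache.spark.executor.CoarseGrainedExecutorBackend \
  --driver-url spark://CoarseGrainedScheduler@driver-host:driver-port \
  --executor-id 1 \
  --hostname node1 \
  --cores 2 \
  --app-id application_1659xxxx_0001 \
  --user-class-path file:{{PWD}}/__app__.jar \
  1><LOG_DIR>/stdout 2><LOG_DIR>/stderr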

4.1.3. nmClient.startContainer: starting the container

NMClient.startContainer is implemented in Hadoop's NMClientImpl (decompiled source shown below): it obtains an RPC proxy to the NodeManager's ContainerManager and sends it a StartContainersRequest built from the launch context.

public class NMClientImpl extends NMClient {

    protected ConcurrentMap<ContainerId, NMClientImpl.StartedContainer> startedContainers = new ConcurrentHashMap();

    public Map<String, ByteBuffer> startContainer(Container container, ContainerLaunchContext containerLaunchContext) throws YarnException, IOException {
        // Record the container that is about to be started
        NMClientImpl.StartedContainer startingContainer = new NMClientImpl.StartedContainer(container.getId(), container.getNodeId());

        synchronized(startingContainer) {
            // Register the container in the map of started containers
            this.addStartingContainer(startingContainer);
            ContainerManagementProtocolProxyData proxy = null;

            Map allServiceResponse;
            try {
                // Get a proxy to the NodeManager's ContainerManager; the container is started through this proxy
                proxy = this.cmProxy.getProxy(container.getNodeId().toString(), container.getId());
                // Assemble the start request
                StartContainerRequest scRequest = StartContainerRequest.newInstance(containerLaunchContext, container.getContainerToken());
                List<StartContainerRequest> list = new ArrayList();
                list.add(scRequest);
                StartContainersRequest allRequests = StartContainersRequest.newInstance(list);
                // Start the container, which executes the executor launch command
                StartContainersResponse response = proxy.getContainerManagementProtocol().startContainers(allRequests);
                if (response.getFailedRequests() != null && response.getFailedRequests().containsKey(container.getId())) {
                    Throwable t = ((SerializedException)response.getFailedRequests().get(container.getId())).deSerialize();
                    this.parseAndThrowException(t);
                }

                allServiceResponse = response.getAllServicesMetaData();
                startingContainer.state = ContainerState.RUNNING;
            } catch (YarnException var19) {
                startingContainer.state = ContainerState.COMPLETE;
                this.startedContainers.remove(startingContainer.containerId);
                throw var19;
            } catch (IOException var20) {
                startingContainer.state = ContainerState.COMPLETE;
                this.startedContainers.remove(startingContainer.containerId);
                throw var20;
            } catch (Throwable var21) {
                startingContainer.state = ContainerState.COMPLETE;
                this.startedContainers.remove(startingContainer.containerId);
                throw RPCUtil.getRemoteException(var21);
            } finally {
                if (proxy != null) {
                    this.cmProxy.mayBeCloseProxy(proxy);
                }
            }

            return allServiceResponse;
        }
    }
}

4.1.3.1. The StartedContainer class

This class is an inner class of NMClientImpl.

It encapsulates the information of a container that has been started.

protected static class StartedContainer {

    private ContainerId containerId;
    private NodeId nodeId;
    private ContainerState state;

    public StartedContainer(ContainerId containerId, NodeId nodeId) {
        this.containerId = containerId;
        this.nodeId = nodeId;
        this.state = ContainerState.NEW;
    }

    public ContainerId getContainerId() {
        return this.containerId;
    }

    public NodeId getNodeId() {
        return this.nodeId;
    }
}

5. Summary

The AM first requests resources from the RM. After the RM returns the resources to the AM, the AM creates a resource allocator, and the allocator starts executors based on the allocated resources.

During executor startup:

The environment variables whose names start with SPARK, the executor environment variables from sparkConf, and the log-related variables are collected into a HashMap that serves as the container environment.

The executor launch command is assembled from the executor memory settings, external library paths, the container temp directory, the log directory, the backend class, driver-related information, executor-related information, and so on; it launches an org.apache.spark.executor.CoarseGrainedExecutorBackend process via /bin/java, and this process subsequently executes the tasks.

Finally, the NodeManager starts the executor.

6. References

Spark source code - task submission process: ApplicationMaster

Spark source code: the process of a Spark on YARN executor executing tasks
