Spark source code - task submission process - 4-container to start executor
2022-08-05 06:11:00 【zdaiqing】
Launching executors in containers
1.Overview
In Spark source code - task submission process: ApplicationMaster, we saw that the AM does three things:
1. Creates and starts the driver thread, which executes the main method of the user class;
2. Registers the AM with the RM;
3. Requests resources from the RM, selects usable resources from the returned containers according to matching rules, and launches executors in the allocated containers.
Below we analyze how executors are launched inside the containers.
2.Entry point
This code runs when the AM requests resources from the RM: the AM asks the RM for resources, the RM returns a list of containers, and the allocator then filters the containers and launches executors in them. For details, see Spark source code - task submission process: ApplicationMaster, section "AM requests resources from the RM".
private[yarn] class YarnAllocator(
    driverUrl: String,
    driverRef: RpcEndpointRef,
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    amClient: AMRMClient[ContainerRequest],
    appAttemptId: ApplicationAttemptId,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource],
    resolver: SparkRackResolver,
    clock: Clock = new SystemClock)
  extends Logging {

  def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
    // List of containers that can be used
    val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)

    // ...... irrelevant code omitted

    // Launch executors in the allocated containers
    runAllocatedContainers(containersToUse)

    // ...... irrelevant code omitted
  }
}
3.runAllocatedContainers
This method launches executors in the allocated containers: it iterates over the usable containers and, for each container, submits one task to a thread pool; the executor is launched from that thread. A simplified, self-contained sketch of this pattern follows the source excerpt below.
private[yarn] class YarnAllocator(
    driverUrl: String,
    driverRef: RpcEndpointRef,
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    amClient: AMRMClient[ContainerRequest],
    appAttemptId: ApplicationAttemptId,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource],
    resolver: SparkRackResolver,
    clock: Clock = new SystemClock)
  extends Logging {

  private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
    // Iterate over the usable containers
    for (container <- containersToUse) {
      executorIdCounter += 1
      val executorHostname = container.getNodeId.getHost
      val containerId = container.getId
      val executorId = executorIdCounter.toString
      assert(container.getResource.getMemory >= resource.getMemory)
      logInfo(s"Launching container $containerId on host $executorHostname " +
        s"for executor with ID $executorId")

      def updateInternalState(): Unit = synchronized {
        runningExecutors.add(executorId)
        numExecutorsStarting.decrementAndGet()
        executorIdToContainer(executorId) = container
        containerIdToExecutorId(container.getId) = executorId
        val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
          new HashSet[ContainerId])
        containerSet += containerId
        allocatedContainerToHostMap.put(containerId, executorHostname)
      }

      // Only launch a new executor if: number of running executors < target number of executors
      if (runningExecutors.size() < targetNumExecutors) {
        // Track the number of executors currently starting
        numExecutorsStarting.incrementAndGet()
        if (launchContainers) {
          // Submit a new task to the thread pool; its run method performs the launch
          launcherPool.execute(new Runnable {
            override def run(): Unit = {
              try {
                // Instantiate an ExecutorRunnable and call its run method,
                // which launches the executor
                new ExecutorRunnable(
                  Some(container),
                  conf,
                  sparkConf,
                  driverUrl,
                  executorId,
                  executorHostname,
                  executorMemory,
                  executorCores,
                  appAttemptId.getApplicationId.toString,
                  securityMgr,
                  localResources
                ).run()
                // Update internal bookkeeping state
                updateInternalState()
              } catch {
                case e: Throwable =>
                  numExecutorsStarting.decrementAndGet()
                  if (NonFatal(e)) {
                    logError(s"Failed to launch executor $executorId on container $containerId", e)
                    // Assigned container should be released immediately
                    // to avoid unnecessary resource occupation.
                    amClient.releaseAssignedContainer(containerId)
                  } else {
                    throw e
                  }
              }
            }
          })
        } else {
          // For test only
          updateInternalState()
        }
      } else {
        logInfo(("Skip launching executorRunnable as running executors count: %d " +
          "reached target executors count: %d.").format(
          runningExecutors.size, targetNumExecutors))
      }
    }
  }
}
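Stripped of the YARN-specific details, the pattern above is a thread pool running one blocking launch task per container, giving the container back on non-fatal failure. Below is a minimal, self-contained sketch of that pattern; DemoContainer, startExecutor, and releaseContainer are hypothetical stand-ins, not Spark or YARN APIs:

import java.util.concurrent.Executors
import scala.util.control.NonFatal

// Hypothetical stand-in for the YARN Container type (illustrative only).
case class DemoContainer(id: String, host: String)

object LaunchPatternSketch {
  // A fixed-size pool playing the role of launcherPool in YarnAllocator.
  private val launcherPool = Executors.newFixedThreadPool(4)

  def launchAll(
      containers: Seq[DemoContainer],
      startExecutor: DemoContainer => Unit,      // blocking NM call in the real code
      releaseContainer: DemoContainer => Unit): Unit = {
    for (container <- containers) {
      launcherPool.execute(new Runnable {
        override def run(): Unit = {
          try {
            startExecutor(container)
          } catch {
            case NonFatal(e) =>
              // Release the failed container so the resource is not leaked,
              // mirroring amClient.releaseAssignedContainer in the real code.
              println(s"Launch failed on ${container.host}: ${e.getMessage}")
              releaseContainer(container)
          }
        }
      })
    }
  }
}

Launching from a pool keeps the allocator responsive: the NM call in ExecutorRunnable.run blocks, so doing it inline would stall the handling of subsequent allocations.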
4.ExecutorRunnable.run
private[yarn] class ExecutorRunnable(
    container: Option[Container],
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    masterAddress: String,
    executorId: String,
    hostname: String,
    executorMemory: Int,
    executorCores: Int,
    appId: String,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource]) extends Logging {

  var rpc: YarnRPC = YarnRPC.create(conf)
  var nmClient: NMClient = _

  def run(): Unit = {
    logDebug("Starting Executor Container")
    // Create, initialize, and start the NodeManager client (NMClient)
    nmClient = NMClient.createNMClient()
    nmClient.init(conf)
    nmClient.start()
    // Start the container
    startContainer()
  }
}
4.1.startContainer: starting the container
This method does three main things:
1. Prepares the environment in which the container launches the executor;
2. Assembles the command that starts the executor;
3. Has the NodeManager (NM) start the executor.
private[yarn] class ExecutorRunnable(
    container: Option[Container],
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    masterAddress: String,
    executorId: String,
    hostname: String,
    executorMemory: Int,
    executorCores: Int,
    appId: String,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource]) extends Logging {

  def startContainer(): java.util.Map[String, ByteBuffer] = {
    // Container launch context
    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
      .asInstanceOf[ContainerLaunchContext]
    // Prepare the environment
    val env = prepareEnvironment().asJava

    ctx.setLocalResources(localResources.asJava)
    ctx.setEnvironment(env)

    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
    val dob = new DataOutputBuffer()
    credentials.writeTokenStorageToStream(dob)
    ctx.setTokens(ByteBuffer.wrap(dob.getData()))

    // Assemble the launch command
    val commands = prepareCommand()
    ctx.setCommands(commands.asJava)
    ctx.setApplicationACLs(
      YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava)

    // If external shuffle service is enabled, register with the Yarn shuffle service already
    // started on the NodeManager and, if authentication is enabled, provide it with our secret
    // key for fetching shuffle files later
    if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) {
      val secretString = securityMgr.getSecretKey()
      val secretBytes =
        if (secretString != null) {
          // This conversion must match how the YarnShuffleService decodes our secret
          JavaUtils.stringToBytes(secretString)
        } else {
          // Authentication is not enabled, so just provide dummy metadata
          ByteBuffer.allocate(0)
        }
      ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes))
    }

    // Send the start request to the ContainerManager
    try {
      // Ask the NM to start the container
      nmClient.startContainer(container.get, ctx)
    } catch {
      case ex: Exception =>
        throw new SparkException(s"Exception while starting container ${container.get.getId}" +
          s" on host $hostname", ex)
    }
  }
}
4.1.1.prepareEnvironment
This method collects, into a HashMap that becomes the container's environment: system environment variables whose names start with SPARK, the executor execution environment parameters from sparkConf (spark.executorEnv.*), and the log-related variables (container log URLs).
private[yarn] class ExecutorRunnable(
    container: Option[Container],
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    masterAddress: String,
    executorId: String,
    hostname: String,
    executorMemory: Int,
    executorCores: Int,
    appId: String,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource]) extends Logging {

  private def prepareEnvironment(): HashMap[String, String] = {
    val env = new HashMap[String, String]()
    Client.populateClasspath(null, conf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))

    // Determine the HTTP scheme from the YARN HTTP policy
    val yarnHttpPolicy = conf.get(
      YarnConfiguration.YARN_HTTP_POLICY_KEY,
      YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
    )
    val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"

    // Copy system environment variables starting with SPARK into the env map
    System.getenv().asScala.filterKeys(_.startsWith("SPARK"))
      .foreach { case (k, v) => env(k) = v }

    // Copy the executor execution environment variables from sparkConf into the env map
    sparkConf.getExecutorEnv.foreach { case (key, value) =>
      if (key == Environment.CLASSPATH.name()) {
        // If the key of env variable is CLASSPATH, we assume it is a path and append it.
        // This is kept for backward compatibility and consistency with hadoop
        YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
      } else {
        // For other env variables, simply overwrite the value.
        env(key) = value
      }
    }

    // Add the log-related variables (container log URLs) to the env map
    container.foreach { c =>
      sys.env.get("SPARK_USER").foreach { user =>
        val containerId = ConverterUtils.toString(c.getId)
        val address = c.getNodeHttpAddress
        val baseUrl = s"$httpScheme$address/node/containerlogs/$containerId/$user"
        env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=-4096"
        env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=-4096"
      }
    }
    env
  }
}
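For reference, the spark.executorEnv.* entries read by sparkConf.getExecutorEnv above are set on the user side. A minimal illustration (for example in spark-shell; the app name and variable values are made up):

import org.apache.spark.SparkConf

// spark.executorEnv.* entries are what getExecutorEnv returns in prepareEnvironment.
val conf = new SparkConf()
  .setAppName("env-demo")                       // hypothetical application name
  .setExecutorEnv("DATA_DIR", "/mnt/data")      // ends up as env("DATA_DIR") on the executor
  .set("spark.executorEnv.LOG_LEVEL", "DEBUG")  // equivalent raw key/value form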
4.1.2.prepareCommand: assembling the launch command
This method assembles the command with which the container launches the executor, pulling together the executor memory settings, external library path, container temp directory, log directory, the backend class, driver information, executor information, and so on.
As the parameters show, the backend that executes tasks after the executor starts is org.apache.spark.executor.CoarseGrainedExecutorBackend, launched as a separate process via /bin/java. An illustrative example of the final assembled command follows the code below.
private[yarn] class ExecutorRunnable(
    container: Option[Container],
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    masterAddress: String,
    executorId: String,
    hostname: String,
    executorMemory: Int,
    executorCores: Int,
    appId: String,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource]) extends Logging {

  private def prepareCommand(): List[String] = {
    // Buffer collecting the JVM options
    val javaOpts = ListBuffer[String]()

    // Add the executor memory setting
    val executorMemoryString = executorMemory + "m"
    javaOpts += "-Xmx" + executorMemoryString

    // Parse spark.executor.extraJavaOptions from sparkConf and add it to the options
    sparkConf.get(EXECUTOR_JAVA_OPTIONS).foreach { opts =>
      val subsOpt = Utils.substituteAppNExecIds(opts, appId, executorId)
      javaOpts ++= Utils.splitCommandString(subsOpt).map(YarnSparkHadoopUtil.escapeForShell)
    }

    // Build the command prefix from the executor library path (spark.executor.extraLibraryPath)
    val prefixEnv = sparkConf.get(EXECUTOR_LIBRARY_PATH).map { libPath =>
      Client.createLibraryPathPrefix(libPath, sparkConf)
    }

    // Set the container temp directory
    javaOpts += "-Djava.io.tmpdir=" +
      new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)

    // Certain configs need to be passed here because they are needed before the Executor
    // registers with the Scheduler and transfers the spark configs. Since the Executor backend
    // uses RPC to connect to the scheduler, the RPC settings are needed as well as the
    // authentication settings.
    sparkConf.getAll
      .filter { case (k, v) => SparkConf.isExecutorStartupConf(k) }
      .foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }

    // Pass the log directory as a system property
    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)

    // Build the user classpath arguments
    val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
      val absPath =
        if (new File(uri.getPath()).isAbsolute()) {
          Client.getClusterPath(sparkConf, uri.getPath())
        } else {
          Client.buildPath(Environment.PWD.$(), uri.getPath())
        }
      Seq("--user-class-path", "file:" + absPath)
    }.toSeq

    YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)

    // Assemble the full command
    val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++
      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
        "--driver-url", masterAddress,
        "--executor-id", executorId,
        "--hostname", hostname,
        "--cores", executorCores.toString,
        "--app-id", appId) ++
      userClassPath ++
      Seq(
        s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
        s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")

    // TODO: it would be nicer to just make sure there are no null commands here
    commands.map(s => if (s == null) "null" else s).toList
  }
}
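Put together, the command handed to the NM looks roughly like the following; every host, ID, and value here is illustrative, not taken from a real run:

  {{JAVA_HOME}}/bin/java -server -Xmx2048m
    -Djava.io.tmpdir={{PWD}}/tmp
    -Dspark.yarn.app.container.log.dir=<LOG_DIR>
    org.apache.spark.executor.CoarseGrainedExecutorBackend
    --driver-url spark://CoarseGrainedScheduler@driver-host:43125
    --executor-id 1 --hostname nm-host --cores 2
    --app-id application_1659650000000_0001
    1><LOG_DIR>/stdout 2><LOG_DIR>/stderr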
4.1.3.nmClient.startContainer: starting the container
public class NMClientImpl extends NMClient {
    protected ConcurrentMap<ContainerId, NMClientImpl.StartedContainer> startedContainers =
        new ConcurrentHashMap();

    public Map<String, ByteBuffer> startContainer(Container container,
            ContainerLaunchContext containerLaunchContext) throws YarnException, IOException {
        // Create a record for the container being started
        NMClientImpl.StartedContainer startingContainer =
            new NMClientImpl.StartedContainer(container.getId(), container.getNodeId());
        synchronized(startingContainer) {
            // Register the container in the map of started containers
            this.addStartingContainer(startingContainer);
            ContainerManagementProtocolProxyData proxy = null;
            Map allServiceResponse;
            try {
                // Get a proxy to the ContainerManager on the target NM;
                // the proxy performs the actual container start
                proxy = this.cmProxy.getProxy(container.getNodeId().toString(), container.getId());
                // Build the start request
                StartContainerRequest scRequest = StartContainerRequest.newInstance(
                    containerLaunchContext, container.getContainerToken());
                List<StartContainerRequest> list = new ArrayList();
                list.add(scRequest);
                StartContainersRequest allRequests = StartContainersRequest.newInstance(list);
                // Start the container, i.e. execute the executor launch command
                StartContainersResponse response =
                    proxy.getContainerManagementProtocol().startContainers(allRequests);
                if (response.getFailedRequests() != null
                        && response.getFailedRequests().containsKey(container.getId())) {
                    Throwable t = ((SerializedException)response.getFailedRequests()
                        .get(container.getId())).deSerialize();
                    this.parseAndThrowException(t);
                }
                allServiceResponse = response.getAllServicesMetaData();
                startingContainer.state = ContainerState.RUNNING;
            } catch (YarnException var19) {
                startingContainer.state = ContainerState.COMPLETE;
                this.startedContainers.remove(startingContainer.containerId);
                throw var19;
            } catch (IOException var20) {
                startingContainer.state = ContainerState.COMPLETE;
                this.startedContainers.remove(startingContainer.containerId);
                throw var20;
            } catch (Throwable var21) {
                startingContainer.state = ContainerState.COMPLETE;
                this.startedContainers.remove(startingContainer.containerId);
                throw RPCUtil.getRemoteException(var21);
            } finally {
                if (proxy != null) {
                    this.cmProxy.mayBeCloseProxy(proxy);
                }
            }
            return allServiceResponse;
        }
    }
}
4.1.3.1.The StartedContainer class
This class is an inner class of NMClientImpl; it encapsulates the information of a container that has been started.
protected static class StartedContainer {
    private ContainerId containerId;
    private NodeId nodeId;
    private ContainerState state;

    public StartedContainer(ContainerId containerId, NodeId nodeId) {
        this.containerId = containerId;
        this.nodeId = nodeId;
        this.state = ContainerState.NEW;
    }

    public ContainerId getContainerId() {
        return this.containerId;
    }

    public NodeId getNodeId() {
        return this.nodeId;
    }
}
5.Summary
The AM first requests resources from the RM. Once the RM returns resources to the AM, the AM creates a resource allocator, and the allocator launches executors on the allocated resources.
During executor startup:
System environment variables starting with SPARK, the executor execution environment parameters from sparkConf, and the log-related variables are collected into a HashMap that becomes the container environment;
The executor launch command is assembled from the executor memory settings, external library path, container temp directory, log directory, backend class, driver information, executor information, and so on; it starts an org.apache.spark.executor.CoarseGrainedExecutorBackend process via /bin/java, and that process subsequently executes tasks;
Finally, the NM starts the executor.