Review the executor from the perspective of application submission
2022-06-22 16:57:00 【ZH519080】
This article reviews the Executor from the perspective of application submission, uncovering when the Executor is started and how the Executor hands results back to the application.
When is the Executor started
After SparkContext starts, it directly invokes the createTaskScheduler method. Once createTaskScheduler completes, the TaskScheduler's start method is called; what actually runs is the start method of TaskSchedulerImpl, which in turn executes the start method of SparkDeploySchedulerBackend (renamed StandaloneSchedulerBackend in Spark 2.0 and later). In its start method, SparkDeploySchedulerBackend registers a command with the Master, and the Master then has a Worker start the concrete Executor. The command encapsulates the launch instructions; specifically, the entry class of the Executor process is CoarseGrainedExecutorBackend. The start method then calls new() to create an AppClient, which has an inner class called ClientEndpoint. When the ClientEndpoint is created, the Command specifies that the entry class of the Executor launched for the current application is CoarseGrainedExecutorBackend. ClientEndpoint inherits from ThreadSafeRpcEndpoint and communicates with the Master through the RPC mechanism. In ClientEndpoint's start method, a RegisterApplication request is sent to the Master via the registerWithMaster method. After the Master receives the request message, it first completes the information registration through the registerApplication method, then calls the schedule method, which starts the Executor on a Worker (see the Master-Driver and Worker-Executor anatomy chapter for details).
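The startup chain described above can be sketched as a toy model. All classes below are simplified stand-ins with hypothetical names, not the real Spark types; only the order of calls mirrors the text.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model of the startup chain: TaskSchedulerImpl.start ->
// StandaloneSchedulerBackend.start -> AppClient/ClientEndpoint ->
// registerWithMaster -> Master handles RegisterApplication.
// Names are simplified stand-ins, not the real Spark classes.
object StartupChainSketch {
  class Master(trace: ArrayBuffer[String]) {
    def receiveRegisterApplication(appName: String): Unit =
      trace += s"Master: registerApplication($appName)"
  }
  class ClientEndpoint(master: Master, trace: ArrayBuffer[String]) {
    def registerWithMaster(appName: String): Unit = {
      trace += "ClientEndpoint: send RegisterApplication"
      master.receiveRegisterApplication(appName)
    }
  }
  class StandaloneAppClient(master: Master, trace: ArrayBuffer[String]) {
    def start(appName: String): Unit =
      new ClientEndpoint(master, trace).registerWithMaster(appName)
  }
  class StandaloneSchedulerBackend(master: Master, trace: ArrayBuffer[String]) {
    def start(): Unit = {
      trace += "StandaloneSchedulerBackend: start (entry class = CoarseGrainedExecutorBackend)"
      new StandaloneAppClient(master, trace).start("demo-app")
    }
  }
  class TaskSchedulerImpl(backend: StandaloneSchedulerBackend, trace: ArrayBuffer[String]) {
    def start(): Unit = { trace += "TaskSchedulerImpl: start"; backend.start() }
  }

  // Run the chain once and return the recorded call order
  def run(): Seq[String] = {
    val trace = ArrayBuffer[String]()
    val master = new Master(trace)
    new TaskSchedulerImpl(new StandaloneSchedulerBackend(master, trace), trace).start()
    trace.toSeq
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```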
override def receive: PartialFunction[Any, Unit] = {
  ......
  case RegisterApplication(description, driver) => {
    if (state == RecoveryState.STANDBY) {
      // ignore, don't send response
    } else {
      val app = createApplication(description, driver)
      registerApplication(app)
      persistenceEngine.addApplication(app)
      driver.send(RegisteredApplication(app.id, self))
      schedule()
    }
  }
  ......
}
When the Master matches the RegisterApplication request, it first checks whether it is in the STANDBY (standby) state. If not, it is in the ALIVE state: it calls the createApplication method to create an ApplicationInfo, registers that ApplicationInfo through the registerApplication method, persists it via persistenceEngine.addApplication, and on completion returns a registration-success message to the AppClient via driver.send.
An ApplicationDescription object and an RpcEndpointRef are passed into the createApplication method, which returns an ApplicationInfo. The source of Master's createApplication method:
private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef):
    ApplicationInfo = {
  val now = System.currentTimeMillis()
  val date = new Date(now)
  val appId = newApplicationId(date)
  new ApplicationInfo(now, appId, desc, date, driver, defaultCores)
}
Afterwards the ApplicationInfo object is passed to the registerApplication method, which is called to complete the registration of the application. The source of the registerApplication method:
private def registerApplication(app: ApplicationInfo): Unit = {
  val appAddress = app.driver.address // Driver address, used for Master-Driver communication
  if (addressToApp.contains(appAddress)) {
    return // if the Driver address exists, the Driver is already registered; return directly
  }
  applicationMetricsSystem.registerSource(app.appSource) // register with the metrics system
  apps += app // apps is a HashSet, guaranteeing no duplicate entries
  idToApp(app.id) = app // idToApp is a HashMap mapping id to app
  endpointToApp(app.driver) = app // endpointToApp is a HashMap mapping Driver endpoint to app
  addressToApp(appAddress) = app
  waitingApps += app
}
After registration completes, the schedule method is called. This method has two functions: 1. complete Driver scheduling, dispatching the Drivers in the waitingDrivers array to qualifying Workers to run; 2. start Executors for the application on Worker nodes. Every new Driver registration, application registration, or resource change calls the schedule method. The schedule method schedules the currently available resources for the applications awaiting scheduling; when the conditions are met, it starts Executors on Worker nodes by calling the startExecutorsOnWorkers method (see the Master-Driver and Worker-Executor anatomy chapter for details).
The startExecutorsOnWorkers method, which starts Executors, calls the scheduleExecutorsOnWorkers method. That method supports two launch strategies: the spread-out strategy, which assigns Executors round-robin across as many Workers as possible, and the consolidation strategy, which fills up one Worker before moving to the next. The source of Master's scheduleExecutorsOnWorkers method:
private def scheduleExecutorsOnWorkers(
    app: ApplicationInfo,
    usableWorkers: Array[WorkerInfo],
    spreadOutApps: Boolean): Array[Int] = {
  val coresPerExecutor = app.desc.coresPerExecutor
  val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
  val oneExecutorPerWorker = coresPerExecutor.isEmpty
  val memoryPerExecutor = app.desc.memoryPerExecutorMB
  val numUsable = usableWorkers.length
  val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
  val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
  var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

  /** Return whether the specified worker can launch an executor for this app. */
  def canLaunchExecutor(pos: Int): Boolean = {
    val keepScheduling = coresToAssign >= minCoresPerExecutor
    val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
    val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
    if (launchingNewExecutor) {
      val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
      val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
      val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
      keepScheduling && enoughCores && enoughMemory && underLimit
    } else {......}
The scheduleExecutorsOnWorkers method only allocates resources to the application in a logical sense; it does not actually allocate resources for the application on the Worker nodes. Resources are actually allocated on the Worker nodes when the allocateWorkerResourceToExecutors method is called.
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
  ......
  val exec = app.addExecutor(worker, coresToAssign)
  launchExecutor(worker, exec)
  app.state = ApplicationState.RUNNING
}
The source of the launchExecutor method:
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
When the Worker receives the LaunchExecutor message (sent via worker.endpoint.send()), it processes it accordingly: on matching LaunchExecutor it builds an ExecutorRunner object and calls its start method. ExecutorRunner's start invokes the fetchAndRunExecutor method, in which CommandUtils.buildProcessBuilder(appDesc.command...) is passed the entry class "org.apache.spark.executor.CoarseGrainedExecutorBackend". When the Worker node starts the ExecutorRunner, the ExecutorRunner launches the CoarseGrainedExecutorBackend process. In CoarseGrainedExecutorBackend's onStart method, a RegisterExecutor registration request is sent to the Driver. When the Driver receives the registration request, it registers the Executor. The source of CoarseGrainedSchedulerBackend's receiveAndReply method:
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
    if (executorDataMap.contains(executorId)) { ......}
The Driver sends a RegisteredExecutor message to the CoarseGrainedExecutorBackend. After receiving the RegisteredExecutor message, the CoarseGrainedExecutorBackend creates a new Executor, which then acts as the messenger communicating with the Driver. The source of the receive method through which CoarseGrainedExecutorBackend handles the RegisteredExecutor message:
override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor(hostname) =>
    logInfo("Successfully registered with driver")
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
  ......
}
From CoarseGrainedExecutorBackend's receive method we can see that after receiving the RegisteredExecutor message, it creates an org.apache.spark.executor.Executor object, and with that the creation of the Executor is complete.
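The whole registration handshake can be modeled with a few case-class messages. This is a toy model: the real exchange goes through Spark's RPC layer, carries more fields, and replies asynchronously; DriverStub and BackendStub are hypothetical names.

```scala
import scala.collection.mutable

// Toy model of the CoarseGrainedExecutorBackend <-> Driver handshake:
// the backend sends RegisterExecutor, the driver records it and replies
// RegisteredExecutor, and only then does the backend create its Executor.
object HandshakeSketch {
  sealed trait Msg
  case class RegisterExecutor(executorId: String, cores: Int) extends Msg
  case class RegisteredExecutor(hostname: String) extends Msg

  class DriverStub {
    val executorDataMap = mutable.Map[String, Int]()
    // mirrors CoarseGrainedSchedulerBackend.receiveAndReply: duplicate ids fail
    def receiveAndReply(msg: RegisterExecutor): Msg =
      if (executorDataMap.contains(msg.executorId)) {
        throw new IllegalStateException(s"Duplicate executor ID: ${msg.executorId}")
      } else {
        executorDataMap(msg.executorId) = msg.cores
        RegisteredExecutor("worker-host")
      }
  }

  class BackendStub(executorId: String, driver: DriverStub) {
    var executor: Option[String] = None // stands in for the real Executor object
    // mirrors onStart: send RegisterExecutor and feed the reply to receive
    def onStart(): Unit = receive(driver.receiveAndReply(RegisterExecutor(executorId, 2)))
    def receive(msg: Msg): Unit = msg match {
      case RegisteredExecutor(hostname) =>
        executor = Some(s"Executor($executorId, $hostname)") // real code: new Executor(...)
      case _ =>
    }
  }
}
```

Note that the executor field stays empty until the RegisteredExecutor reply arrives, matching the order the text describes.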