spark source code - task submission process - 3-ApplicationMaster
2022-08-05 06:12:00 【zdaiqing】
ApplicationMaster
1. Overview
As analyzed in [spark source code - task submission process - YarnClusterApplication], while YarnClusterApplication runs it wraps the AM launch context parameters and submits the application to the RM; amClass is one of the parameters set in that context.
In yarn-cluster mode, amClass = bin/java org.apache.spark.deploy.yarn.ApplicationMaster. Once the application is submitted to the RM, the RM picks an NM machine to launch the AM.
The sections below analyze the startup of the AM.
2. main: the entry point
Full path: org.apache.spark.deploy.yarn.ApplicationMaster
After ApplicationMaster starts, it does the following 3 things:
1. Parses the arguments;
2. Instantiates the AM;
3. Executes run.
object ApplicationMaster extends Logging {
def main(args: Array[String]): Unit = {
SignalUtils.registerLogger(log)
//parse the arguments
val amArgs = new ApplicationMasterArguments(args)
//instantiate the AM
master = new ApplicationMaster(amArgs)
//execute run
System.exit(master.run())
}
}
2.1. Parsing and wrapping the AM arguments
The arguments define the application entry point, the application jar, the user arguments, the properties file, and related information. A hedged parsing example follows the code below.
The default number of executors is also set to 2.
class ApplicationMasterArguments(val args: Array[String]) {
//the application jar (and any options it includes), defined by the --jar argument
var userJar: String = null
//the application entry point
var userClass: String = null
var primaryPyFile: String = null
var primaryRFile: String = null
//the application arguments
var userArgs: Seq[String] = Nil
//the additional properties file
var propertiesFile: String = null
parseArgs(args.toList)
private def parseArgs(inputArgs: List[String]): Unit = {
val userArgsBuffer = new ArrayBuffer[String]()
var args = inputArgs
while (!args.isEmpty) {
// --num-workers, --worker-memory, and --worker-cores are deprecated since 1.0,
// the properties with executor in their names are preferred.
args match {
case ("--jar") :: value :: tail =>
userJar = value
args = tail
case ("--class") :: value :: tail =>
userClass = value
args = tail
case ("--primary-py-file") :: value :: tail =>
primaryPyFile = value
args = tail
case ("--primary-r-file") :: value :: tail =>
primaryRFile = value
args = tail
case ("--arg") :: value :: tail =>
userArgsBuffer += value
args = tail
case ("--properties-file") :: value :: tail =>
propertiesFile = value
args = tail
case _ =>
printUsageAndExit(1, args)
}
}
if (primaryPyFile != null && primaryRFile != null) {
// scalastyle:off println
System.err.println("Cannot have primary-py-file and primary-r-file at the same time")
// scalastyle:on println
System.exit(-1)
}
userArgs = userArgsBuffer.toList
}
def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
//..............
}
}
object ApplicationMasterArguments {
val DEFAULT_NUMBER_EXECUTORS = 2
}
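To make the parsing concrete, here is a small hedged sketch; the flag values below are invented for illustration, not taken from a real submission. parseArgs walks the argument list and pattern-matches each flag:
// hypothetical argument vector, for illustration only
val sampleArgs = Array(
"--jar", "hdfs:///apps/demo/app.jar",
"--class", "com.example.DemoApp",
"--arg", "input", "--arg", "output",
"--properties-file", "__spark_conf__.properties")
val amArgs = new ApplicationMasterArguments(sampleArgs)
// amArgs.userJar   == "hdfs:///apps/demo/app.jar"
// amArgs.userClass == "com.example.DemoApp"
// amArgs.userArgs  == List("input", "output")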
2.2. Instantiating the AM
Instantiating the AM does the following:
Instantiates sparkConf and loads the parameters from the properties file into it;
Copies the sparkConf parameters into the system properties;
Instantiates securityMgr;
Instantiates the RM client;
Loads the list of files localized by the client.
private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
// optimal as more containers are available. Might need to handle this better.
private val isClusterMode = args.userClass != null
//instantiate the spark config
private val sparkConf = new SparkConf()
if (args.propertiesFile != null) {
//cache the parameters from the properties file into sparkConf (stored in a hash map)
Utils.getPropertiesFromFile(args.propertiesFile).foreach {
case (k, v) =>
sparkConf.set(k, v)
}
}
//instantiate securityMgr from sparkConf
private val securityMgr = new SecurityManager(sparkConf)
private var metricsSystem: Option[MetricsSystem] = None
// copy the sparkConf parameters into the system properties
sparkConf.getAll.foreach {
case (k, v) =>
sys.props(k) = v
}
//build yarnConf from sparkConf
private val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
//instantiate the user class loader
private val userClassLoader = {
val classpath = Client.getUserClasspath(sparkConf)
val urls = classpath.map {
entry =>
new URL("file:" + new File(entry.getPath()).getAbsolutePath())
}
if (isClusterMode) {
if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
} else {
new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
}
} else {
new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
}
}
//the credential (token) renewer
private val credentialRenewer: Option[AMCredentialRenewer] = sparkConf.get(KEYTAB).map {
_ =>
new AMCredentialRenewer(sparkConf, yarnConf)
}
//the UGI user that the current ApplicationMaster runs as
private val ugi = credentialRenewer match {
case Some(cr) =>
// Set the context class loader so that the token renewer has access to jars distributed
// by the user.
val currentLoader = Thread.currentThread().getContextClassLoader()
Thread.currentThread().setContextClassLoader(userClassLoader)
try {
cr.start()
} finally {
Thread.currentThread().setContextClassLoader(currentLoader)
}
case _ =>
SparkHadoopUtil.get.createSparkUser()
}
//instantiate the RM client
private val client = doAsUser {
new YarnRMClient() }
//defaults to twice the number of executors (twice the maximum if dynamic allocation is enabled), with a minimum of 3
private val maxNumExecutorFailures = {
val effectiveNumExecutors =
if (Utils.isDynamicAllocationEnabled(sparkConf)) {
sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS)
} else {
sparkConf.get(EXECUTOR_INSTANCES).getOrElse(0)
}
// By default, effectiveNumExecutors is Int.MaxValue if dynamic allocation is enabled. We need
// to avoid the integer overflow here.
val defaultMaxNumExecutorFailures = math.max(3,
if (effectiveNumExecutors > Int.MaxValue / 2) Int.MaxValue else (2 * effectiveNumExecutors))
sparkConf.get(MAX_EXECUTOR_FAILURES).getOrElse(defaultMaxNumExecutorFailures)
}
@volatile private var exitCode = 0
@volatile private var unregistered = false
@volatile private var finished = false
@volatile private var finalStatus = getDefaultFinalStatus
@volatile private var finalMsg: String = ""
@volatile private var userClassThread: Thread = _
@volatile private var reporterThread: Thread = _
@volatile private var allocator: YarnAllocator = _
// A flag to check whether user has initialized spark context
@volatile private var registered = false
// Lock for controlling the allocator (heartbeat) thread.
private val allocatorLock = new Object()
// heartbeat interval
private val heartbeatInterval = {
// Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
math.max(0, math.min(expiryInterval / 2, sparkConf.get(RM_HEARTBEAT_INTERVAL)))
}
// initial wait interval before allocator polling, so that executor requests ramp up faster
private val initialAllocationInterval = math.min(heartbeatInterval,
sparkConf.get(INITIAL_HEARTBEAT_INTERVAL))
// wait interval before the next allocator poll
private var nextAllocationInterval = initialAllocationInterval
private var rpcEnv: RpcEnv = null
// in cluster mode, used to tell the AM that the user's SparkContext has been initialized
private val sparkContextPromise = Promise[SparkContext]()
// list of files localized by the client; executors use it when they start. Loading it here keeps these configs from polluting the Web UI environment page in cluster mode
private val localResources = doAsUser {
logInfo("Preparing Local resources")
val resources = HashMap[String, LocalResource]()
def setupDistributedCache(
file: String,
rtype: LocalResourceType,
timestamp: String,
size: String,
vis: String): Unit = {
val uri = new URI(file)
val amJarRsrc = Records.newRecord(classOf[LocalResource])
amJarRsrc.setType(rtype)
amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
amJarRsrc.setTimestamp(timestamp.toLong)
amJarRsrc.setSize(size.toLong)
val fileName = Option(uri.getFragment()).getOrElse(new Path(uri).getName())
resources(fileName) = amJarRsrc
}
val distFiles = sparkConf.get(CACHED_FILES)
val fileSizes = sparkConf.get(CACHED_FILES_SIZES)
val timeStamps = sparkConf.get(CACHED_FILES_TIMESTAMPS)
val visibilities = sparkConf.get(CACHED_FILES_VISIBILITIES)
val resTypes = sparkConf.get(CACHED_FILES_TYPES)
for (i <- 0 to distFiles.size - 1) {
val resType = LocalResourceType.valueOf(resTypes(i))
setupDistributedCache(distFiles(i), resType, timeStamps(i).toString, fileSizes(i).toString,
visibilities(i))
}
// Distribute the conf archive to executors.
sparkConf.get(CACHED_CONF_ARCHIVE).foreach {
path =>
val uri = new URI(path)
val fs = FileSystem.get(uri, yarnConf)
val status = fs.getFileStatus(new Path(uri))
// SPARK-16080: Make sure to use the correct name for the destination when distributing the
// conf archive to executors.
val destUri = new URI(uri.getScheme(), uri.getRawSchemeSpecificPart(),
Client.LOCALIZED_CONF_DIR)
setupDistributedCache(destUri.toString(), LocalResourceType.ARCHIVE,
status.getModificationTime().toString, status.getLen.toString,
LocalResourceVisibility.PRIVATE.name())
}
// Clean up the configuration so it doesn't show up in the Web UI (since it's really noisy).
CACHE_CONFIGS.foreach {
e =>
sparkConf.remove(e)
sys.props.remove(e.key)
}
resources.toMap
}
}
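As a worked example of the heartbeatInterval computation above, assuming YARN's default yarn.am.liveness-monitor.expiry-interval-ms of 120000 ms and Spark's default spark.yarn.scheduler.heartbeat.interval-ms of 3000 ms (both defaults are assumptions; check your cluster's actual values):
// expiry interval read from yarnConf, heartbeat from sparkConf (assumed defaults)
val expiryInterval = 120000
val configuredHeartbeat = 3000
val heartbeatInterval = math.max(0, math.min(expiryInterval / 2, configuredHeartbeat))
// min(60000, 3000) = 3000 ms, so the AM reports progress well before the RM expiry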
2.3. Executing the AM's run method
In cluster mode, run sets system properties, builds the Spark caller context, and invokes runDriver.
private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
final def run(): Int = {
doAsUser {
//delegate to runImpl
runImpl()
}
exitCode
}
private def runImpl(): Unit = {
try {
val appAttemptId = client.getAttemptId()
var attemptID: Option[String] = None
//set the cluster-mode system properties
if (isClusterMode) {
// Set the web ui port to be ephemeral for yarn so we don't conflict with
// other spark processes running on the same box
System.setProperty("spark.ui.port", "0")
// Set the master and deploy mode property to match the requested mode.
System.setProperty("spark.master", "yarn")
System.setProperty("spark.submit.deployMode", "cluster")
// Set this internal configuration if it is running on cluster mode, this
// configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.
System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
attemptID = Option(appAttemptId.getAttemptId.toString)
}
//set up the Spark caller context on HDFS and YARN; the context is built from the incoming parameters
new CallerContext(
"APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext()
logInfo("ApplicationAttemptId: " + appAttemptId)
// This shutdown hook should run *after* the SparkContext is shut down.
val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
ShutdownHookManager.addShutdownHook(priority) {
() =>
val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
if (!finished) {
// The default state of ApplicationMaster is failed if it is invoked by shut down hook.
// This behavior is different compared to 1.x version.
// If user application is exited ahead of time by calling System.exit(N), here mark
// this application as failed with EXIT_EARLY. For a good shutdown, user shouldn't call
// System.exit(0) to terminate the application.
finish(finalStatus,
ApplicationMaster.EXIT_EARLY,
"Shutdown hook called before final status was reported.")
}
if (!unregistered) {
// we only want to unregister if we don't want the RM to retry
if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
unregister(finalStatus, finalMsg)
cleanupStagingDir()
}
}
}
if (isClusterMode) {
//yarn-cluster mode: run the driver
runDriver()
} else {
runExecutorLauncher()
}
} catch {
case e: Exception =>
// catch everything else if not specifically handled
logError("Uncaught exception: ", e)
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
"Uncaught exception: " + StringUtils.stringifyException(e))
} finally {
try {
metricsSystem.foreach {
ms =>
ms.report()
ms.stop()
}
} catch {
case e: Exception =>
logWarning("Exception during stopping of the metric system: ", e)
}
}
}
private def doAsUser[T](fn: => T): T = {
ugi.doAs(new PrivilegedExceptionAction[T]() {
override def run: T = fn
})
}
}
2.3.1. runDriver
private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
private def runDriver(): Unit = {
addAmIpFilter(None)
//start the user application and return the thread, i.e. start the driver thread
userClassThread = startUserApplication()
// This a bit hacky, but we need to wait until the spark.driver.port property has
// been set by the Thread executing the user class.
logInfo("Waiting for spark context initialization...")
val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
try {
val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
Duration(totalWaitTime, TimeUnit.MILLISECONDS))
if (sc != null) {
rpcEnv = sc.env.rpcEnv
val userConf = sc.getConf
val host = userConf.get("spark.driver.host")
val port = userConf.get("spark.driver.port").toInt
//register the AM with the RM
registerAM(host, port, userConf, sc.ui.map(_.webUrl))
val driverRef = rpcEnv.setupEndpointRef(
RpcAddress(host, port),
YarnSchedulerBackend.ENDPOINT_NAME)
//request resources
createAllocator(driverRef, userConf)
} else {
// Sanity check; should never happen in normal operation, since sc should only be null
// if the user app did not create a SparkContext.
throw new IllegalStateException("User did not initialize spark context!")
}
resumeDriver()
userClassThread.join()
} catch {
case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
logError(
s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
"Please check earlier log output for errors. Failing the application.")
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_SC_NOT_INITED,
"Timed out waiting for SparkContext.")
} finally {
resumeDriver()
}
}
}
2.3.1.1. startUserApplication: starting the driver thread
"Driver" is the name of the thread that executes the user class code.
The user class is the class path given by the --class argument of spark-submit.
The user class's main method is obtained via reflection.
A new thread is created to execute that main method.
The driver thread is a child thread of the AM process.
private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
private def startUserApplication(): Thread = {
logInfo("Starting the user application in a separate Thread")
//parse the user arguments
var userArgs = args.userArgs
if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
// When running pyspark, the app is run using PythonRunner. The second argument is the list
// of files to add to PYTHONPATH, which Client.scala already handles, so it's empty.
userArgs = Seq(args.primaryPyFile, "") ++ userArgs
}
if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
// TODO(davies): add R dependencies here
}
//obtain the user class's main method via reflection
val mainMethod = userClassLoader.loadClass(args.userClass)
.getMethod("main", classOf[Array[String]])
//create a new thread
val userThread = new Thread {
override def run() {
try {
if (!Modifier.isStatic(mainMethod.getModifiers)) {
logError(s"Could not find static main method in object ${
args.userClass}")
finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
} else {
//invoke the main method via reflection
mainMethod.invoke(null, userArgs.toArray)
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
logDebug("Done running user class")
}
} catch {
case e: InvocationTargetException =>
e.getCause match {
case _: InterruptedException =>
// Reporter thread can interrupt to stop user class
case SparkUserAppException(exitCode) =>
val msg = s"User application exited with status $exitCode"
logError(msg)
finish(FinalApplicationStatus.FAILED, exitCode, msg)
case cause: Throwable =>
logError("User class threw exception: " + cause, cause)
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_EXCEPTION_USER_CLASS,
"User class threw exception: " + StringUtils.stringifyException(cause))
}
sparkContextPromise.tryFailure(e.getCause())
} finally {
// Notify the thread waiting for the SparkContext, in case the application did not
// instantiate one. This will do nothing when the user code instantiates a SparkContext
// (with the correct master), or when the user code throws an exception (due to the
// tryFailure above).
sparkContextPromise.trySuccess(null)
}
}
}
userThread.setContextClassLoader(userClassLoader)
//name the thread "Driver": a child thread of the AM process that executes the user class's main method
userThread.setName("Driver")
//start the thread: its run method invokes the user-defined main method
userThread.start()
//return the driver thread
userThread
}
}
2.3.1.2. Registering the AM with the RM
The AM registers with the RM via RPC.
Registration completes by sending the AM's host, port, and tracking web URL to the RM and receiving the RM's response.
private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
private def registerAM(
host: String, // the driver's host
port: Int, // the driver's port
_sparkConf: SparkConf,
uiAddress: Option[String]): Unit = {
val appId = client.getAttemptId().getApplicationId().toString()
val attemptId = client.getAttemptId().getAttemptId().toString()
val historyAddress = ApplicationMaster
.getHistoryServerAddress(_sparkConf, yarnConf, appId, attemptId)
//register via YarnRMClient's register method
client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)
registered = true
}
}
private[spark] class YarnRMClient extends Logging {
def register(
driverHost: String,
driverPort: Int,
conf: YarnConfiguration,
sparkConf: SparkConf,
uiAddress: Option[String],
uiHistoryAddress: String): Unit = {
//build and start the client for AM-to-RM communication
amClient = AMRMClient.createAMRMClient()
amClient.init(conf)
amClient.start()
this.uiHistoryAddress = uiHistoryAddress
val trackingUrl = uiAddress.getOrElse {
if (sparkConf.get(ALLOW_HISTORY_SERVER_TRACKING_URL)) uiHistoryAddress else ""
}
logInfo("Registering the ApplicationMaster")
synchronized {
//register the AM
amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)
registered = true
}
}
}
//AMRMClientImpl is one implementation of AMRMClient
public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
//register the AM: bind the AM's host, port, and appTrackingUrl to the AMRMClient to prepare the registration environment
public RegisterApplicationMasterResponse registerApplicationMaster(String appHostName, int appHostPort, String appTrackingUrl) throws YarnException, IOException {
this.appHostName = appHostName;
this.appHostPort = appHostPort;
this.appTrackingUrl = appTrackingUrl;
Preconditions.checkArgument(appHostName != null, "The host name should not be null");
Preconditions.checkArgument(appHostPort >= -1, "Port number of the host should be any integers larger than or equal to -1");
//once the environment is ready, invoke the actual registration logic
return this.registerApplicationMaster();
}
//the actual AM registration logic
private RegisterApplicationMasterResponse registerApplicationMaster() throws YarnException, IOException {
//wrap the registration info into a request
RegisterApplicationMasterRequest request = RegisterApplicationMasterRequest.newInstance(this.appHostName, this.appHostPort, this.appTrackingUrl);
//register via RPC and wrap the response
RegisterApplicationMasterResponse response = this.rmClient.registerApplicationMaster(request);
synchronized(this) {
this.lastResponseId = 0;
if (!response.getNMTokensFromPreviousAttempts().isEmpty()) {
this.populateNMTokens(response.getNMTokensFromPreviousAttempts());
}
return response;
}
}
}
2.3.1.2.1. RegisterApplicationMasterRequest: wrapping the registration request
It mainly wraps the host, port, and tracking web URL.
public abstract class RegisterApplicationMasterRequest {
public static RegisterApplicationMasterRequest newInstance(String host, int port, String trackingUrl) {
RegisterApplicationMasterRequest request = (RegisterApplicationMasterRequest)Records.newRecord(RegisterApplicationMasterRequest.class);
//the host of the node where the ApplicationMaster started
request.setHost(host);
//the RPC port the ApplicationMaster exposes
request.setRpcPort(port);
//the tracking web URL provided by the ApplicationMaster; users can check the application's execution status through it
request.setTrackingUrl(trackingUrl);
return request;
}
}
2.3.1.2.2. RegisterApplicationMasterResponse: wrapping the registration response
The important information in the response: the maximum resources a single container request may ask for, and the application access-control list. A short sketch of reading these fields follows the code below.
public class RegisterApplicationMasterResponsePBImpl extends RegisterApplicationMasterResponse {
Builder builder = null;
boolean viaProto = false;
//the maximum resources a single container request may ask for
private Resource maximumResourceCapability;
//the application access-control list
private Map<ApplicationAccessType, String> applicationACLS = null;
private List<Container> containersFromPreviousAttempts = null;
private List<NMToken> nmTokens = null;
private EnumSet<SchedulerResourceTypes> schedulerResourceTypes = null;
}
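A hedged sketch of how an AM can read these two fields after registering (the getters are part of the standard YARN API; the surrounding setup is elided):
// response comes back from the RPC registration call shown above
val response = amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)
// the largest resources a single container request may ask for
val maxCapability = response.getMaximumResourceCapability()
// view/modify ACLs keyed by ApplicationAccessType
val acls = response.getApplicationACLs()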
2.3.1.3. The AM requests resources from the RM
private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
private def createAllocator(driverRef: RpcEndpointRef, _sparkConf: SparkConf): Unit = {
val appId = client.getAttemptId().getApplicationId().toString()
val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
// Before we initialize the allocator, let's log the information about how executors will
// be run up front, to avoid printing this out for every single executor being launched.
// Use placeholders for information that changes such as executor IDs.
logInfo {
val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
val executorCores = _sparkConf.get(EXECUTOR_CORES)
val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>",
"<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)
dummyRunner.launchContextDebugInfo()
}
//create the allocator
allocator = client.createAllocator(
yarnConf,
_sparkConf,
driverUrl,
driverRef,
securityMgr,
localResources)
credentialRenewer.foreach(_.setDriverRef(driverRef))
// Initialize the AM endpoint *after* the allocator has been initialized. This ensures
// that when the driver sends an initial executor request (e.g. after an AM restart),
// the allocator is ready to service requests.
rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))
//have the allocator allocate resources
allocator.allocateResources()
val ms = MetricsSystem.createMetricsSystem("applicationMaster", sparkConf, securityMgr)
val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
ms.registerSource(new ApplicationMasterSource(prefix, allocator))
// do not register static sources in this case as per SPARK-25277
ms.start(false)
metricsSystem = Some(ms)
reporterThread = launchReporterThread()
}
}
2.3.1.3.1. createAllocator: creating the allocator
private[spark] class YarnRMClient extends Logging {
def createAllocator(
conf: YarnConfiguration,
sparkConf: SparkConf,
driverUrl: String,
driverRef: RpcEndpointRef,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource]): YarnAllocator = {
require(registered, "Must register AM before creating allocator.")
//instantiate a YarnAllocator, which requests container resources from the RM
new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
localResources, new SparkRackResolver())
}
}
2.3.1.3.2. allocateResources: requesting and allocating resources
private[yarn] class YarnAllocator(
driverUrl: String,
driverRef: RpcEndpointRef,
conf: YarnConfiguration,
sparkConf: SparkConf,
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource],
resolver: SparkRackResolver,
clock: Clock = new SystemClock)
extends Logging {
def allocateResources(): Unit = synchronized {
//update the container request list: based on the number of currently running executors and the total number of executors requested, update the number of containers requested from the ResourceManager
updateResourceRequests()
val progressIndicator = 0.1f
// request containers from the ResourceManager and get the allocation response
val allocateResponse = amClient.allocate(progressIndicator)
//get the list of allocated containers
val allocatedContainers = allocateResponse.getAllocatedContainers()
//Blacklist node tracking
allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)
//if any containers were allocated, handle them
if (allocatedContainers.size > 0) {
logDebug(("Allocated containers: %d. Current executor count: %d. " +
"Launching executor count: %d. Cluster resources: %s.")
.format(
allocatedContainers.size,
runningExecutors.size,
numExecutorsStarting.get,
allocateResponse.getAvailableResources))
//handle the containers obtained from the ResourceManager and start executors in them
handleAllocatedContainers(allocatedContainers.asScala)
}
//get the list of completed containers; it may include failed containers
val completedContainers = allocateResponse.getCompletedContainersStatuses()
if (completedContainers.size > 0) {
logDebug("Completed %d containers".format(completedContainers.size))
//process the completed containers
processCompletedContainers(completedContainers.asScala)
logDebug("Finished processing %d completed containers. Current running executor count: %d."
.format(completedContainers.size, runningExecutors.size))
}
}
}
2.3.1.3.2.1. updateResourceRequests: updating the container requests
Based on the number of currently running executors and the total number requested, this updates the number of containers requested from the ResourceManager. (A worked example of the arithmetic follows the code below.)
Based on the tasks pending on each node, the pending container requests are split into three groups:
requests whose locality can be matched;
requests whose locality no longer matches (stale requests);
non-local (any-host) requests.
Stale requests are cancelled and re-issued: locality is recomputed with the container placement strategy so that as many tasks as possible run locally.
The focus is container locality: the allocator tries to improve it, and cancels requests whose locality cannot be satisfied.
When there are already more executors (pending, starting, and running) than needed, the excess container requests are removed from the pending list.
private[yarn] class YarnAllocator(
driverUrl: String,
driverRef: RpcEndpointRef,
conf: YarnConfiguration,
sparkConf: SparkConf,
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource],
resolver: SparkRackResolver,
clock: Clock = new SystemClock)
extends Logging {
def updateResourceRequests(): Unit = {
// the list of pending container requests
val pendingAllocate = getPendingAllocate
val numPendingAllocate = pendingAllocate.size
//compute the number of missing executors
val missing = targetNumExecutors - numPendingAllocate -
numExecutorsStarting.get - runningExecutors.size
logDebug(s"Updating resource requests, target: $targetNumExecutors, " +
s"pending: $numPendingAllocate, running: ${
runningExecutors.size}, " +
s"executorsStarting: ${
numExecutorsStarting.get}")
// classify the pending container requests:
// localRequests: requests whose locality can be matched
// staleRequests: requests whose locality no longer matches
// anyHostRequests: non-local requests
val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
hostToLocalTaskCounts, pendingAllocate)
if (missing > 0) {
logInfo(s"Will request $missing executor container(s), each with " +
s"${
resource.getVirtualCores} core(s) and " +
s"${
resource.getMemory} MB memory (including $memoryOverhead MB of overhead)")
// cancel the requests whose locality no longer matches
staleRequests.foreach {
stale =>
amClient.removeContainerRequest(stale)
}
val cancelledContainers = staleRequests.size
if (cancelledContainers > 0) {
logInfo(s"Canceled $cancelledContainers container request(s) (locality no longer needed)")
}
// compute the number of available container slots
val availableContainers = missing + cancelledContainers
// compute the potential containers, including the non-local requests, to try to improve locality
val potentialContainers = availableContainers + anyHostRequests.size
//recompute each container's node locality and rack locality (other nodes in the same rack)
val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,
allocatedHostToContainersMap, localRequests)
//re-create the container requests based on the computed locality
val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
containerLocalityPreferences.foreach {
case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
newLocalityRequests += createContainerRequest(resource, nodes, racks)
case _ =>
}
//the currently available containers can satisfy all the new container requests
if (availableContainers >= newLocalityRequests.size) {
for (i <- 0 until (availableContainers - newLocalityRequests.size)) {
newLocalityRequests += createContainerRequest(resource, null, null)
}
}
//the currently available containers cannot satisfy all the new container requests; requests that would place containers on nodes in other racks are cancelled, to obtain better locality
else {
val numToCancel = newLocalityRequests.size - availableContainers
anyHostRequests.slice(0, numToCancel).foreach {
nonLocal =>
amClient.removeContainerRequest(nonLocal)
}
if (numToCancel > 0) {
logInfo(s"Canceled $numToCancel unlocalized container requests to resubmit with locality")
}
}
//re-add the container requests: pass them to the RM
newLocalityRequests.foreach {
request =>
amClient.addContainerRequest(request)
}
if (log.isInfoEnabled()) {
val (localized, anyHost) = newLocalityRequests.partition(_.getNodes() != null)
if (anyHost.nonEmpty) {
logInfo(s"Submitted ${
anyHost.size} unlocalized container requests.")
}
localized.foreach {
request =>
logInfo(s"Submitted container request for host ${
hostStr(request)}.")
}
}
}
//pending + starting + running executors exceed the number needed; cancel the excess pending requests
else if (numPendingAllocate > 0 && missing < 0) {
val numToCancel = math.min(numPendingAllocate, -missing)
logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
s"total $targetNumExecutors executors.")
// cancel pending allocate requests by taking locality preference into account
val cancelRequests = (staleRequests ++ anyHostRequests ++ localRequests).take(numToCancel)
cancelRequests.foreach(amClient.removeContainerRequest)
}
}
}
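A quick worked example of the missing computation at the top of updateResourceRequests, with invented numbers:
// target = 10 executors, 2 container requests pending, 1 executor starting, 3 running
val missing = 10 - 2 - 1 - 3 // = 4: request 4 more containers
If missing were negative instead, the branch at the bottom would cancel that many pending requests, preferring stale and non-local ones first.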
2.3.1.3.2.2. handleAllocatedContainers: handling the allocated containers
Usable containers are selected by matching at three levels: same node, same rack, and off-rack.
Executors are then started in the usable containers.
private[yarn] class YarnAllocator(
driverUrl: String,
driverRef: RpcEndpointRef,
conf: YarnConfiguration,
sparkConf: SparkConf,
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource],
resolver: SparkRackResolver,
clock: Clock = new SystemClock)
extends Logging {
def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
//the list of usable containers
val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
// select usable containers by matching on node (host)
val remainingAfterHostMatches = new ArrayBuffer[Container]
for (allocatedContainer <- allocatedContainers) {
matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
containersToUse, remainingAfterHostMatches)
}
// select usable containers by matching on rack; done in a separate thread
val remainingAfterRackMatches = new ArrayBuffer[Container]
if (remainingAfterHostMatches.nonEmpty) {
var exception: Option[Throwable] = None
val thread = new Thread("spark-rack-resolver") {
override def run(): Unit = {
try {
for (allocatedContainer <- remainingAfterHostMatches) {
val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost)
matchContainerToRequest(allocatedContainer, rack, containersToUse,
remainingAfterRackMatches)
}
} catch {
case e: Throwable =>
exception = Some(e)
}
}
}
thread.setDaemon(true)
thread.start()
try {
thread.join()
} catch {
case e: InterruptedException =>
thread.interrupt()
throw e
}
if (exception.isDefined) {
throw exception.get
}
}
// match the containers that are neither node-local nor rack-local once more (ANY_HOST)
val remainingAfterOffRackMatches = new ArrayBuffer[Container]
for (allocatedContainer <- remainingAfterRackMatches) {
matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
remainingAfterOffRackMatches)
}
//containers still unmatched after the three rounds of filtering (node, rack, off-rack) are released internally
if (!remainingAfterOffRackMatches.isEmpty) {
logDebug(s"Releasing ${
remainingAfterOffRackMatches.size} unneeded containers that were " +
s"allocated to us")
for (container <- remainingAfterOffRackMatches) {
internalReleaseContainer(container)
}
}
//start executors in the allocated containers
runAllocatedContainers(containersToUse)
logInfo("Received %d containers from YARN, launching executors on %d of them."
.format(allocatedContainers.size, containersToUse.size))
}
}
3. Execution flow
(Execution flow diagram omitted: the original image link is broken.)
4. Summary
The AM does 3 things:
1. Creates and starts the driver thread, which executes the main method defined by the user class;
2. Registers the AM with the RM;
3. Requests resources from the RM, selects usable resources according to the container matching rules, and starts executors in the allocated containers.
The driver thread is a thread inside the AM process that executes the user class's main method; it is started as soon as it is defined, and its run method invokes the main method defined by the user class.
When the AM registers with the RM, it passes its host, port, and web URL to the RM; the RM responds with the maximum resources a single container may request and the application access-control list.
Resource request flow: the AM creates a resource allocator, which requests resources from the RM, filters the resulting containers, and starts executors in them.
5. References
spark source code - task submission process - YarnClusterApplication
Spark internals: YARN Cluster mode source code explained (Submit)
yarn 2.7 source code analysis: communication between ApplicationMaster and ResourceManager.ApplicationMasterService
spark source code analysis - ApplicationMaster startup
spark source code tracing (11): key threads in ApplicationMaster
Spark source code: Spark on YARN container resource request and allocation, executor startup
Yarn source code analysis (4): AM registration, container request via resource scheduling, and startup