Spark source code - task submission process - 2 - YarnClusterApplication
2022-08-05 06:10:00 【zdaiqing】
YarnClusterApplication
1. Overview
As analyzed in [Spark source code - task submission process - SparkSubmit], during execution SparkSubmit selects the SparkApplication subclass implementation that matches the deployment mode, instantiates it, and starts the instance.
In yarn-cluster mode, the instance constructed is the org.apache.spark.deploy.yarn.YarnClusterApplication class. The rest of this article analyzes the execution flow inside YarnClusterApplication.
2. YarnClusterApplication
In YarnClusterApplication's start method, the arguments are parsed, an RM (ResourceManager) client is constructed, and the client's run method is invoked:
private[spark] class YarnClusterApplication extends SparkApplication {
  override def start(args: Array[String], conf: SparkConf): Unit = {
    // In yarn mode the YARN distributed cache is used to ship files and jars,
    // so remove the spark.jars and spark.files settings from conf
    conf.remove("spark.jars")
    conf.remove("spark.files")
    // Parse the arguments into a ClientArguments object, construct the RM client,
    // and invoke the client's run method
    new Client(new ClientArguments(args), conf).run()
  }
}
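For context, this is roughly how SparkSubmit hands control to YarnClusterApplication. Below is a simplified sketch of the runMain step from the previous article (error handling and classloader setup omitted; childMainClass, childArgs, and sparkConf come from prepareSubmitEnvironment):

// Sketch: how SparkSubmit starts a SparkApplication implementation.
// In yarn-cluster mode childMainClass is
// "org.apache.spark.deploy.yarn.YarnClusterApplication".
val mainClass = Utils.classForName(childMainClass)
val app: SparkApplication =
  if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    // YarnClusterApplication takes this branch
    mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  } else {
    // Ordinary main() classes are wrapped so they expose the same interface
    new JavaMainApplication(mainClass)
  }
app.start(childArgs.toArray, sparkConf)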
3. ClientArguments: parsing the command-line arguments
private[deploy] class ClientArguments(args: Array[String]) {
  import ClientArguments._

  var cmd: String = "" // 'launch' or 'kill'
  var logLevel = Level.WARN

  // launch parameters
  var masters: Array[String] = null // master URLs (host:port), starting with spark://
  var jarUrl: String = "" // path to the application jar
  var mainClass: String = "" // fully qualified name of the application's main class
  var supervise: Boolean = DEFAULT_SUPERVISE // restart the driver automatically if it exits with a non-zero code; default false
  var memory: Int = DEFAULT_MEMORY // default 1024 MB
  var cores: Int = DEFAULT_CORES // default 1
  private var _driverOptions = ListBuffer[String]()
  def driverOptions: Seq[String] = _driverOptions.toSeq

  // kill parameters
  var driverId: String = ""

  // parse the arguments
  parse(args.toList)

  @tailrec
  private def parse(args: List[String]): Unit = args match {
    // ...
  }
}

private[deploy] object ClientArguments {
  val DEFAULT_CORES = 1
  val DEFAULT_MEMORY = Utils.DEFAULT_DRIVER_MEM_MB // MB
  val DEFAULT_SUPERVISE = false

  def isValidJarUrl(s: String): Boolean = {
    // ...
  }
}
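The parse body is elided above. It follows the usual tail-recursive pattern-match style over the remaining argument list; below is a minimal, self-contained sketch of that style (the option set here is illustrative, not the real one):

import scala.annotation.tailrec

// Minimal sketch of the tail-recursive argument-parsing style used by ClientArguments.
// The options are illustrative; the real parse() also handles --supervise, URLs, etc.
object ParseSketch {
  var memory: Int = 1024 // MB
  var cores: Int = 1

  @tailrec
  def parse(args: List[String]): Unit = args match {
    case ("--memory" | "-m") :: value :: tail =>
      memory = value.toInt
      parse(tail)
    case ("--cores" | "-c") :: value :: tail =>
      cores = value.toInt
      parse(tail)
    case Nil => // done
    case _ =>
      throw new IllegalArgumentException(s"Unrecognized arguments: ${args.mkString(" ")}")
  }

  def main(args: Array[String]): Unit = {
    parse(args.toList)
    println(s"memory=$memory cores=$cores")
  }
}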
3.1. DEFAULT_MEMORY: the default memory setting
private[deploy] object ClientArguments {
  val DEFAULT_CORES = 1 // default number of CPU cores: 1
  val DEFAULT_MEMORY = Utils.DEFAULT_DRIVER_MEM_MB // default memory: 1024 MB
}

private[spark] object Utils extends Logging {
  val DEFAULT_DRIVER_MEM_MB = JavaUtils.DEFAULT_DRIVER_MEM_MB.toInt
}

public class JavaUtils {
  public static final long DEFAULT_DRIVER_MEM_MB = 1024L;
}
4. Client: constructing the RM client object
During Client instantiation, the yarnClient, the distributed cache manager, and the launcher communication component are created, and the AM and executor memory, memory overhead, and CPU core counts are resolved:
private[spark] class Client(
    val args: ClientArguments,
    val sparkConf: SparkConf)
  extends Logging {

  // Instantiate the YARN client
  private val yarnClient = YarnClient.createYarnClient
  // Instantiate the Hadoop configuration
  private val hadoopConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
  // Whether the application is deployed in cluster mode
  private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster"

  // AM memory: in cluster mode use the driver memory, otherwise the AM memory setting
  private val amMemory = if (isClusterMode) {
    sparkConf.get(DRIVER_MEMORY).toInt
  } else {
    sparkConf.get(AM_MEMORY).toInt
  }
  // AM memory overhead
  private val amMemoryOverhead = {
    val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD
    sparkConf.get(amMemoryOverheadEntry).getOrElse(
      math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
  }
  // AM CPU cores
  private val amCores = if (isClusterMode) {
    sparkConf.get(DRIVER_CORES)
  } else {
    sparkConf.get(AM_CORES)
  }

  // Executor memory
  private val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
  // Executor memory overhead
  private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt

  private val isPython = sparkConf.get(IS_PYTHON_APP)
  private val pysparkWorkerMemory: Int = if (isPython) {
    sparkConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
  } else {
    0
  }

  // Instantiate the distributed cache manager
  private val distCacheMgr = new ClientDistributedCacheManager()

  // Principal used to log in to the KDC when running on secure HDFS
  private val principal = sparkConf.get(PRINCIPAL).orNull
  // Full path to the keytab file that contains the principal above. The keytab is copied
  // via the secure distributed cache to the node running the YARN ApplicationMaster,
  // in order to renew login tickets and delegation tokens periodically
  private val keytab = sparkConf.get(KEYTAB).orNull
  private val loginFromKeytab = principal != null
  private val amKeytabFileName: String = {
    require((principal == null) == (keytab == null),
      "Both principal and keytab must be defined, or neither.")
    if (loginFromKeytab) {
      logInfo(s"Kerberos credentials: principal = $principal, keytab = $keytab")
      // Generate a file name that can be used for the keytab file, that does not conflict
      // with any user file.
      new File(keytab).getName() + "-" + UUID.randomUUID().toString
    } else {
      null
    }
  }

  // Instantiate the component that communicates with the LauncherServer
  private val launcherBackend = new LauncherBackend() {
    override protected def conf: SparkConf = sparkConf
    override def onStopRequest(): Unit = {
      if (isClusterMode && appId != null) {
        yarnClient.killApplication(appId)
      } else {
        setState(SparkAppHandle.State.KILLED)
        stop()
      }
    }
  }
  private val fireAndForget = isClusterMode && !sparkConf.get(WAIT_FOR_APP_COMPLETION)
  private var appId: ApplicationId = null

  // Base directory for the application's staging files
  private val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_) }
    .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
}
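Note the overhead formula used for both the AM and the executors: max(MEMORY_OVERHEAD_FACTOR * memory, MEMORY_OVERHEAD_MIN), with factor 0.10 and floor 384 MB in Spark 2.x. A small self-contained check of what this yields:

// Worked example of the memory-overhead formula used above.
// Constants as in Spark 2.x (YarnSparkHadoopUtil): factor 0.10, floor 384 MB.
object OverheadSketch {
  val MEMORY_OVERHEAD_FACTOR = 0.10
  val MEMORY_OVERHEAD_MIN = 384L // MB

  def overhead(memoryMb: Int): Int =
    math.max((MEMORY_OVERHEAD_FACTOR * memoryMb).toLong, MEMORY_OVERHEAD_MIN).toInt

  def main(args: Array[String]): Unit = {
    println(overhead(1024)) // 384 -> the 384 MB floor applies to small containers
    println(overhead(4096)) // 409 -> 10% of a 4 GB container
  }
}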
5. Executing the client's run method
run submits the application, obtains the application id, and then monitors the application's state by that id:
private[spark] class Client( /* ... */ ) {
  def run(): Unit = {
    // Submit the application and get back its application id
    this.appId = submitApplication()
    if (!launcherBackend.isConnected() && fireAndForget) {
      // Fire-and-forget mode: log the first report and return without waiting
      val report = getApplicationReport(appId)
      val state = report.getYarnApplicationState
      logInfo(s"Application report for $appId (state: $state)")
      logInfo(formatReportDetails(report))
      if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
        throw new SparkException(s"Application $appId finished with status: $state")
      }
    } else {
      // Monitor the application's state until it terminates
      val YarnAppReport(appState, finalState, diags) = monitorApplication(appId)
      if (appState == YarnApplicationState.FAILED || finalState == FinalApplicationStatus.FAILED) {
        diags.foreach { err =>
          logError(s"Application diagnostics message: $err")
        }
        throw new SparkException(s"Application $appId finished with failed status")
      }
      if (appState == YarnApplicationState.KILLED || finalState == FinalApplicationStatus.KILLED) {
        throw new SparkException(s"Application $appId is killed")
      }
      if (finalState == FinalApplicationStatus.UNDEFINED) {
        throw new SparkException(s"The final status of application $appId is undefined")
      }
    }
  }
}
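monitorApplication (not shown) is essentially a polling loop: it repeatedly sleeps for spark.yarn.report.interval (1 s by default), fetches the latest report via yarnClient.getApplicationReport, and returns once the application reaches a terminal state. A simplified sketch of that loop, assuming it lives inside Client next to the real method (the real code also logs report details, fires launcher-state updates, and cleans up the staging directory):

// Simplified sketch of the monitorApplication polling loop (not the real implementation)
def monitorSketch(appId: ApplicationId): YarnApplicationState = {
  val interval = 1000L // spark.yarn.report.interval, default 1s
  while (true) {
    Thread.sleep(interval)
    val report = yarnClient.getApplicationReport(appId)
    val state = report.getYarnApplicationState
    if (state == YarnApplicationState.FINISHED ||
        state == YarnApplicationState.FAILED ||
        state == YarnApplicationState.KILLED) {
      return state
    }
  }
  throw new IllegalStateException("unreachable")
}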
6. Executing the client's submitApplication method
submitApplication starts the YARN client, asks the ResourceManager for a new application id, verifies that the cluster has enough resources, builds the AM container launch context, and submits the application:
private[spark] class Client( /* ... */ ) {
  def submitApplication(): ApplicationId = {
    var appId: ApplicationId = null
    try {
      launcherBackend.connect()
      // Initialize the YARN client with the Hadoop configuration
      yarnClient.init(hadoopConf)
      // Start the YARN client and connect to YARN
      yarnClient.start()

      logInfo("Requesting a new application from cluster with %d NodeManagers"
        .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

      // Ask the RM to create a new application
      val newApp = yarnClient.createApplication()
      // Get the creation response and read the application id from it
      val newAppResponse = newApp.getNewApplicationResponse()
      appId = newAppResponse.getApplicationId()

      new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
        Option(appId.toString)).setCurrentContext()

      // Verify that the cluster has enough resources for the AM
      verifyClusterResources(newAppResponse)

      // Build the AM launch context; internally this assembles the launch command:
      // [cluster mode] command = bin/java org.apache.spark.deploy.yarn.ApplicationMaster
      // [client mode]  command = bin/java org.apache.spark.deploy.yarn.ExecutorLauncher
      val containerContext = createContainerLaunchContext(newAppResponse)
      val appContext = createApplicationSubmissionContext(newApp, containerContext)

      // Finally, submit and monitor the application
      logInfo(s"Submitting application $appId to ResourceManager")
      // Submit the YARN application to the RM
      yarnClient.submitApplication(appContext)
      launcherBackend.setAppId(appId.toString)
      reportLauncherState(SparkAppHandle.State.SUBMITTED)

      appId
    } catch {
      case e: Throwable =>
        if (appId != null) {
          cleanupStagingDir(appId)
        }
        throw e
    }
  }
}
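verifyClusterResources, called above, fails fast when a requested container could never be allocated: it compares the AM and executor sizes (heap plus overhead, plus the pyspark worker memory for executors) against the cluster's maximum container size from the RM's response. A condensed sketch of that logic (message text abbreviated; the real method's errors also suggest the relevant configuration keys):

// Condensed sketch of verifyClusterResources: reject the submission early if a
// requested container exceeds yarn.scheduler.maximum-allocation-mb
private def verifyClusterResourcesSketch(newAppResponse: GetNewApplicationResponse): Unit = {
  val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
  val executorMem = executorMemory + executorMemoryOverhead + pysparkWorkerMemory
  if (executorMem > maxMem) {
    throw new IllegalArgumentException(s"Required executor memory ($executorMem MB) " +
      s"is above the max threshold ($maxMem MB) of this cluster!")
  }
  val amMem = amMemory + amMemoryOverhead
  if (amMem > maxMem) {
    throw new IllegalArgumentException(s"Required AM memory ($amMem MB) " +
      s"is above the max threshold ($maxMem MB) of this cluster!")
  }
}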
6.1. createContainerLaunchContext: building the AM container launch context
This method assembles the AM's JVM options and launch command, and selects the AM entry class:
[cluster mode] amClass = org.apache.spark.deploy.yarn.ApplicationMaster
[client mode] amClass = org.apache.spark.deploy.yarn.ExecutorLauncher
private[spark] class Client( /* ... */ ) {
  private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
      : ContainerLaunchContext = {
    // ...
    val javaOpts = ListBuffer[String]()
    // ...
    // Add Xmx for AM memory
    javaOpts += "-Xmx" + amMemory + "m"
    // ...
    javaOpts += "-Djava.io.tmpdir=" + tmpDir
    // ...
    // (from the elided part: useConcurrentAndIncrementalGC is driven by the
    // SPARK_USE_CONC_INCR_GC environment variable)
    if (useConcurrentAndIncrementalGC) {
      // In our expts, using (default) throughput collector has severe perf ramifications in
      // multi-tenant machines
      javaOpts += "-XX:+UseConcMarkSweepGC"
      javaOpts += "-XX:MaxTenuringThreshold=31"
      javaOpts += "-XX:SurvivorRatio=8"
      javaOpts += "-XX:+CMSIncrementalMode"
      javaOpts += "-XX:+CMSIncrementalPacing"
      javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
      javaOpts += "-XX:CMSIncrementalDutyCycle=10"
    }
    // ...

    // The AM entry class
    val amClass =
      if (isClusterMode) {
        Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
      } else {
        Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
      }
    if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
      args.userArgs = ArrayBuffer(args.primaryRFile) ++ args.userArgs
    }
    // Each user argument is passed to the AM as "--arg <escaped value>"
    val userArgs = args.userArgs.flatMap { arg =>
      Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
    }
    val amArgs =
      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
      Seq("--properties-file", buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE))

    // Command for the ApplicationMaster
    val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++ amArgs ++
      Seq(
        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

    // TODO: it would be nicer to just make sure there are no null commands here
    val printableCommands = commands.map(s => if (s == null) "null" else s).toList
    amContainer.setCommands(printableCommands.asJava)
    // ...
  }
}
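Putting the pieces together: in cluster mode, the command YARN eventually executes in the AM container looks roughly like the following (memory size, user class, jar path, and argument are illustrative; the conf-file name follows Spark 2.x's LOCALIZED_CONF_DIR and SPARK_CONF_FILE constants):

{{JAVA_HOME}}/bin/java -server \
  -Xmx1024m -Djava.io.tmpdir={{PWD}}/tmp \
  org.apache.spark.deploy.yarn.ApplicationMaster \
  --class 'com.example.MyApp' \
  --jar file:/path/to/my-app.jar \
  --arg 'arg1' \
  --properties-file {{PWD}}/__spark_conf__/__spark_conf__.properties \
  1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr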