Spark source code - task submission process - 2 - YarnClusterApplication
2022-08-05 06:10:00 【zdaiqing】
YarnClusterApplication
1. Overview
As analyzed in [Spark source code - task submission process - SparkSubmit], during execution SparkSubmit selects a different SparkApplication subclass implementation according to the deploy mode, instantiates it, and starts the instance.
For yarn-cluster mode, the instance constructed is the org.apache.spark.deploy.yarn.YarnClusterApplication class. Next we analyze the execution flow inside YarnClusterApplication.
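To recap that hand-off, here is a minimal sketch of how the resolved entry class is loaded reflectively, instantiated, and started. It is not the verbatim SparkSubmit code; the method name and arguments are placeholders, and SparkApplication/YarnClusterApplication are private[spark], so this only compiles inside Spark's own packages.

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkApplication

// Illustrative sketch of the hand-off described above (not the verbatim SparkSubmit code):
// the entry class resolved for yarn-cluster mode is loaded by reflection, instantiated,
// and its start() method is invoked with the arguments and conf prepared by SparkSubmit.
def launchYarnClusterApp(childArgs: Array[String], sparkConf: SparkConf): Unit = {
  val klass = Class.forName("org.apache.spark.deploy.yarn.YarnClusterApplication")
  val app = klass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  app.start(childArgs, sparkConf)
}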
2. YarnClusterApplication
In YarnClusterApplication's start method, the arguments are parsed, an RM client is constructed, and the client's run method is invoked:
private[spark] class YarnClusterApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    // In yarn mode the YARN cache is used to distribute files and jars,
    // so remove the jars and files settings from the conf
    conf.remove("spark.jars")
    conf.remove("spark.files")

    // Parse the arguments into the object used to construct the RM client,
    // build the RM client object, and execute its run method
    new Client(new ClientArguments(args), conf).run()
  }
}
3. ClientArguments: parsing the command-line arguments
private[deploy] class ClientArguments(args: Array[String]) {
  import ClientArguments._

  var cmd: String = ""                 // 'launch' or 'kill'
  var logLevel = Level.WARN

  // launch parameters
  var masters: Array[String] = null    // master URLs (host:port), starting with spark://
  var jarUrl: String = ""              // path to the application jar
  var mainClass: String = ""           // fully qualified name of the application's main class
  var supervise: Boolean = DEFAULT_SUPERVISE // restart the driver automatically if it exits with a non-zero code; default false
  var memory: Int = DEFAULT_MEMORY     // default 1024 MB
  var cores: Int = DEFAULT_CORES       // default 1
  private var _driverOptions = ListBuffer[String]()
  def driverOptions: Seq[String] = _driverOptions.toSeq

  // kill parameters
  var driverId: String = ""

  // parse the arguments
  parse(args.toList)

  @tailrec
  private def parse(args: List[String]): Unit = args match {
    ...
  }
}

private[deploy] object ClientArguments {
  val DEFAULT_CORES = 1
  val DEFAULT_MEMORY = Utils.DEFAULT_DRIVER_MEM_MB // MB
  val DEFAULT_SUPERVISE = false

  def isValidJarUrl(s: String): Boolean = {
    ...
  }
}
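The body of parse is omitted above. As an illustration only (option names and fields are examples, not the exact Spark option set), a tail-recursive matcher of this shape consumes the argument list pair by pair:

import scala.annotation.tailrec

// Standalone sketch of the tail-recursive option-parsing pattern used by parse() above.
object ParseSketch {
  var memory: Int = 1024
  var cores: Int = 1
  var driverId: String = ""

  @tailrec
  def parse(args: List[String]): Unit = args match {
    case ("--memory" | "-m") :: value :: tail =>
      memory = value.toInt
      parse(tail)
    case ("--cores" | "-c") :: value :: tail =>
      cores = value.toInt
      parse(tail)
    case "kill" :: id :: tail =>
      driverId = id
      parse(tail)
    case Nil => // done
    case _   => sys.error(s"Unrecognized argument(s): $args")
  }
}

// e.g. ParseSketch.parse(List("--memory", "2048", "--cores", "2"))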
3.1 DEFAULT_MEMORY: the default memory setting
private[deploy] object ClientArguments {
  val DEFAULT_CORES = 1                            // default number of CPU cores: 1
  val DEFAULT_MEMORY = Utils.DEFAULT_DRIVER_MEM_MB // default memory: 1024 MB
}

private[spark] object Utils extends Logging {
  val DEFAULT_DRIVER_MEM_MB = JavaUtils.DEFAULT_DRIVER_MEM_MB.toInt
}

public class JavaUtils {
  public static final long DEFAULT_DRIVER_MEM_MB = 1024L;
}
4. Client: constructing the RM client object
During Client instantiation, the yarnClient, the distributed cache manager, and the launcher communication component are instantiated, and the AM and executor memory, memory overhead, and CPU core counts are set up:
private[spark] class Client(
    val args: ClientArguments,
    val sparkConf: SparkConf)
  extends Logging {

  // instantiate the yarn client
  private val yarnClient = YarnClient.createYarnClient
  // instantiate the hadoop/yarn configuration
  private val hadoopConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))

  // whether the application is deployed in cluster mode
  private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster"

  // AM memory: in cluster mode use the driver memory, otherwise the AM memory setting
  private val amMemory = if (isClusterMode) {
    sparkConf.get(DRIVER_MEMORY).toInt
  } else {
    sparkConf.get(AM_MEMORY).toInt
  }
  // AM memory overhead (off-heap)
  private val amMemoryOverhead = {
    val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD
    sparkConf.get(amMemoryOverheadEntry).getOrElse(
      math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
  }
  // number of CPU cores for the AM
  private val amCores = if (isClusterMode) {
    sparkConf.get(DRIVER_CORES)
  } else {
    sparkConf.get(AM_CORES)
  }

  // executor memory
  private val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
  // executor memory overhead (off-heap)
  private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt

  private val isPython = sparkConf.get(IS_PYTHON_APP)
  private val pysparkWorkerMemory: Int = if (isPython) {
    sparkConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
  } else {
    0
  }

  // instantiate the distributed cache manager
  private val distCacheMgr = new ClientDistributedCacheManager()

  // principal used to log in to the KDC when running on secure HDFS
  private val principal = sparkConf.get(PRINCIPAL).orNull
  // full path to the keytab file containing the principal above; this keytab is copied via the
  // secure distributed cache to the node running the YARN ApplicationMaster, so that login
  // tickets and delegation tokens can be renewed periodically
  private val keytab = sparkConf.get(KEYTAB).orNull
  private val loginFromKeytab = principal != null
  private val amKeytabFileName: String = {
    require((principal == null) == (keytab == null),
      "Both principal and keytab must be defined, or neither.")
    if (loginFromKeytab) {
      logInfo(s"Kerberos credentials: principal = $principal, keytab = $keytab")
      // Generate a file name that can be used for the keytab file, that does not conflict
      // with any user file.
      new File(keytab).getName() + "-" + UUID.randomUUID().toString
    } else {
      null
    }
  }

  // instantiate the component that communicates with the LauncherServer
  private val launcherBackend = new LauncherBackend() {
    override protected def conf: SparkConf = sparkConf

    override def onStopRequest(): Unit = {
      if (isClusterMode && appId != null) {
        yarnClient.killApplication(appId)
      } else {
        setState(SparkAppHandle.State.KILLED)
        stop()
      }
    }
  }

  private val fireAndForget = isClusterMode && !sparkConf.get(WAIT_FOR_APP_COMPLETION)

  private var appId: ApplicationId = null

  // root directory for the app's staging files
  private val appStagingBaseDir = sparkConf.get(STAGING_DIR).map {
    new Path(_)
  }.getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
}
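To make the overhead formula concrete, here is a small sketch. The constants below are assumptions about stock Spark defaults (an overhead factor of 0.10 and a 384 MB floor); they are defined elsewhere in the Spark source, not in the excerpt above.

// Sketch of the overhead calculation above, with assumed default constants:
// MEMORY_OVERHEAD_FACTOR = 0.10 and MEMORY_OVERHEAD_MIN = 384 MB (assumed stock defaults).
val MEMORY_OVERHEAD_FACTOR = 0.10
val MEMORY_OVERHEAD_MIN = 384L

def overhead(memoryMb: Int): Int =
  math.max((MEMORY_OVERHEAD_FACTOR * memoryMb).toLong, MEMORY_OVERHEAD_MIN).toInt

overhead(1024) // 384 -> the 10% share (102 MB) is below the floor, so the floor wins
overhead(8192) // 819 -> 10% of 8 GB exceeds the 384 MB floor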
5. Executing the Client's run method
The application is submitted, the application id is obtained, and the application state is monitored via that id:
private[spark] class Client(...) {

  def run(): Unit = {
    // submit the application and get back the application id
    this.appId = submitApplication()
    if (!launcherBackend.isConnected() && fireAndForget) {
      val report = getApplicationReport(appId)
      val state = report.getYarnApplicationState
      logInfo(s"Application report for $appId (state: $state)")
      logInfo(formatReportDetails(report))
      if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
        throw new SparkException(s"Application $appId finished with status: $state")
      }
    } else {
      // monitor the application state
      val YarnAppReport(appState, finalState, diags) = monitorApplication(appId)
      if (appState == YarnApplicationState.FAILED || finalState == FinalApplicationStatus.FAILED) {
        diags.foreach { err =>
          logError(s"Application diagnostics message: $err")
        }
        throw new SparkException(s"Application $appId finished with failed status")
      }
      if (appState == YarnApplicationState.KILLED || finalState == FinalApplicationStatus.KILLED) {
        throw new SparkException(s"Application $appId is killed")
      }
      if (finalState == FinalApplicationStatus.UNDEFINED) {
        throw new SparkException(s"The final status of application $appId is undefined")
      }
    }
  }
}
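monitorApplication is not listed above. As an illustrative sketch only (not the verbatim implementation, which also logs report details, tracks state changes for the launcher backend, and returns a YarnAppReport), the monitoring boils down to polling the ResourceManager for the application report until a terminal state is reached:

import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient

// Illustrative polling loop over YarnClient.getApplicationReport
def waitForCompletion(yarnClient: YarnClient, appId: ApplicationId, intervalMs: Long = 1000L): YarnApplicationState = {
  var state = yarnClient.getApplicationReport(appId).getYarnApplicationState
  while (state != YarnApplicationState.FINISHED &&
         state != YarnApplicationState.FAILED &&
         state != YarnApplicationState.KILLED) {
    Thread.sleep(intervalMs)
    state = yarnClient.getApplicationReport(appId).getYarnApplicationState
  }
  state
}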
6. Executing the Client's submitApplication method
private[spark] class Client(...) {

  def submitApplication(): ApplicationId = {
    var appId: ApplicationId = null
    try {
      launcherBackend.connect()
      // initialize the yarn client with the hadoop configuration
      yarnClient.init(hadoopConf)
      // start the yarn client and connect to yarn
      yarnClient.start()

      logInfo("Requesting a new application from cluster with %d NodeManagers"
        .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

      // the yarn client asks the RM to create a new application
      val newApp = yarnClient.createApplication()
      // get the creation response, which carries the application id
      val newAppResponse = newApp.getNewApplicationResponse()
      appId = newAppResponse.getApplicationId()

      new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
        Option(appId.toString)).setCurrentContext()

      // Verify that the cluster has enough resources for the AM
      verifyClusterResources(newAppResponse)

      // Build the AM launch context; internally the launch command is assembled:
      // [cluster mode] command = bin/java org.apache.spark.deploy.yarn.ApplicationMaster
      // [client mode]  command = bin/java org.apache.spark.deploy.yarn.ExecutorLauncher
      val containerContext = createContainerLaunchContext(newAppResponse)
      val appContext = createApplicationSubmissionContext(newApp, containerContext)

      // Finally, submit and monitor the application
      logInfo(s"Submitting application $appId to ResourceManager")
      // submit the yarn application to the RM
      yarnClient.submitApplication(appContext)
      launcherBackend.setAppId(appId.toString)
      reportLauncherState(SparkAppHandle.State.SUBMITTED)

      appId
    } catch {
      case e: Throwable =>
        if (appId != null) {
          cleanupStagingDir(appId)
        }
        throw e
    }
  }
}
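verifyClusterResources, called above before building the launch context, is not listed here. A hedged sketch of what it checks, based on the fields set in section 4 (the exact messages and helper structure may differ from the real source): the requested AM and executor container sizes must not exceed the largest container the scheduler will allocate.

import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse

// Illustrative sketch: compare requested AM/executor memory (heap + overhead,
// plus pyspark worker memory for executors) against the cluster's maximum
// container size reported in the new-application response.
def verifyClusterResourcesSketch(
    newAppResponse: GetNewApplicationResponse,
    amMemory: Int, amMemoryOverhead: Int,
    executorMemory: Int, executorMemoryOverhead: Int,
    pysparkWorkerMemory: Int): Unit = {
  val maxMem = newAppResponse.getMaximumResourceCapability.getMemory
  val executorMem = executorMemory + executorMemoryOverhead + pysparkWorkerMemory
  require(executorMem <= maxMem,
    s"Required executor memory ($executorMem MB) is above the max threshold ($maxMem MB) of this cluster!")
  val amMem = amMemory + amMemoryOverhead
  require(amMem <= maxMem,
    s"Required AM memory ($amMem MB) is above the max threshold ($maxMem MB) of this cluster!")
}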
6.1 createContainerLaunchContext: building the launch context of the submitted application
This method determines the AM entry class:
[cluster mode] amClass = org.apache.spark.deploy.yarn.ApplicationMaster
[client mode] amClass = org.apache.spark.deploy.yarn.ExecutorLauncher
private[spark] class Client(...) {

  private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
    : ContainerLaunchContext = {
    ...
    val javaOpts = ListBuffer[String]()
    ...
    // Add Xmx for AM memory
    javaOpts += "-Xmx" + amMemory + "m"
    ...
    javaOpts += "-Djava.io.tmpdir=" + tmpDir
    ...
      // In our expts, using (default) throughput collector has severe perf ramifications in
      // multi-tenant machines
      javaOpts += "-XX:+UseConcMarkSweepGC"
      javaOpts += "-XX:MaxTenuringThreshold=31"
      javaOpts += "-XX:SurvivorRatio=8"
      javaOpts += "-XX:+CMSIncrementalMode"
      javaOpts += "-XX:+CMSIncrementalPacing"
      javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
      javaOpts += "-XX:CMSIncrementalDutyCycle=10"
    }
    ...
    // the AM entry class
    val amClass =
      if (isClusterMode) {
        Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
      } else {
        Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
      }
    if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
      args.userArgs = ArrayBuffer(args.primaryRFile) ++ args.userArgs
    }
    val userArgs = args.userArgs.flatMap { arg =>
      Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
    }
    val amArgs =
      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
        Seq("--properties-file",
          buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE))

    // Command for the ApplicationMaster
    val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++ amArgs ++
      Seq(
        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

    // TODO: it would be nicer to just make sure there are no null commands here
    val printableCommands = commands.map(s => if (s == null) "null" else s).toList
    amContainer.setCommands(printableCommands.asJava)
    ...
  }
}
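Putting the pieces together, the command that ends up in the AM container context looks roughly like the sketch below for cluster mode. The paths, user class, jar location, and memory value are placeholders for illustration, not output captured from a real run.

// Illustrative cluster-mode AM command as assembled by the code above
// (user class, jar path, conf-file path, and memory are placeholders):
val exampleAmCommand = Seq(
  "{{JAVA_HOME}}/bin/java", "-server",
  "-Xmx1024m", "-Djava.io.tmpdir={{PWD}}/tmp",
  "org.apache.spark.deploy.yarn.ApplicationMaster",
  "--class", "com.example.MyApp",           // user main class (placeholder)
  "--jar", "hdfs:///user/demo/myapp.jar",   // user application jar (placeholder)
  "--properties-file", "{{PWD}}/__spark_conf__/__spark_conf__.properties",
  "1>", "<LOG_DIR>/stdout",
  "2>", "<LOG_DIR>/stderr"
).mkString(" ")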