Spark source code - task submission process - 6 - SparkContext initialization
2022-08-05 06:12:00 【zdaiqing】
SparkContext initialization
1. SparkContext initialization
1.1. Instantiating SparkContext
SparkContext provides several auxiliary constructors with different numbers of arguments, so an instance can be created directly with new (a short usage sketch follows the snippet below);
class SparkContext(config: SparkConf) extends Logging {

  def this() = this(new SparkConf())

  def this(master: String, appName: String, conf: SparkConf) =
    this(SparkContext.updatedConf(conf, master, appName))

  def this(
      master: String,
      appName: String,
      sparkHome: String = null,
      jars: Seq[String] = Nil,
      environment: Map[String, String] = Map()) = {
    this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
  }

  private[spark] def this(master: String, appName: String) =
    this(master, appName, null, Nil, Map())

  private[spark] def this(master: String, appName: String, sparkHome: String) =
    this(master, appName, sparkHome, Nil, Map())

  private[spark] def this(master: String, appName: String, sparkHome: String, jars: Seq[String]) =
    this(master, appName, sparkHome, jars, Map())
}
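As a quick illustration (a minimal sketch of my own, not part of the quoted source, and assuming local mode), the auxiliary constructors simply fold master, appName and friends into a SparkConf and delegate to the primary constructor:

import org.apache.spark.{SparkConf, SparkContext}

// Primary constructor with an explicit SparkConf
val conf = new SparkConf().setMaster("local[*]").setAppName("ctor-demo")
val sc = new SparkContext(conf)

// Equivalent auxiliary constructor: master and appName are merged into a fresh SparkConf
// val sc = new SparkContext("local[*]", "ctor-demo")

sc.stop()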
SparkContext also has a companion object of the same name, which exposes getOrCreate methods, so an instance can likewise be obtained via SparkContext.getOrCreate (see the sketch after the snippet below);
object SparkContext extends Logging {

  def getOrCreate(): SparkContext = {
    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
      if (activeContext.get() == null) {
        setActiveContext(new SparkContext(), allowMultipleContexts = false)
      }
      activeContext.get()
    }
  }

  def getOrCreate(config: SparkConf): SparkContext = {
    // Synchronize to ensure that multiple create requests don't trigger an exception
    // from assertNoOtherContextIsRunning within setActiveContext
    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
      if (activeContext.get() == null) {
        setActiveContext(new SparkContext(config), allowMultipleContexts = false)
      } else {
        if (config.getAll.nonEmpty) {
          logWarning("Using an existing SparkContext; some configuration may not take effect.")
        }
      }
      activeContext.get()
    }
  }
}
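A minimal usage sketch (again my own, assuming local mode): getOrCreate reuses the already-active context when one exists, and only logs a warning that the extra configuration may not take effect:

import org.apache.spark.{SparkConf, SparkContext}

val sc1 = SparkContext.getOrCreate(new SparkConf().setMaster("local[*]").setAppName("demo"))
// A second call does not create a new context; the active one is returned
val sc2 = SparkContext.getOrCreate(new SparkConf().setAppName("ignored"))
println(sc1 eq sc2)   // true: both references point to the same active SparkContext
sc1.stop()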
1.2. Initialization analysis
Instantiating a SparkContext object invokes the SparkContext constructor; executing the constructor body is what completes the initialization of the SparkContext.
In Scala, member-variable initializers and bare try/catch blocks written in the class body are compiled into the primary constructor and executed when the constructor is called. Initializing a SparkContext therefore runs the member-variable initializers and executes the try/catch block.
When the SparkContext constructor is invoked, the config parameter is bound first, then the member variables are initialized in source order, and finally the try/catch block is executed.
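To make this concrete, here is a small standalone sketch (mine, not Spark code) showing that statements written directly in a Scala class body, including a try/catch block, are compiled into the primary constructor and run in source order when an instance is created:

class InitDemo(config: Map[String, String]) {
  // Member initializers run first, in declaration order
  private var _name: String = _

  // A bare try/catch block in the class body is also executed during construction
  try {
    _name = config.getOrElse("app.name", "demo")
    println(s"constructor ran, name=${_name}")
  } catch {
    case e: Exception =>
      println(s"initialization failed: ${e.getMessage}")
      throw e
  }
}

object InitDemo {
  def main(args: Array[String]): Unit = {
    new InitDemo(Map("app.name" -> "my-app"))   // prints: constructor ran, name=my-app
  }
}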
1.2.1. Member variable definitions
class SparkContext(config: SparkConf) extends Logging {
  private var _conf: SparkConf = _
  // Path of the event log; used when spark.eventLog.enabled is true
  private var _eventLogDir: Option[URI] = None
  // Compression codec for the event log
  private var _eventLogCodec: Option[String] = None
  // Listener bus: the various sparkListeners are registered on the driver side and receive
  // the SparkListenerEvents sent by the executors
  private var _listenerBus: LiveListenerBus = _
  // Spark runtime environment
  private var _env: SparkEnv = _
  // Low-level status-reporting API for monitoring job and stage progress
  private var _statusTracker: SparkStatusTracker = _
  // Console progress bar
  private var _progressBar: Option[ConsoleProgressBar] = None
  // Spark UI
  private var _ui: Option[SparkUI] = None
  // Hadoop configuration
  private var _hadoopConfiguration: Configuration = _
  // Executor memory
  private var _executorMemory: Int = _
  // Scheduler backend
  private var _schedulerBackend: SchedulerBackend = _
  // Task scheduler
  private var _taskScheduler: TaskScheduler = _
  // Heartbeat receiver endpoint
  private var _heartbeatReceiver: RpcEndpointRef = _
  // DAG (directed acyclic graph) scheduler
  @volatile private var _dagScheduler: DAGScheduler = _
  // Application id
  private var _applicationId: String = _
  // Application attempt id
  private var _applicationAttemptId: Option[String] = None
  // A SparkListener that logs events to persistent storage
  private var _eventLogger: Option[EventLoggingListener] = None
  // An agent that dynamically allocates and removes executors based on the workload
  private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
  // Asynchronous cleaner for RDD, shuffle and broadcast state
  private var _cleaner: Option[ContextCleaner] = None
  // Flag indicating whether the listener bus has been started
  private var _listenerBusStarted: Boolean = false
  // Jars the Spark application depends on
  private var _jars: Seq[String] = _
  // External resource files
  private var _files: Seq[String] = _
  // Callback to run when the SparkContext shuts down
  private var _shutdownHookRef: AnyRef = _
  // Key-value in-memory store for application status
  private var _statusStore: AppStatusStore = _
}
1.2.2. Member variable initialization in the try/catch block
class SparkContext(config: SparkConf) extends Logging {
  try {
    // Initialize the configuration from a clone of the passed-in conf
    _conf = config.clone()
    // Check for illegal or deprecated configurations
    _conf.validateSettings()

    // The run-mode master and the appName must be set
    if (!_conf.contains("spark.master")) {
      throw new SparkException("A master URL must be set in your configuration")
    }
    if (!_conf.contains("spark.app.name")) {
      throw new SparkException("An application name must be set in your configuration")
    }

    // log out spark.app.name in the Spark driver logs
    logInfo(s"Submitted application: $appName")

    // In yarn cluster mode, spark.yarn.app.id must be present
    if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
      throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
        "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
    }

    if (_conf.getBoolean("spark.logConf", false)) {
      logInfo("Spark configuration:\n" + _conf.toDebugString)
    }
    // Set the driver's host and port
    _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
    _conf.setIfMissing("spark.driver.port", "0")

    // Set the executor id of the driver
    _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

    // Initialize the jars the Spark application depends on
    _jars = Utils.getUserJars(_conf)
    // Initialize the external resource files to load; they are accessed by the driver and the executors
    _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
      .toSeq.flatten

    // Initialize the event log directory. It can be an HDFS path starting with hdfs:// or a local
    // path starting with file://, and must be created in advance; a path without a filesystem
    // prefix defaults to an HDFS directory.
    _eventLogDir =
      if (isEventLogEnabled) {
        val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
          .stripSuffix("/")
        Some(Utils.resolveURI(unresolvedDir))
      } else {
        None
      }

    // Initialize the compression codec for the event log: lz4 by default; lz4, lzf, snappy and zstd are supported
    _eventLogCodec = {
      val compress = _conf.getBoolean("spark.eventLog.compress", false)
      if (compress && isEventLogEnabled) {
        Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
      } else {
        None
      }
    }
    // Initialize the listener bus.
    // Until start() is called, all submitted events are only buffered. Only after the listener bus
    // has started are events actually propagated to the attached listeners. When stop() is called,
    // the listener bus stops and drops any further events.
    _listenerBus = new LiveListenerBus(_conf)

    // Initialize the application's key-value in-memory store
    _statusStore = AppStatusStore.createLiveStore(conf)
    // Add the listener to the appStatus queue of the listener bus, i.e. register the appStatus listener with the bus
    listenerBus.addToStatusQueue(_statusStore.listener.get)

    // Build the Spark execution environment on the driver side.
    // It holds all runtime objects of a Spark instance (master or worker), including the serializer,
    // the RpcEnv, the block manager, the map output tracker, etc.
    _env = createSparkEnv(_conf, isLocal, listenerBus)
    // Publish the environment instance as a global variable, retrievable via SparkEnv.get
    SparkEnv.set(_env)

    // Set the class output directory in REPL mode
    _conf.getOption("spark.repl.class.outputDir").foreach { path =>
      val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
      _conf.set("spark.repl.class.uri", replUri)
    }

    // Initialize the job/stage status tracker: a low-level status-reporting API for monitoring job and stage progress
    _statusTracker = new SparkStatusTracker(this, _statusStore)

    // Initialize the console progress bar
    _progressBar =
      if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
        Some(new ConsoleProgressBar(this))
      } else {
        None
      }

    // Initialize the Spark UI
    _ui =
      if (conf.getBoolean("spark.ui.enabled", true)) {
        Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
          startTime))
      } else {
        // For tests, do not enable the UI
        None
      }
    // Bind the UI before starting the task scheduler to communicate
    // the bound port to the cluster manager properly
    _ui.foreach(_.bind())
    // Initialize the Hadoop configuration
    _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

    // Add the dependency jars to the execution environment, i.e. to the file server created when the environment was instantiated
    if (jars != null) {
      jars.foreach(addJar)
    }
    // Distribute the external resource files to every execution node
    if (files != null) {
      files.foreach(addFile)
    }

    // Initialize the executor memory
    _executorMemory = _conf.getOption("spark.executor.memory")
      .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
      .orElse(Option(System.getenv("SPARK_MEM"))
        .map(warnSparkMem))
      .map(Utils.memoryStringToMb)
      .getOrElse(1024)

    // Initialize the executor environment variables:
    // convert Java options into environment variables
    for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
      value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))
    } {
      executorEnvs(envKey) = value
    }
    Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
      executorEnvs("SPARK_PREPEND_CLASSES") = v
    }
    // Set the executor memory for the Mesos scheduler backend
    executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
    // Pick up the executor environment variables configured in the conf
    executorEnvs ++= _conf.getExecutorEnv
    // Set the user running the application
    executorEnvs("SPARK_USER") = sparkUser

    // Initialize the heartbeat receiver endpoint.
    // It must be created before the TaskScheduler is initialized, because executors retrieve the
    // HeartbeatReceiver during taskScheduler construction.
    _heartbeatReceiver = env.rpcEnv.setupEndpoint(
      HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
    // Initialize the taskScheduler and the schedulerBackend
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    // Initialize the dagScheduler
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // Start the TaskScheduler after the DAGScheduler reference has been set on it (this happens in the DAGScheduler constructor)
    _taskScheduler.start()

    // Obtain the applicationId and applicationAttemptId from the taskScheduler
    _applicationId = _taskScheduler.applicationId()
    _applicationAttemptId = taskScheduler.applicationAttemptId()
    _conf.set("spark.app.id", _applicationId)
    if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
      System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
    }
    // Set the applicationId on the UI
    _ui.foreach(_.setAppId(_applicationId))

    // Initialize the block manager
    _env.blockManager.initialize(_applicationId)

    // The metrics system for Driver need to be set spark.app.id to app ID.
    // So it should start after we get app ID from the task scheduler and set spark.app.id.
    // Start the metricsSystem
    _env.metricsSystem.start()
    // Attach the driver metrics servlet handler to the web ui after the metrics system is started,
    // i.e. hand the metrics system's servlet handlers to the UI
    _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
    // Initialize the event logging listener
    _eventLogger =
      if (isEventLogEnabled) {
        val logger =
          new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
            _conf, _hadoopConfiguration)
        logger.start()
        listenerBus.addToEventLogQueue(logger)
        Some(logger)
      } else {
        None
      }

    // Optionally scale number of executors dynamically based on workload. Exposed for testing.
    // If dynamic allocation is enabled, start the agent that adds and removes executors based on the workload
    val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
    _executorAllocationManager =
      if (dynamicAllocationEnabled) {
        schedulerBackend match {
          case b: ExecutorAllocationClient =>
            Some(new ExecutorAllocationManager(
              schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
              _env.blockManager.master))
          case _ =>
            None
        }
      } else {
        None
      }
    _executorAllocationManager.foreach(_.start())

    // Start the asynchronous state cleaner
    _cleaner =
      if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
        Some(new ContextCleaner(this))
      } else {
        None
      }
    _cleaner.foreach(_.start())
    // Register the listeners specified by spark.extraListeners with the listener bus, then start the bus.
    // This should be called after all internal listeners have been registered with the listener bus.
    setupAndStartListenerBus()
    // The task scheduler is ready: publish an environment update event
    postEnvironmentUpdate()
    // Publish the application start event
    postApplicationStart()

    // Post init
    // Called after the system has successfully initialized (typically in the Spark context).
    // YARN uses it to guide resource allocation based on preferred locations,
    // to wait for executor registrations, etc.
    _taskScheduler.postStartHook()
    // Register the dagScheduler metrics source
    _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
    // Register the block manager metrics source
    _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
    // Register the executor allocation manager metrics source
    _executorAllocationManager.foreach { e =>
      _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
    }

    // Make sure the context is stopped if the user forgets about it. This avoids leaving
    // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
    // is killed, though.
    logDebug("Adding shutdown hook") // force eager creation of logger
    // Set the shutdown hook: the callback to run when the SparkContext is shut down
    _shutdownHookRef = ShutdownHookManager.addShutdownHook(
      ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
      logInfo("Invoking stop() from shutdown hook")
      try {
        stop()
      } catch {
        case e: Throwable =>
          logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
      }
    }
  } catch {
    case NonFatal(e) =>
      logError("Error initializing SparkContext.", e)
      try {
        stop()
      } catch {
        case NonFatal(inner) =>
          logError("Error stopping SparkContext after init error.", inner)
      } finally {
        throw e
      }
  }
}
2. References
Spark configuration properties
Spark architecture: how SparkContext is initialized
Analysis of the SparkContext initialization principle
Spark source code analysis: SparkContext initialization
SparkContext, part 1: analysis of SparkContext initialization
Demystifying orphaned code blocks in a Scala class
Spark source code analysis, part 2: the SparkContext initialization process
Spark source code analysis, part 4: creating and starting the TaskScheduler
Understanding Spark --files
Spark --jars: priority order of dependency packages
Getting started with Spark: a quick guide to the REPL/CLI/spark shell