Spark source code - task submission process - 6.1 - SparkContext initialization: creating the Spark driver-side execution environment SparkEnv
2022-08-05 06:09:00 【zdaiqing】
Creating the Spark driver-side execution environment SparkEnv
- 1. Process analysis
- 1.1. Entry point
- 1.2. createDriverEnv: building the driver-side execution environment
- 1.2.1. Driver-side executor id
- 1.2.2. Fields of the execution environment SparkEnv
- 1.2.3. create: the underlying logic of building the execution environment
- 1.2.3.1. Driver-side check
- 1.2.3.2. securityManager: security manager initialization
- 1.2.3.3. rpcEnv initialization
- 1.2.3.4. Initialization of the serializer specified by spark.serializer
- 1.2.3.5. serializerManager: serializer manager initialization
- 1.2.3.6. JavaSerializer (closure serializer) initialization
- 1.2.3.7. broadcastManager: broadcast manager initialization
- 1.2.3.8. mapOutputTracker: map task output tracker initialization
- 1.2.3.9. shuffleManager initialization
- 1.2.3.10. memoryManager: static (legacy) vs. unified memory manager initialization
- 1.2.3.11. blockManager initialization
- 1.2.3.12. metricsSystem: metrics system initialization
- 1.2.3.13. outputCommitCoordinator: output commit coordinator initialization
- 1.2.3.14. Creating a temporary directory on the driver side
- 2. References
1. Process analysis
1.1. Entry point
The Spark driver-side execution environment is created during SparkContext initialization.
The specific entry point is shown in the code snippet below, inside the try/catch block of the SparkContext class.
Call chain: try/catch block -> createSparkEnv method -> SparkEnv.createDriverEnv method.
class SparkContext(config: SparkConf) extends Logging {
  // unrelated code omitted...
  // method that creates the SparkEnv
  private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    // pass in the number of CPU cores the driver may use
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
  }
  // unrelated code omitted...
  try {
    // unrelated code omitted...
    // Create the Spark execution environment (cache, map output tracker, etc)
    // entry point for creating the SparkEnv
    _env = createSparkEnv(_conf, isLocal, listenerBus)
    SparkEnv.set(_env)
    // unrelated code omitted...
  } catch {
    // unrelated code omitted...
  }
}
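For orientation, the sketch below is not part of the Spark source above; it is a usage sketch against the standard Spark user API showing that simply constructing a SparkContext is what triggers this entry path, after which the driver-side environment is reachable through SparkEnv.get.
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}

// Constructing the SparkContext executes the try block above,
// which calls createSparkEnv -> SparkEnv.createDriverEnv.
val conf = new SparkConf().setMaster("local[2]").setAppName("sparkenv-demo")
val sc = new SparkContext(conf)

// The driver-side environment created above is now globally visible in this JVM.
val env = SparkEnv.get
println(env.executorId) // prints "driver"

sc.stop()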
1.1.1. Specifying the number of driver-side CPU cores
In local mode:
- if no thread count is specified, the default is 1;
- if a thread count is specified, that value is used;
- if the thread count is "*", the number of CPUs available to the JVM is used.
In yarn cluster mode, the value is taken from the spark.driver.cores parameter, default 0.
In all other cases, the default is 0.
object SparkContext extends Logging {
  private[spark] def numDriverCores(master: String, conf: SparkConf): Int = {
    // String -> Int
    def convertToInt(threads: String): Int = {
      if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
    }
    master match {
      // local mode without a thread count: default to 1
      case "local" => 1
      // local mode with an explicit thread count: use that value
      case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads)
      case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
      // yarn cluster mode: use spark.driver.cores, default 0
      case "yarn" =>
        if (conf != null && conf.getOption("spark.submit.deployMode").contains("cluster")) {
          conf.getInt("spark.driver.cores", 0)
        } else {
          0
        }
      // driver not used, or its core count determined later: default 0
      case _ => 0 // Either driver is not being used, or its core count will be interpolated later
    }
  }
}
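To illustrate the mapping above (note that numDriverCores is private[spark], so these calls compile only inside the org.apache.spark package; the value for "local[*]" depends on the machine running the JVM):
// Illustrative only; assumes a SparkConf named conf is in scope.
SparkContext.numDriverCores("local", conf)    // 1: local mode without a thread count
SparkContext.numDriverCores("local[4]", conf) // 4: explicit thread count
SparkContext.numDriverCores("local[*]", conf) // Runtime.getRuntime.availableProcessors()
SparkContext.numDriverCores("yarn", conf)     // spark.driver.cores in cluster deploy mode, otherwise 0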
1.2. createDriverEnv: building the driver-side execution environment
object SparkEnv extends Logging {
  private[spark] def createDriverEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus,
      numCores: Int,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
    assert(conf.contains(DRIVER_HOST_ADDRESS),
      s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
    assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
    // address of the network listening socket the driver endpoint binds to
    val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
    // driver host advertised to executors
    val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
    // driver port
    val port = conf.get("spark.driver.port").toInt
    // IO encryption key
    val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
      Some(CryptoStreamUtils.createKey(conf))
    } else {
      None
    }
    // build the driver-side execution environment
    create(
      conf,
      SparkContext.DRIVER_IDENTIFIER, // executor id used for the driver
      bindAddress,
      advertiseAddress,
      Option(port),
      isLocal,
      numCores,
      ioEncryptionKey,
      listenerBus = listenerBus,
      mockOutputCommitCoordinator = mockOutputCommitCoordinator
    )
  }
}
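The bind address, advertised host, and port read above come from standard driver configuration keys. A minimal sketch of setting them explicitly (the host and port values are hypothetical):
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.bindAddress", "0.0.0.0")       // DRIVER_BIND_ADDRESS: local socket bind address
  .set("spark.driver.host", "driver-host.internal") // DRIVER_HOST_ADDRESS: address advertised to executors
  .set("spark.driver.port", "35000")                // driver RPC port (default 0 = pick a random port)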
1.2.1. Driver-side executor id
private[spark] val DRIVER_IDENTIFIER = "driver"
1.2.2. Fields of the execution environment SparkEnv
class SparkEnv (
    val executorId: String,
    private[spark] val rpcEnv: RpcEnv,
    val serializer: Serializer,                            // serializer specified by spark.serializer
    val closureSerializer: Serializer,                     // Java serializer used for closures
    val serializerManager: SerializerManager,              // serializer manager
    val mapOutputTracker: MapOutputTracker,                // map task output tracker
    val shuffleManager: ShuffleManager,                    // shuffle manager
    val broadcastManager: BroadcastManager,                // broadcast manager
    val blockManager: BlockManager,                        // block manager
    val securityManager: SecurityManager,                  // security manager
    val metricsSystem: MetricsSystem,                      // metrics system
    val memoryManager: MemoryManager,                      // memory manager
    val outputCommitCoordinator: OutputCommitCoordinator,  // output commit coordinator
    val conf: SparkConf
  ) extends Logging {
}
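At runtime these fields are reachable through SparkEnv.get in whichever JVM (driver or executor) the code runs in; a minimal sketch that only touches fields declared above:
import org.apache.spark.SparkEnv

val env = SparkEnv.get
val serializer   = env.serializer       // instance chosen via spark.serializer
val blockManager = env.blockManager     // block manager of the current JVM
val tracker      = env.mapOutputTracker // MapOutputTrackerMaster on the driver, worker variant on executors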
1.2.3. create: the underlying logic of building the execution environment
1.2.3.1. Driver-side check
// determined from the executorId passed in
val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
// on the driver, the listener bus must not be null
if (isDriver) {
  assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
}
1.2.3.2. securityManager: security manager initialization
val securityManager = new SecurityManager(conf, ioEncryptionKey)
if (isDriver) {
  securityManager.initializeAuth()
}
1.2.3.3. rpcEnv initialization
val systemName = if (isDriver) driverSystemName else executorSystemName
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
  securityManager, numUsableCores, !isDriver)
1.2.3.4. Initialization of the serializer specified by spark.serializer
val serializer = instantiateClassFromConf[Serializer](
  "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")
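Since the serializer is driven purely by this configuration key, overriding the default JavaSerializer with Kryo is a common setting; a usage sketch:
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // replaces JavaSerializer for data
Note that this only affects the data serializer; the closure serializer created below is always a JavaSerializer.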
1.2.3.5. serializerManager: serializer manager initialization
val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
1.2.3.6. JavaSerializer (closure serializer) initialization
val closureSerializer = new JavaSerializer(conf)
1.2.3.7. broadcastManager: broadcast manager initialization
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
1.2.3.8. mapOutputTracker: map task output tracker initialization
val mapOutputTracker = if (isDriver) {
  new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
} else {
  new MapOutputTrackerWorker(conf)
}
// register the MapOutputTrackerMasterEndpoint on the driver, or look it up from executors
mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
  new MapOutputTrackerMasterEndpoint(
    rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
1.2.3.9. shuffleManager initialization
val shortShuffleMgrNames = Map(
  "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
  "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass =
  shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
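Both short names resolve to SortShuffleManager, and a fully qualified class name of a custom ShuffleManager is also accepted; a usage sketch:
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.manager", "sort") // or "tungsten-sort", or a fully qualified ShuffleManager class name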
1.2.3.10. memoryManager: static (legacy) vs. unified memory manager initialization
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
  if (useLegacyMemoryManager) {
    new StaticMemoryManager(conf, numUsableCores)
  } else {
    UnifiedMemoryManager(conf, numUsableCores)
  }
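The choice between the two managers is controlled by a single flag; a sketch of opting back into the pre-1.6 static model (this legacy switch exists in the Spark version quoted here but has been removed in Spark 3.x):
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.memory.useLegacyMode", "true") // use StaticMemoryManager instead of UnifiedMemoryManager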
1.2.3.11. blockManager initialization
val blockManagerPort = if (isDriver) {
  conf.get(DRIVER_BLOCK_MANAGER_PORT)
} else {
  conf.get(BLOCK_MANAGER_PORT)
}
val blockTransferService =
  new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
    blockManagerPort, numUsableCores)
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
  BlockManagerMaster.DRIVER_ENDPOINT_NAME,
  new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
  conf, isDriver)
// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
  serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
  blockTransferService, securityManager, numUsableCores)
1.2.3.12. metricsSystem: metrics system initialization
val metricsSystem = if (isDriver) {
  // Don't start metrics system right now for Driver.
  // We need to wait for the task scheduler to give us an app ID.
  // Then we can start the metrics system.
  MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
  // We need to set the executor ID before the MetricsSystem is created because sources and
  // sinks specified in the metrics configuration file will want to incorporate this executor's
  // ID into the metrics they report.
  conf.set("spark.executor.id", executorId)
  val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
  ms.start()
  ms
}
1.2.3.13. outputCommitCoordinator: output commit coordinator initialization
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
  new OutputCommitCoordinator(conf, isDriver)
}
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
  new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
1.2.3.14. Creating a temporary directory on the driver side
// Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
// called, and we only need to do it for driver. Because driver may run as a service, and if we
// don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
if (isDriver) {
  val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
  envInstance.driverTmpDir = Some(sparkFilesDir)
}
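This "userFiles" directory is what the driver-side SparkFiles root resolves to (an assumption based on the Spark 2.x implementation of SparkFiles.getRootDirectory); a sketch against the public API, with a hypothetical file path:
import org.apache.spark.SparkFiles

sc.addFile("/tmp/lookup.csv")                    // hypothetical path; file is distributed to every node
val rootOnDriver = SparkFiles.getRootDirectory() // on the driver this resolves to the "userFiles" temp dir above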
2. References
Creating the Spark execution environment SparkEnv
Spark source code analysis 18 - Scheduling system 03: OutputCommitCoordinator and MapOutputTracker
Spark MetricsSystem demystified
Spark execution environment: the security manager SecurityManager
Spark 2.1.0: the broadcast manager BroadcastManager
Spark shuffle source code walkthrough: starting from ShuffleManager
Spark internals: implementation of unified memory management