# Resource Release in Spark Dynamic Resource Allocation and the BlockManager Cleanup Process
2022-07-27 14:23:00 【wankunde】
## How YarnScheduler releases resources during Spark dynamic allocation

### Creating SchedulerBackend, TaskScheduler and ExecutorAllocationManager
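SparkContext obtains both components from `SparkContext.createTaskScheduler(sc: SparkContext, master: String, deployMode: String): (SchedulerBackend, TaskScheduler)`. This method creates a task scheduler for the given master URL and returns a pair, which SparkContext destructures into `val (_schedulerBackend, _taskScheduler)`.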
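To find the right scheduler, it loads every registered `ExternalClusterManager` with `ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))`. The registrations are listed in the file `org.apache.spark.scheduler.ExternalClusterManager` under `META-INF/services`. The YARN module registers `YarnClusterManager` there. Its `canCreate(url)` check returns true only for the master URL `yarn`, so it is the single matching YARN scheduler:

- File: `META-INF/services/org.apache.spark.scheduler.ExternalClusterManager`
- Contents: `org.apache.spark.scheduler.cluster.YarnClusterManager`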
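The following minimal sketch of this selection rule runs without Spark. `ClusterManagerLike`, `YarnLike`, `K8sLike` (a made-up second manager) and `ClusterManagerSelection` are hypothetical names. A fixed `Seq` replaces the `ServiceLoader` lookup. The sketch keeps the two properties described above: each registered manager is asked `canCreate(url)`, and at most one may match. The model rejects ambiguous matches with `require`.

```scala
// Hypothetical model of how SparkContext picks an external cluster manager.
// A fixed Seq replaces java.util.ServiceLoader and META-INF/services.
trait ClusterManagerLike {
  def name: String
  def canCreate(masterUrl: String): Boolean
}

object YarnLike extends ClusterManagerLike {
  val name = "YarnClusterManager"
  // Accepts exactly the master URL "yarn", like YarnClusterManager.canCreate.
  def canCreate(masterUrl: String): Boolean = masterUrl == "yarn"
}

object K8sLike extends ClusterManagerLike {
  val name = "KubernetesClusterManager"
  def canCreate(masterUrl: String): Boolean = masterUrl.startsWith("k8s")
}

object ClusterManagerSelection {
  // Stand-in for the managers ServiceLoader would find on the classpath.
  val registered: Seq[ClusterManagerLike] = Seq(YarnLike, K8sLike)

  def select(masterUrl: String): Option[ClusterManagerLike] = {
    val matches = registered.filter(_.canCreate(masterUrl))
    // More than one match would be ambiguous, so refuse to choose.
    require(matches.size <= 1, s"Multiple cluster managers registered for $masterUrl: $matches")
    matches.headOption
  }
}
```

`ClusterManagerSelection.select("yarn")` returns the YARN stand-in, and `select("local[*]")` returns `None`. In Spark, local and standalone master URLs are matched directly inside `createTaskScheduler` and never reach the external-manager lookup.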
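With YarnClusterManager, `_schedulerBackend` is a `YarnClusterSchedulerBackend` and the task scheduler is a `YarnScheduler`. Strictly, in cluster deploy mode the scheduler is the subclass `YarnClusterScheduler`. In client mode the backend is a `YarnClientSchedulerBackend` instead. The inheritance chains of the backend and of `YarnScheduler` are:

- `YarnClusterSchedulerBackend` / `YarnClientSchedulerBackend`
  - extends `YarnSchedulerBackend`: handles all YARN resource-management messages, such as RegisterClusterManager, RemoveExecutor, RequestExecutors and KillExecutors
    - extends `CoarseGrainedSchedulerBackend`
      - extends `ExecutorAllocationClient`
- `YarnScheduler`
  - extends `TaskSchedulerImpl`
    - extends `TaskScheduler`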
If dynamic allocation is enabled (`spark.dynamicAllocation.enabled=true`), SparkContext instantiates an `ExecutorAllocationManager` during initialization and starts it. ExecutorAllocationManager delegates the actual scheduling to the `YarnClusterSchedulerBackend` above, which it refers to as `client`:

`_executorAllocationManager = Some(new ExecutorAllocationManager(schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf, cleaner = cleaner))`
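The `asInstanceOf[ExecutorAllocationClient]` cast works because every YARN backend inherits from `CoarseGrainedSchedulerBackend`, which mixes in `ExecutorAllocationClient`. Below is a heavily trimmed, hypothetical mirror of that hierarchy. `HierarchySketch` and `allocationClientOf` are illustrative names. The real classes take many constructor arguments, and the real `doKillExecutors` returns `Future[Boolean]` rather than `Boolean`.

```scala
// Hypothetical, heavily trimmed mirror of the YARN backend hierarchy.
object HierarchySketch {
  trait SchedulerBackend

  trait ExecutorAllocationClient {
    // Returns the ids of executors the backend agreed to kill.
    def killExecutors(executorIds: Seq[String]): Seq[String]
  }

  class CoarseGrainedSchedulerBackend extends ExecutorAllocationClient with SchedulerBackend {
    final override def killExecutors(executorIds: Seq[String]): Seq[String] =
      if (doKillExecutors(executorIds)) executorIds else Seq.empty

    // Default: this backend cannot kill executors (Spark returns Future.successful(false)).
    protected def doKillExecutors(executorIds: Seq[String]): Boolean = false
  }

  class YarnSchedulerBackend extends CoarseGrainedSchedulerBackend {
    // The real override asks YarnSchedulerEndpoint to forward KillExecutors to the AM.
    override protected def doKillExecutors(executorIds: Seq[String]): Boolean = true
  }

  class YarnClusterSchedulerBackend extends YarnSchedulerBackend
  class YarnClientSchedulerBackend extends YarnSchedulerBackend

  // SparkContext stores the backend as a SchedulerBackend and casts it, as in
  // schedulerBackend.asInstanceOf[ExecutorAllocationClient].
  def allocationClientOf(backend: SchedulerBackend): ExecutorAllocationClient =
    backend.asInstanceOf[ExecutorAllocationClient]
}
```

Calling `killExecutors` through the generic client dispatches to the YARN override of `doKillExecutors`. This is a template method: the base class owns the entry point, and the cluster-specific subclass performs the kill.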
### ExecutorAllocationManager
Once started, ExecutorAllocationManager runs a background thread that calls `schedule()` periodically. In each round, `executorMonitor` returns the list of executors that have timed out, and `removeExecutors()` releases them:

- `def start(): Unit`
  - `def schedule(): Unit`
    - `val executorIdsToBeRemoved = executorMonitor.timedOutExecutors()`
    - `def removeExecutors(executors: Seq[String])`
      - `client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false, countFailures = false, force = false)`
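One tick of `schedule()` can be modeled with an explicit clock, which makes the timeout logic easy to follow. This sketch rests on simplifying assumptions:

- `IdleMonitor` is a hypothetical stand-in for `ExecutorMonitor`.
- The `kill` callback stands in for `client.killExecutors(...)`.
- Times are plain milliseconds.

In Spark, the idle threshold comes from `spark.dynamicAllocation.executorIdleTimeout`, and the real monitor also takes cached blocks into account. This model ignores cached blocks.

```scala
import scala.collection.mutable

// Hypothetical model of one tick of ExecutorAllocationManager.schedule().
// IdleMonitor stands in for ExecutorMonitor; times are plain milliseconds.
object ScheduleSketch {
  final class IdleMonitor(idleTimeoutMs: Long) {
    private val idleSince = mutable.Map.empty[String, Long]

    def onIdle(executorId: String, nowMs: Long): Unit = idleSince(executorId) = nowMs
    def onBusy(executorId: String): Unit = idleSince -= executorId
    // Stop reporting executors whose removal has already been requested.
    def onRemovalRequested(executorIds: Seq[String]): Unit = idleSince --= executorIds

    // Executors idle for at least idleTimeoutMs, sorted for a stable order.
    def timedOutExecutors(nowMs: Long): Seq[String] =
      idleSince.collect { case (id, since) if nowMs - since >= idleTimeoutMs => id }.toSeq.sorted
  }

  // `kill` plays the role of client.killExecutors(...).
  def schedule(monitor: IdleMonitor, nowMs: Long)(kill: Seq[String] => Unit): Seq[String] = {
    val executorIdsToBeRemoved = monitor.timedOutExecutors(nowMs)
    if (executorIdsToBeRemoved.nonEmpty) {
      kill(executorIdsToBeRemoved)
      monitor.onRemovalRequested(executorIdsToBeRemoved)
    }
    executorIdsToBeRemoved
  }
}
```

Executors passed to `kill` are dropped from the monitor, so the next tick does not request them again. The real `ExecutorMonitor` similarly stops reporting executors that are already pending removal.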
This calls `killExecutors` on `client: ExecutorAllocationClient`. The client is the backend we saw earlier, because `class CoarseGrainedSchedulerBackend extends ExecutorAllocationClient`:

- `CoarseGrainedSchedulerBackend`
  - `def killExecutors()`
    - `doKillExecutors(executorsToKill)`
- `YarnSchedulerBackend`
  - `def doKillExecutors(executorIds: Seq[String])`
    - `yarnSchedulerEndpointRef.ask[Boolean](KillExecutors(executorIds))`
- `o.a.s.scheduler.cluster.YarnSchedulerEndpoint`
  - YarnSchedulerEndpoint forwards the KillExecutors request unchanged to the AM endpoint, `var amEndpoint: Option[RpcEndpointRef]`
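The hop from the backend to the ApplicationMaster is plain message forwarding. The sketch below models it with synchronous calls. `ForwardingSketch`, `Endpoint`, `AmEndpoint` and `SchedulerEndpoint` are hypothetical stand-ins. Spark's `RpcEndpointRef.ask`, by contrast, is asynchronous and crosses process boundaries.

```scala
import scala.collection.mutable

// Hypothetical model of the KillExecutors hop from the driver backend to the AM.
// Endpoint.ask is a synchronous stand-in for Spark's asynchronous RpcEndpointRef.ask.
object ForwardingSketch {
  sealed trait Message
  final case class KillExecutors(executorIds: Seq[String]) extends Message

  trait Endpoint {
    def ask(msg: Message): Boolean
  }

  // Stand-in for the ApplicationMaster's endpoint: records what it was asked to kill.
  final class AmEndpoint extends Endpoint {
    val killed = mutable.ArrayBuffer.empty[String]
    def ask(msg: Message): Boolean = msg match {
      case KillExecutors(ids) =>
        killed ++= ids
        true
    }
  }

  // Stand-in for YarnSchedulerEndpoint: forwards to the AM once it has registered.
  final class SchedulerEndpoint(var amEndpoint: Option[Endpoint]) extends Endpoint {
    def ask(msg: Message): Boolean = amEndpoint match {
      case Some(am) => am.ask(msg)
      case None     => false // AM not registered yet: nothing to forward to
    }
  }

  // Stand-in for YarnSchedulerBackend.doKillExecutors.
  def doKillExecutors(schedulerEndpointRef: Endpoint, executorIds: Seq[String]): Boolean =
    schedulerEndpointRef.ask(KillExecutors(executorIds))
}
```

The `None` branch mirrors a case the real `YarnSchedulerEndpoint` also handles. When no AM has registered yet, the kill request is answered with `false` instead of being forwarded.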
- `o.a.s.deploy.yarn.ApplicationMaster`: the ApplicationMaster handles the KillExecutors message
  - `case KillExecutors(executorIds)`
- `YarnAllocator`
  - `def killExecutor(executorId: String)`
    - `internalReleaseContainer(container)`
      - `amClient.releaseAssignedContainer(container.getId())`: actively asks the ResourceManager (RM) to release the container
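On the AM side, killing an executor comes down to three steps:

1. Find the executor's container.
2. Record the container as released.
3. Tell the RM.

The following minimal sketch loosely follows `YarnAllocator.killExecutor` and `internalReleaseContainer`. `AllocatorSketch`, `ContainerId` and the `release` callback are hypothetical stand-ins for YARN's `ContainerId` and `AMRMClient.releaseAssignedContainer`. They are not the real Hadoop types.

```scala
import scala.collection.mutable

// Hypothetical model of YarnAllocator.killExecutor -> internalReleaseContainer.
// ContainerId and the `release` callback stand in for YARN's ContainerId and
// AMRMClient.releaseAssignedContainer; they are not the real Hadoop types.
object AllocatorSketch {
  final case class ContainerId(value: String)

  final class Allocator(release: ContainerId => Unit) {
    private val executorIdToContainer = mutable.Map.empty[String, ContainerId]
    private val releasedContainers = mutable.Set.empty[ContainerId]

    def onExecutorLaunched(executorId: String, container: ContainerId): Unit =
      executorIdToContainer(executorId) = container

    // Returns false for an unknown (or already killed) executor instead of failing.
    def killExecutor(executorId: String): Boolean =
      executorIdToContainer.remove(executorId) match {
        case Some(container) if !releasedContainers.contains(container) =>
          internalReleaseContainer(container)
          true
        case _ => false
      }

    private def internalReleaseContainer(container: ContainerId): Unit = {
      releasedContainers += container
      release(container) // real code: amClient.releaseAssignedContainer(container.getId())
    }

    def released: Set[ContainerId] = releasedContainers.toSet
  }
}
```

A second `killExecutor` call for the same id returns `false`. This mirrors the path the real allocator takes for unknown executors, where it only logs a warning.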
## How BlockManager cleans up broadcasts

### Relationships among the BlockManager components
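- `SparkEnv`
  - `BlockManager`
    - `blockManagerMaster`: holds two EndpointRefs. One handles BlockManagerInfo RPC events and the other handles heartbeats. Together they manage the list of BlockManagerInfo.
      - `driverEndpoint: BlockManagerMasterEndpoint`: maintains `blockManagerInfo: Map[BlockManagerId, BlockManagerInfo]` to manage every BlockManagerInfo.
        - `BlockManagerId`: an identifier made of four fields, namely executorId, host, port and topologyInfo.
        - `BlockManagerInfo`: exists on both the driver and the executors. It keeps the current `blocks: JHashMap[BlockId, BlockStatus]` for memory management. `removeBlock` only marks a block as deleted; the process memory tracked in `_remainingMem` is not released.
      - `driverHeartbeatEndPoint: BlockManagerMasterHeartbeatEndpoint`: routine heartbeat management, not covered here.

The two driver endpoints are registered under these names:

- `val DRIVER_ENDPOINT_NAME = "BlockManagerMaster"`
- `val DRIVER_HEARTBEAT_ENDPOINT_NAME = "BlockManagerMasterHeartbeat"`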
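The driver-side registry is essentially a map from `BlockManagerId` to per-manager bookkeeping. In the hypothetical sketch below, `BlockManagerId` is modeled as a case class, so two ids with the same four fields are equal and address the same entry. Block ids are plain strings (such as `broadcast_0`) instead of `BlockId` objects. Memory accounting is left out entirely.

```scala
import scala.collection.mutable

// Hypothetical, trimmed model of the registry kept by BlockManagerMasterEndpoint.
object RegistrySketch {
  // Modeled as a case class so equality (and hashing) is by the four fields.
  final case class BlockManagerId(executorId: String, host: String, port: Int, topologyInfo: Option[String])
  final case class BlockStatus(memSize: Long)

  final class BlockManagerInfo(val blockManagerId: BlockManagerId) {
    // Block names such as "broadcast_0" stand in for BlockId objects.
    val blocks = mutable.HashMap.empty[String, BlockStatus]
    def updateBlockInfo(blockId: String, status: BlockStatus): Unit = blocks(blockId) = status
    def removeBlock(blockId: String): Unit = blocks -= blockId
  }

  // Driver-side view: one BlockManagerInfo per registered BlockManagerId.
  val blockManagerInfo = mutable.HashMap.empty[BlockManagerId, BlockManagerInfo]

  def register(id: BlockManagerId): BlockManagerInfo =
    blockManagerInfo.getOrElseUpdate(id, new BlockManagerInfo(id))
}
```

Registering an equal id twice returns the existing `BlockManagerInfo`, because `getOrElseUpdate` finds the equal key.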
### Initializing and registering the BlockManager on executors

BlockManagerInfo is registered when the Executor is instantiated. The executor sends a `RegisterBlockManager` message to the driver endpoint:
```scala
// CoarseGrainedExecutorBackend
case RegisteredExecutor =>
  executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false, resources = _resources)

// Executor() constructor
env.blockManager.initialize(conf.getAppId)

// BlockManager.initialize
val id = BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)
val idFromMaster = master.registerBlockManager(id, ...)

// BlockManagerMaster.registerBlockManager: send the registration message to driverEndpoint
val updatedId = driverEndpoint.askSync[BlockManagerId](
  RegisterBlockManager(id, localDirs, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
```
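The registration round trip can be reduced to one synchronous request and reply. In this hypothetical sketch, `RegisterSketch`, `MasterEndpoint` and `initialize` are illustrative stand-ins, and `askSync` is a direct method call. The reply is a possibly updated id, which is why the executor keeps `updatedId`. The driver can fill in `topologyInfo` for the host. Here the `topologyOf` function models that step; in Spark it is done by a configurable topology mapper.

```scala
import scala.collection.mutable

// Hypothetical model of the RegisterBlockManager round trip; askSync is a direct call here.
object RegisterSketch {
  final case class BlockManagerId(executorId: String, host: String, port: Int, topologyInfo: Option[String])
  final case class RegisterBlockManager(id: BlockManagerId, maxOnHeapMemSize: Long, maxOffHeapMemSize: Long)

  // Stand-in for BlockManagerMasterEndpoint on the driver.
  final class MasterEndpoint(topologyOf: String => Option[String]) {
    val registered = mutable.LinkedHashMap.empty[BlockManagerId, RegisterBlockManager]

    def askSync(msg: RegisterBlockManager): BlockManagerId = {
      // The driver may enrich the id, e.g. with topology info for the host.
      val updatedId = msg.id.copy(topologyInfo = topologyOf(msg.id.host))
      registered(updatedId) = msg
      updatedId
    }
  }

  // Stand-in for BlockManager.initialize on an executor.
  def initialize(executorId: String, host: String, port: Int, master: MasterEndpoint): BlockManagerId = {
    val id = BlockManagerId(executorId, host, port, None)
    master.askSync(RegisterBlockManager(id, maxOnHeapMemSize = 512L << 20, maxOffHeapMemSize = 0L))
  }
}
```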
### Managing RemoveBroadcast

To manage blocks across the cluster, we only need the `master` reference held by BlockManager. For example: `SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)`.
- `BlockManagerMaster`
  - `def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean): Unit`: the master object is only a facade that forwards the actual request to driverEndpoint
    - `val future = driverEndpoint.askSync[Future[Seq[Int]]](RemoveBroadcast(broadcastId, removeFromMaster))`
- `driverEndpoint: BlockManagerMasterEndpoint`
  - `def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean)`: builds a new RemoveBroadcast message, which the driver sends to the BlockManager on each executor
    - `val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)`
    - `requiredBlockManagers.map { bm => bm.slaveEndpoint.ask[Int](removeMsg) }`
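The fan-out on the driver can be sketched with Scala futures. This in-process model makes the following assumptions:

- `FanOutSketch` and `StorageEndpoint` are hypothetical.
- Each endpoint keeps a map from broadcast id to the number of blocks it holds.
- `ask` replies with how many blocks it removed. These replies stand in for the `Int` values collected into `Future[Seq[Int]]` above.

As in the real endpoint, the driver's own BlockManager is included only when `removeFromDriver` is true.

```scala
import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical model of BlockManagerMasterEndpoint.removeBroadcast fanning a
// RemoveBroadcast message out to every BlockManager; ask() runs locally here.
object FanOutSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  final case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean)

  // Stand-in for one BlockManager's storage endpoint; blocks maps broadcast id -> blocks held.
  final class StorageEndpoint(val isDriver: Boolean, blocks: mutable.Map[Long, Int]) {
    // Removes the broadcast's blocks and replies with how many were removed.
    def ask(msg: RemoveBroadcast): Future[Int] =
      Future(blocks.synchronized(blocks.remove(msg.broadcastId).getOrElse(0)))
  }

  def removeBroadcast(managers: Seq[StorageEndpoint], broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
    val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
    // The driver's own BlockManager is only included when removeFromDriver is true.
    val requiredBlockManagers = managers.filter(bm => removeFromDriver || !bm.isDriver)
    Future.sequence(requiredBlockManagers.map(bm => bm.ask(removeMsg)))
  }

  // Rough analogue of blocking = true in BlockManagerMaster.removeBroadcast: wait for every reply.
  def removeBroadcastBlocking(managers: Seq[StorageEndpoint], broadcastId: Long, removeFromDriver: Boolean): Seq[Int] =
    Await.result(removeBroadcast(managers, broadcastId, removeFromDriver), 10.seconds)
}
```

`removeBroadcastBlocking` waits for all replies. In non-blocking mode, the caller would simply not wait on the future.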
### A concrete example

- `val bcNewCenters = data.context.broadcast(newCenters)`: create the broadcast variable
- `bcNewCenters.unpersist()`: call `unpersist()`
- `Broadcast.scala`
  - `def unpersist(): Unit`
    - `def unpersist(blocking: Boolean): Unit`
- `TorrentBroadcast.scala`
  - `def doUnpersist(blocking: Boolean): Unit`
    - `def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit`
      - `SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)`
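The unpersist chain is a template method: `Broadcast` fixes the public entry points, and `TorrentBroadcast` supplies `doUnpersist`. The sketch below is a hypothetical model. `UnpersistSketch` and `FakeMaster` are illustrative names, and `FakeMaster` records calls instead of messaging `BlockManagerMasterEndpoint`. As we read the Spark source, `doUnpersist` passes `removeFromDriver = false`. Executors therefore drop their copies while the driver keeps its own, and a later use of the broadcast re-sends it to the executors.

```scala
import scala.collection.mutable

// Hypothetical model of the unpersist call chain; FakeMaster records calls
// instead of sending RemoveBroadcast to BlockManagerMasterEndpoint.
object UnpersistSketch {
  final case class RemoveCall(id: Long, removeFromDriver: Boolean, blocking: Boolean)

  object FakeMaster {
    val calls = mutable.ArrayBuffer.empty[RemoveCall]
    def removeBroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit =
      calls += RemoveCall(id, removeFromDriver, blocking)
  }

  abstract class Broadcast[T](val id: Long) {
    def value: T
    // unpersist() is non-blocking and delegates to the overload.
    def unpersist(): Unit = unpersist(blocking = false)
    def unpersist(blocking: Boolean): Unit = doUnpersist(blocking)
    protected def doUnpersist(blocking: Boolean): Unit
  }

  final class TorrentBroadcast[T](obj: T, broadcastId: Long) extends Broadcast[T](broadcastId) {
    def value: T = obj
    // Only executor copies are dropped; the driver keeps its copy.
    override protected def doUnpersist(blocking: Boolean): Unit =
      TorrentBroadcast.unpersist(id, removeFromDriver = false, blocking)
  }

  object TorrentBroadcast {
    def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit =
      FakeMaster.removeBroadcast(id, removeFromDriver, blocking)
  }
}
```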