Analysis of a Spark Task Scheduling Anomaly
2022-07-27 14:23:00 【wankunde】
The Spark Task Scheduling Anomaly
Recently I was helping colleagues on a business team run a Spark program. One of the job's stages had 15,000+ tasks, executed by 100+ executors. After the job had been running for a while, the driver was assigning only a handful of tasks to a few of the executors, while most of the other executors sat completely idle.
A Recap of Spark Task Scheduling
There are plenty of articles online covering this, so I won't repeat them in full. Briefly, the relevant steps are:
- Spark splits a job into stages, and each stage into a TaskSet managed by a TaskSetManager, which builds pending-task lists at different locality levels based on each task's preferredLocation.
- CoarseGrainedSchedulerBackend triggers a scheduling pass over all currently registered executors (offers) once per second, and also triggers a pass for an individual executor whenever that executor changes state.
- The resourceOffer() function assigns the pending tasks above to these offers level by level, as sketched below.
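To make the level-by-level descent concrete, here is a heavily abbreviated sketch of the offer loop. The loop structure and the resourceOfferSingleTaskSet name follow upstream TaskSchedulerImpl.resourceOffers, but the signature is simplified and most concerns (CPU accounting, excludelists, barrier stages) are omitted:

// Abbreviated sketch of TaskSchedulerImpl.resourceOffers (simplified signature).
// For each TaskSet, the offers are retried at progressively looser maxLocality
// levels; this maxLocality also shows up in the resourceOfferSingleTaskSet log
// lines below.
for (taskSet <- sortedTaskSets) {
  for (currentMaxLocality <- taskSet.myLocalityLevels) {
    var launchedTaskAtCurrentMaxLocality = false
    do {
      // Try to launch one more task at (or tighter than) currentMaxLocality.
      // The TaskSetManager may tighten the effective level further via
      // getAllowedLocalityLevel(), which is where this story goes wrong.
      launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
        taskSet, currentMaxLocality, shuffledOffers, tasks)
    } while (launchedTaskAtCurrentMaxLocality)
  }
}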
For example, suppose the stage we are about to run has two tasks (0 and 1). The pending-task queues in the corresponding TaskSetManager look like this:
// Register pendingTasks
2020-11-03 17:13:22,279 INFO org.apache.spark.scheduler.TaskSetManager: addPendingTask HOST(offline-006.com) = Some(ArrayBuffer(1))
2020-11-03 17:13:22,280 INFO org.apache.spark.scheduler.TaskSetManager: addPendingTask HOST(offline-004.com) = Some(ArrayBuffer(1))
2020-11-03 17:13:22,280 INFO org.apache.spark.scheduler.TaskSetManager: addPendingTask HOST(offline-005.com) = Some(ArrayBuffer(1))
2020-11-03 17:13:22,280 INFO org.apache.spark.scheduler.TaskSetManager: addPendingTask ALL = ArrayBuffer(1)
2020-11-03 17:13:22,280 INFO org.apache.spark.scheduler.TaskSetManager: addPendingTask HOST(offline-003.com) = Some(ArrayBuffer(0))
2020-11-03 17:13:22,280 INFO org.apache.spark.scheduler.TaskSetManager: addPendingTask HOST(offline-004.com) = Some(ArrayBuffer(1, 0))
2020-11-03 17:13:22,280 INFO org.apache.spark.scheduler.TaskSetManager: addPendingTask HOST(offline-005.com) = Some(ArrayBuffer(1, 0))
2020-11-03 17:13:22,280 INFO org.apache.spark.scheduler.TaskSetManager: addPendingTask ALL = ArrayBuffer(1, 0)
2020-11-03 17:13:22,336 INFO org.apache.spark.scheduler.TaskSetManager: addPendingTask RACK_LOCAL(/default) = Some(ArrayBuffer(1, 0))
2020-11-03 17:13:22,336 INFO org.apache.spark.scheduler.TaskSetManager: addPendingTask RACK_LOCAL(/default) = Some(ArrayBuffer(1, 0, 1, 0))
2020-11-03 17:13:22,336 INFO org.apache.spark.scheduler.TaskSetManager: addPendingTask RACK_LOCAL(/default) = Some(ArrayBuffer(1, 0, 1, 0, 0))
2020-11-03 17:13:22,336 INFO org.apache.spark.scheduler.TaskSetManager: addPendingTask RACK_LOCAL(/default) = Some(ArrayBuffer(1, 0, 1, 0, 0, 1))
// After the TaskSet is created, resources are requested from the cluster to run the job
2020-11-03 17:13:22,341 INFO org.apache.spark.scheduler.FairSchedulableBuilder: Added task set TaskSet_0.0 tasks to pool default
2020-11-03 17:13:23,224 INFO org.apache.spark.ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 1)
2020-11-03 17:13:24,231 INFO org.apache.spark.ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 2)
2020-11-03 17:13:25,635 INFO org.apache.spark.resource.ResourceProfile: Default ResourceProfile created, executor resources: Map(memoryOverhead -> name: memoryOverhead, amount: 1024, script: , vendor: , cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 2048, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
2020-11-03 17:13:26,194 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (192.168.3.162:32864) with ID 1
2020-11-03 17:13:26,200 INFO org.apache.spark.scheduler.dynalloc.ExecutorMonitor: New executor 1 has registered (new total is 1)
2020-11-03 17:13:26,284 INFO org.apache.spark.storage.BlockManagerMasterEndpoint: Registering block manager offline-007.com:45459 with 912.3 MiB RAM, BlockManagerId(1, offline-007.com, 45459, None)
2020-11-03 17:13:26,356 INFO org.apache.spark.scheduler.cluster.YarnScheduler: receive offers: WorkerOffer(1,offline-007.com,1,Some(192.168.3.162:32864),Map())
// A scheduling pass over the idle WorkerOffer
2020-11-03 17:13:26,401 INFO org.apache.spark.scheduler.cluster.YarnScheduler: resourceOfferSingleTaskSet: offers = Vector(WorkerOffer(1,offline-007.com,1,Some(192.168.3.162:32864),Map())), tasks = Vector(ArrayBuffer()), maxLocality = RACK_LOCAL
2020-11-03 17:13:26,403 INFO org.apache.spark.scheduler.TaskSetManager: current allowed locality: RACK_LOCAL
// Executor 1's pending queue is empty
2020-11-03 17:13:26,403 INFO org.apache.spark.scheduler.TaskSetManager: TaskLocality.PROCESS_LOCAL(1) = None
// Host offline-007.com's pending queue is empty
2020-11-03 17:13:26,404 INFO org.apache.spark.scheduler.TaskSetManager: TaskLocality.NODE_LOCAL(offline-007.com) = None
2020-11-03 17:13:26,405 INFO org.apache.spark.scheduler.TaskSetManager: TaskLocality.NO_PREF = ArrayBuffer()
// Rack /default's pending queue has tasks; the last one (task index=1) is picked and launched
2020-11-03 17:13:26,406 INFO org.apache.spark.scheduler.TaskSetManager: TaskLocality.RACK_LOCAL(/default) = Some(ArrayBuffer(1, 0, 1, 0, 0, 1))
2020-11-03 17:13:26,418 INFO org.apache.spark.scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 0, offline-007.com, executor 1, partition 1, RACK_LOCAL, 7814 bytes)
// The task was scheduled successfully; the remaining pending-task lists:
2020-11-03 17:13:26,426 INFO org.apache.spark.scheduler.cluster.YarnScheduler: begin launchTask TaskDescription(TID=0, index=1), pending tasks:
forExecutor = Map()
forHost = Map(offline-005.com -> ArrayBuffer(1, 0), offline-004.com -> ArrayBuffer(1, 0), offline-003.com -> ArrayBuffer(0), offline-006.com -> ArrayBuffer(1))
noPrefs = ArrayBuffer()
forRack = Map(/default -> ArrayBuffer(1, 0, 1, 0, 0))
all = ArrayBuffer(1, 0)
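Note that task 1 still appears in the forHost queues of other hosts and in forRack even after being launched; entries are cleaned up lazily at dequeue time. A simplified rendering of that dequeue logic (the real method is TaskSetManager.dequeueTaskFromList; this sketch drops the excludelist and speculation checks) also explains why the log above shows the last entry of the rack queue being picked:

import scala.collection.mutable.ArrayBuffer

object LazyDequeueSketch {
  // Simplified sketch of TaskSetManager.dequeueTaskFromList: scan the queue
  // from the back, drop each visited entry (lazy cleanup of stale ids), and
  // return the first task that is still runnable.
  def dequeueTaskFromList(list: ArrayBuffer[Int],
                          isRunnable: Int => Boolean): Option[Int] = {
    var indexOffset = list.size
    while (indexOffset > 0) {
      indexOffset -= 1
      val index = list(indexOffset)
      list.remove(indexOffset)    // stale or not, it leaves this queue now
      if (isRunnable(index)) {
        return Some(index)        // real check: copiesRunning == 0 && !successful
      }
    }
    None
  }
}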
Problem Analysis
Analyzing the scheduling logs: normally, tasks should be scheduled level by level, Executor -> Node -> Rack -> Any. In practice, however, scheduling stayed pinned at NODE_LOCAL: even when the outer maxLocality reached the Rack level, the effective locality was still NODE_LOCAL. As a result, once the machines hosting the executors had finished the tasks local to their own nodes, they could not pick up the pending tasks of other nodes.
So here is the core question: why was the scheduling locality level never promoted? The logic lives in getAllowedLocalityLevel().
Core code and logic
private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
  // Remove the scheduled or finished tasks lazily
  def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = // ...
  def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = // ...

  while (currentLocalityIndex < myLocalityLevels.length - 1) {
    val moreTasks = myLocalityLevels(currentLocalityIndex) match {
      case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasks.forExecutor)
      case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasks.forHost)
      case TaskLocality.NO_PREF => pendingTasks.noPrefs.nonEmpty
      case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasks.forRack)
    }
    if (!moreTasks) {
      // This is a performance optimization: if there are no more tasks that can
      // be scheduled at a particular locality level, there is no point in waiting
      // for the locality wait timeout (SPARK-4939).
      lastLaunchTime = curTime
      logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
        s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
      currentLocalityIndex += 1
    } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
      // Jump to the next locality level, and reset lastLaunchTime so that the next locality
      // wait timer doesn't immediately expire
      lastLaunchTime += localityWaits(currentLocalityIndex)
      logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " +
        s"${localityWaits(currentLocalityIndex)}ms")
      currentLocalityIndex += 1
    } else {
      return myLocalityLevels(currentLocalityIndex)
    }
  }
  myLocalityLevels(currentLocalityIndex)
}

Working backwards from the symptom, the scheduler keeps the current locality level when two conditions both hold: the NODE_LOCAL level still has pending tasks (verified: it does), and the locality wait (3 seconds by default) has not yet elapsed since lastLaunchTime. Each task that FINISHES frees a slot and immediately launches another NODE_LOCAL task, and each launch refreshes lastLaunchTime. So as long as some task finishes within every 3-second window, lastLaunchTime keeps being refreshed, and the promotion from NODE_LOCAL to RACK_LOCAL can never happen.
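A toy, self-contained reproduction of the promotion rule makes the starvation easy to see. This is not the real TaskSetManager, just the two branches of the while loop above boiled down:

// Toy model of getAllowedLocalityLevel(): if launches keep refreshing
// lastLaunchTime within every localityWait window, the index never advances.
object DelaySchedulingDemo extends App {
  val levels = Seq("NODE_LOCAL", "RACK_LOCAL", "ANY")
  val localityWaitMs = 3000L            // spark.locality.wait default
  var currentLocalityIndex = 0
  var lastLaunchTime = 0L

  def allowedLevel(curTime: Long, moreTasksAtLevel: Boolean): String = {
    // Promote when the level is drained, or when the wait timer expires.
    if (currentLocalityIndex < levels.length - 1 &&
        (!moreTasksAtLevel || curTime - lastLaunchTime >= localityWaitMs)) {
      currentLocalityIndex += 1
    }
    levels(currentLocalityIndex)
  }

  // A task finishes (and a new one launches) every 1.2s, refreshing the timer.
  for (t <- 0L to 12000L by 1200L) {
    println(s"t=${t}ms allowed=${allowedLevel(t, moreTasksAtLevel = true)}")
    lastLaunchTime = t                  // resourceOffer does this on each launch
  }
  // Output: NODE_LOCAL at every step; RACK_LOCAL is never reached.
}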
The logs bear this reasoning out: the program indeed had a task finishing successfully within every 3-second window, so the scheduling level of the whole job stayed at NODE_LOCAL the entire time.
2020-11-03 21:01:47,950 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14450, state=FINISHED
2020-11-03 21:01:48,338 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=2, taskId=14449, state=FINISHED
2020-11-03 21:01:48,630 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=1, taskId=14451, state=FINISHED
2020-11-03 21:01:49,484 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14452, state=FINISHED
2020-11-03 21:01:50,424 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=1, taskId=14454, state=FINISHED
2020-11-03 21:01:50,501 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=2, taskId=14453, state=FINISHED
2020-11-03 21:01:50,896 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14455, state=FINISHED
2020-11-03 21:01:52,401 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14457, state=FINISHED
2020-11-03 21:01:52,543 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=1, taskId=14456, state=FINISHED
2020-11-03 21:01:53,765 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14458, state=FINISHED
2020-11-03 21:01:55,402 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14459, state=FINISHED
2020-11-03 21:01:56,703 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14460, state=FINISHED
2020-11-03 21:01:57,902 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14461, state=FINISHED
2020-11-03 21:01:59,141 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14462, state=FINISHED
2020-11-03 21:02:00,369 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14463, state=FINISHED
2020-11-03 21:02:01,313 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14464, state=FINISHED
2020-11-03 21:02:02,408 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14465, state=FINISHED
2020-11-03 21:02:04,612 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14466, state=FINISHED
Optimization Approach
My idea was to keep the whole locality-level checking logic intact, for two reasons: first, when resources are plentiful, tasks can still be scheduled at their proper locality level; second, this logic also performs the lazy removal of scheduled or finished tasks from the pending queues, which is worth preserving.
So the change I made was to stop updating the TaskSet's lastLaunchTime after each task finishes, which guarantees that the scheduling locality level can be promoted.
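The post does not include the patch itself. For orientation, in upstream Spark the refresh lives in TaskSetManager.resourceOffer and fires on every launch, including the launches triggered by each task completion; a hypothetical sketch of the direction of the change:

// Inside TaskSetManager.resourceOffer, upstream Spark refreshes the timer on
// every successful launch (paraphrased from the upstream source):
//
//   // Update our locality level for delay scheduling
//   currentLocalityIndex = getLocalityIndex(taskLocality)
//   lastLaunchTime = curTime
//
// Hypothetical sketch of the change described above (not the author's exact
// patch): keep the index bookkeeping but skip the timer refresh, so that
// getAllowedLocalityLevel() can eventually observe
// curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) and promote
// NODE_LOCAL to RACK_LOCAL.
currentLocalityIndex = getLocalityIndex(taskLocality)
// lastLaunchTime = curTime   // no longer refreshed on these launches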
The result matched expectations: overall scheduling throughput went up, while data locality was still preserved as much as possible.
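As a usage note for readers who cannot patch Spark: the stock knob for the same trade-off is the delay-scheduling wait itself. The snippet below is my illustration, not from the original post; spark.locality.wait and its per-level variants are documented Spark settings (default 3s), and lowering them trades locality for throughput:

import org.apache.spark.sql.SparkSession

// Untested illustration: shrink (or disable, with "0") the delay-scheduling
// wait instead of patching TaskSetManager. Per-level overrides
// (spark.locality.wait.process/.node/.rack) exist as well.
val spark = SparkSession.builder()
  .appName("locality-wait-tuning")
  .config("spark.locality.wait", "1s")
  .config("spark.locality.wait.node", "1s")
  .getOrCreate()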