Analysis of a Spark task scheduling anomaly
2022-07-27 15:37:00 【wankunde】
The Spark task scheduling anomaly
Recently I was helping colleagues on the business side run a Spark program. One of the jobs had a stage with 15000+ tasks, executed by 100+ executors. After the job had been running for a while, the driver assigned tasks to only a few of the executors; most of the other executors sat idle.
A review of the Spark task scheduling process
There are already plenty of articles about this online, so I won't repeat them in depth. Briefly, the relevant steps are:
- Spark splits a job into stages, and each stage into a TaskSet managed by a TaskSetManager. Internally, the manager builds per-locality-level pending task lists based on each task's preferredLocation.
- CoarseGrainedSchedulerBackend triggers task scheduling against all currently registered executors (offers) every second, and also re-triggers scheduling whenever the executor set changes (e.g. an executor registers or frees a core).
- resourceOffer() walks these offers through the pending lists above, locality level by locality level, and assigns tasks to them (a simplified sketch follows this list).
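To make the flow concrete, here is a minimal, simplified sketch of how a single offer is matched against the per-level pending queues. The names LocalitySketch, PendingTasksSketch and dequeue are my own stand-ins, not Spark API; the real logic lives in TaskSetManager (dequeueTask and friends).

object LocalitySketch {
  import scala.collection.mutable.ArrayBuffer

  object TaskLocality extends Enumeration {
    val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
  }
  import TaskLocality._

  // Per-level pending queues, mirroring the addPendingTask log lines below.
  case class PendingTasksSketch(
      forExecutor: Map[String, ArrayBuffer[Int]],
      forHost: Map[String, ArrayBuffer[Int]],
      noPrefs: ArrayBuffer[Int],
      forRack: Map[String, ArrayBuffer[Int]],
      all: ArrayBuffer[Int])

  // Walk the levels from most local outward, up to maxLocality, and take the
  // last entry of the first non-empty queue (Spark pops pending lists from the
  // tail, which is why task index=1 launches before index=0 in the log below).
  def dequeue(p: PendingTasksSketch, execId: String, host: String, rack: String,
      maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value)] = {
    def last(q: Option[ArrayBuffer[Int]]): Option[Int] = q.flatMap(_.lastOption)
    last(p.forExecutor.get(execId)).map(i => (i, PROCESS_LOCAL))
      .orElse(if (maxLocality >= NODE_LOCAL) last(p.forHost.get(host)).map(i => (i, NODE_LOCAL)) else None)
      .orElse(if (maxLocality >= NO_PREF) p.noPrefs.lastOption.map(i => (i, NO_PREF)) else None)
      .orElse(if (maxLocality >= RACK_LOCAL) last(p.forRack.get(rack)).map(i => (i, RACK_LOCAL)) else None)
      .orElse(if (maxLocality >= ANY) p.all.lastOption.map(i => (i, ANY)) else None)
  }
}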
For example, for a stage about to execute with two tasks (0 and 1), the corresponding TaskSetManager's pending queues are built as follows:
// register pendingTasks
2020-11-03 17:13:22,279 INFO org.apache.spark.scheduler.TaskSetManager: addPendingTask HOST(offline-006.com) = Some(ArrayBuffer(1))
2020-11-03 17:13:22,280 INFO org.apache.spark.scheduler.TaskSetManager: addPendingTask HOST(offline-004.com) = Some(ArrayBuffer(1))
2020-11-03 17:13:22,280 INFO org.apache.spark.scheduler.TaskSetManager: addPendingTask HOST(offline-005.com) = Some(ArrayBuffer(1))
2020-11-03 17:13:22,280 INFO org.apache.spark.scheduler.TaskSetManager: addPendingTask ALL = ArrayBuffer(1)
2020-11-03 17:13:22,280 INFO org.apache.spark.scheduler.TaskSetManager: addPendingTask HOST(offline-003.com) = Some(ArrayBuffer(0))
2020-11-03 17:13:22,280 INFO org.apache.spark.scheduler.TaskSetManager: addPendingTask HOST(offline-004.com) = Some(ArrayBuffer(1, 0))
2020-11-03 17:13:22,280 INFO org.apache.spark.scheduler.TaskSetManager: addPendingTask HOST(offline-005.com) = Some(ArrayBuffer(1, 0))
2020-11-03 17:13:22,280 INFO org.apache.spark.scheduler.TaskSetManager: addPendingTask ALL = ArrayBuffer(1, 0)
2020-11-03 17:13:22,336 INFO org.apache.spark.scheduler.TaskSetManager: addPendingTask RACK_LOCAL(/default) = Some(ArrayBuffer(1, 0))
2020-11-03 17:13:22,336 INFO org.apache.spark.scheduler.TaskSetManager: addPendingTask RACK_LOCAL(/default) = Some(ArrayBuffer(1, 0, 1, 0))
2020-11-03 17:13:22,336 INFO org.apache.spark.scheduler.TaskSetManager: addPendingTask RACK_LOCAL(/default) = Some(ArrayBuffer(1, 0, 1, 0, 0))
2020-11-03 17:13:22,336 INFO org.apache.spark.scheduler.TaskSetManager: addPendingTask RACK_LOCAL(/default) = Some(ArrayBuffer(1, 0, 1, 0, 0, 1))
// After the TaskSet is generated, apply to the cluster for resources to run the job
2020-11-03 17:13:22,341 INFO org.apache.spark.scheduler.FairSchedulableBuilder: Added task set TaskSet_0.0 tasks to pool default
2020-11-03 17:13:23,224 INFO org.apache.spark.ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 1)
2020-11-03 17:13:24,231 INFO org.apache.spark.ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 2)
2020-11-03 17:13:25,635 INFO org.apache.spark.resource.ResourceProfile: Default ResourceProfile created, executor resources: Map(memoryOverhead -> name: memoryOverhead, amount: 1024, script: , vendor: , cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 2048, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
2020-11-03 17:13:26,194 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (192.168.3.162:32864) with ID 1
2020-11-03 17:13:26,200 INFO org.apache.spark.scheduler.dynalloc.ExecutorMonitor: New executor 1 has registered (new total is 1)
2020-11-03 17:13:26,284 INFO org.apache.spark.storage.BlockManagerMasterEndpoint: Registering block manager offline-007.com:45459 with 912.3 MiB RAM, BlockManagerId(1, offline-007.com, 45459, None)
2020-11-03 17:13:26,356 INFO org.apache.spark.scheduler.cluster.YarnScheduler: receive offers: WorkerOffer(1,offline-007.com,1,Some(192.168.3.162:32864),Map())
// Schedule tasks onto the idle WorkerOffer
2020-11-03 17:13:26,401 INFO org.apache.spark.scheduler.cluster.YarnScheduler: resourceOfferSingleTaskSet: offers = Vector(WorkerOffer(1,offline-007.com,1,Some(192.168.3.162:32864),Map())), tasks = Vector(ArrayBuffer()), maxLocality = RACK_LOCAL
2020-11-03 17:13:26,403 INFO org.apache.spark.scheduler.TaskSetManager: current allowed locality: RACK_LOCAL
// executor 1's pending queue is empty
2020-11-03 17:13:26,403 INFO org.apache.spark.scheduler.TaskSetManager: TaskLocality.PROCESS_LOCAL(1) = None
// the pending queue for host offline-007.com is empty
2020-11-03 17:13:26,404 INFO org.apache.spark.scheduler.TaskSetManager: TaskLocality.NODE_LOCAL(offline-007.com) = None
2020-11-03 17:13:26,405 INFO org.apache.spark.scheduler.TaskSetManager: TaskLocality.NO_PREF = ArrayBuffer()
// the pending queue for rack /default has tasks; pick the last one (task index=1) and launch it
2020-11-03 17:13:26,406 INFO org.apache.spark.scheduler.TaskSetManager: TaskLocality.RACK_LOCAL(/default) = Some(ArrayBuffer(1, 0, 1, 0, 0, 1))
2020-11-03 17:13:26,418 INFO org.apache.spark.scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 0, offline-007.com, executor 1, partition 1, RACK_LOCAL, 7814 bytes)
// Task scheduled successfully; the remaining pending task lists:
2020-11-03 17:13:26,426 INFO org.apache.spark.scheduler.cluster.YarnScheduler: begin launchTask TaskDescription(TID=0, index=1), pending tasks:
forExecutor = Map()
forHost = Map(offline-005.com -> ArrayBuffer(1, 0), offline-004.com -> ArrayBuffer(1, 0), offline-003.com -> ArrayBuffer(0), offline-006.com -> ArrayBuffer(1))
noPrefs = ArrayBuffer()
forRack = Map(/default -> ArrayBuffer(1, 0, 1, 0, 0))
all = ArrayBuffer(1, 0)
Problem analysis
Analyzing the task scheduling log: under normal circumstances, tasks should be scheduled in turn at the executor level, then the node level, then the rack level, then the Any level. In practice, however, tasks kept launching at NODE_LOCAL: even when maxLocality had advanced to RACK_LOCAL, the effective locality remained NODE_LOCAL. As a result, once an executor's host had drained its own node-level task list, it could not pick up the pending tasks of other nodes.
This is the core of the problem: why was the task scheduling level never upgraded? The logic lives in getAllowedLocalityLevel().
Core code and logic
private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
  // Remove the scheduled or finished tasks lazily
  def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = // ...
  def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = // ...

  while (currentLocalityIndex < myLocalityLevels.length - 1) {
    val moreTasks = myLocalityLevels(currentLocalityIndex) match {
      case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasks.forExecutor)
      case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasks.forHost)
      case TaskLocality.NO_PREF => pendingTasks.noPrefs.nonEmpty
      case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasks.forRack)
    }
    if (!moreTasks) {
      // This is a performance optimization: if there are no more tasks that can
      // be scheduled at a particular locality level, there is no point in waiting
      // for the locality wait timeout (SPARK-4939).
      lastLaunchTime = curTime
      logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
        s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
      currentLocalityIndex += 1
    } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
      // Jump to the next locality level, and reset lastLaunchTime so that the next locality
      // wait timer doesn't immediately expire
      lastLaunchTime += localityWaits(currentLocalityIndex)
      logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " +
        s"${localityWaits(currentLocalityIndex)}ms")
      currentLocalityIndex += 1
    } else {
      return myLocalityLevels(currentLocalityIndex)
    }
  }
  myLocalityLevels(currentLocalityIndex)
}

Working backwards from the observed behavior: the scheduling level stays put when there are still tasks pending at the NODE_LOCAL level (verified against the log) and the locality wait time (3 seconds by default) has not yet elapsed. In other words, as long as some task FINISHED within every 3-second window (its freed core immediately launching a new NODE_LOCAL task), lastLaunchTime kept being refreshed, so the condition for upgrading from NODE_LOCAL to RACK_LOCAL scheduling was never met.
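To see the starvation concretely, here is a toy timeline. StarvationDemo is a hypothetical stand-in, not Spark code; 3000 ms is the spark.locality.wait default, and the launch-time reset models what TaskSetManager.resourceOffer() does on each successful launch.

object StarvationDemo {
  val localityWait = 3000L   // spark.locality.wait default: 3s
  var lastLaunchTime = 0L

  def allowedLevel(curTime: Long): String =
    if (curTime - lastLaunchTime >= localityWait) "RACK_LOCAL" else "NODE_LOCAL"

  def main(args: Array[String]): Unit = {
    // A task finishes (and its freed core immediately launches a new
    // NODE_LOCAL task) every 1-2 s, as in the FINISHED log below.
    for (t <- Seq(0L, 1500L, 3000L, 4500L, 6000L)) {
      println(s"t=${t}ms allowed=${allowedLevel(t)}")
      lastLaunchTime = t   // each launch resets the locality-wait timer
    }
    // Every step prints NODE_LOCAL: the 3 s window never elapses, so the
    // level never upgrades to RACK_LOCAL.
  }
}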
Verifying this logic against the log: tasks were indeed completing within every 3-second window, so the whole job stayed at the NODE_LOCAL scheduling level.
2020-11-03 21:01:47,950 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14450, state=FINISHED
2020-11-03 21:01:48,338 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=2, taskId=14449, state=FINISHED
2020-11-03 21:01:48,630 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=1, taskId=14451, state=FINISHED
2020-11-03 21:01:49,484 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14452, state=FINISHED
2020-11-03 21:01:50,424 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=1, taskId=14454, state=FINISHED
2020-11-03 21:01:50,501 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=2, taskId=14453, state=FINISHED
2020-11-03 21:01:50,896 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14455, state=FINISHED
2020-11-03 21:01:52,401 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14457, state=FINISHED
2020-11-03 21:01:52,543 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=1, taskId=14456, state=FINISHED
2020-11-03 21:01:53,765 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14458, state=FINISHED
2020-11-03 21:01:55,402 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14459, state=FINISHED
2020-11-03 21:01:56,703 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14460, state=FINISHED
2020-11-03 21:01:57,902 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14461, state=FINISHED
2020-11-03 21:01:59,141 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14462, state=FINISHED
2020-11-03 21:02:00,369 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14463, state=FINISHED
2020-11-03 21:02:01,313 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14464, state=FINISHED
2020-11-03 21:02:02,408 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14465, state=FINISHED
2020-11-03 21:02:04,612 INFO org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint: RPC for StatusUpdate: executorId=178, taskId=14466, state=FINISHED
Optimization approach
My idea was to keep the locality-level checking logic intact. This has two benefits: first, when resources are sufficient, tasks can still be scheduled at the best achievable locality level; second, this logic also lazily prunes stale tasks from the pending queues, which is worth preserving.
So the change is: when a task completes, no longer refresh the TaskSet's lastLaunchTime, which guarantees the scheduling level can still be upgraded (a hypothetical sketch follows below).
The actual result matched expectations: overall scheduling throughput went up, while data locality was still preserved as far as possible.
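For illustration, a hypothetical sketch of the change under my assumptions; the class and method names below are stand-ins, not the actual patch. In Spark, the reset in question happens when a task launches inside TaskSetManager.resourceOffer().

class TaskSetManagerSketch {
  var lastLaunchTime: Long = 0L    // still updated inside getAllowedLocalityLevel()
  var currentLocalityIndex: Int = 0
  def getLocalityIndex(taskLocality: String): Int = 0  // stub

  // Hook invoked when a task has been dequeued and is about to launch.
  def onTaskLaunched(curTime: Long, taskLocality: String): Unit = {
    currentLocalityIndex = getLocalityIndex(taskLocality)
    // Before the fix: lastLaunchTime = curTime
    // Dropping this reset makes the locality-wait timer measure time since the
    // last level change (getAllowedLocalityLevel() still maintains it when a
    // level is skipped or upgraded) instead of time since the last launch, so
    // NODE_LOCAL can escalate to RACK_LOCAL even while tasks keep finishing.
  }
}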