Detailed explanation of Spark speculation
2022-06-25 11:21:00 [pyiran]
Why speculation
In a Spark job, a stage finishes only when the last task in that stage finishes. Many factors affect how long a task takes, such as how partitions are distributed, resource usage on the executor, the state of the host, and the cluster network. A task is often slow because of the environment it runs in, and running it again can mitigate the problem. Spark therefore supports speculation (speculative execution). This article explains in detail what speculation in Spark is.
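The following toy Scala sketch illustrates the point above. It is not Spark code, and `StageTiming` and `stageDurationMs` are hypothetical names. It assumes that all tasks of a stage start at the same moment, so the stage's wall-clock time equals its longest task duration:

```scala
object StageTiming {
  // Hypothetical model: every task in the stage starts at time 0,
  // so the stage finishes when its slowest task finishes.
  def stageDurationMs(taskDurationsMs: Seq[Long]): Long = {
    require(taskDurationsMs.nonEmpty, "a stage has at least one task")
    taskDurationsMs.max
  }

  def main(args: Array[String]): Unit = {
    // Nine tasks take 10 s each; one straggler on a slow host takes 60 s.
    val durations = Seq.fill(9)(10000L) :+ 60000L
    println(stageDurationMs(durations)) // 60000
  }
}
```

A single straggler is enough to make this stage six times longer than a typical task.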
spark.speculation
Spark's configuration includes the following speculation-related parameters:
| Property name | Default | Meaning |
|---|---|---|
| spark.speculation | false | If set to "true", Spark performs speculative execution of tasks. Slow tasks in a stage then get a chance to be re-launched. |
| spark.speculation.interval | 100ms | How often Spark checks for tasks to speculate. |
| spark.speculation.multiplier | 1.5 | A task becomes a candidate for re-launch once it has run this many times longer than the median duration of successful tasks (the threshold). |
| spark.speculation.quantile | 0.75 | Fraction of a stage's tasks that must complete before speculation is enabled for that stage. |
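The four properties and their defaults can be collected into a small Scala model. `SpeculationConfExample`, `SpeculationSettings` and `fromConf` are hypothetical names used only for illustration. A real application sets these keys on its `SparkConf` or passes them with `spark-submit --conf`. The interval parser here accepts only a plain number or an `ms` suffix, while Spark itself accepts other time units:

```scala
object SpeculationConfExample {
  // Hypothetical holder for the speculation properties, using the table's defaults
  final case class SpeculationSettings(
      enabled: Boolean = false, // spark.speculation
      intervalMs: Long = 100L,  // spark.speculation.interval
      multiplier: Double = 1.5, // spark.speculation.multiplier
      quantile: Double = 0.75   // spark.speculation.quantile
  )

  // Simplified: accepts "200" or "200ms" only
  private def parseMs(s: String): Long = s.trim.stripSuffix("ms").trim.toLong

  // Reads the keys from a plain key/value map, falling back to the defaults
  def fromConf(conf: Map[String, String]): SpeculationSettings = {
    val d = SpeculationSettings()
    SpeculationSettings(
      enabled = conf.get("spark.speculation").map(_.trim.toBoolean).getOrElse(d.enabled),
      intervalMs = conf.get("spark.speculation.interval").map(parseMs).getOrElse(d.intervalMs),
      multiplier = conf.get("spark.speculation.multiplier").map(_.toDouble).getOrElse(d.multiplier),
      quantile = conf.get("spark.speculation.quantile").map(_.toDouble).getOrElse(d.quantile)
    )
  }

  def main(args: Array[String]): Unit = {
    val s = fromConf(Map("spark.speculation" -> "true", "spark.speculation.interval" -> "200ms"))
    println(s) // SpeculationSettings(true,200,1.5,0.75)
  }
}
```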
Speculation works within a single stage. Tasks in different stages do not affect each other, and only tasks that are still running are candidates. Once speculative execution starts, Spark uses the result of whichever copy of the task finishes first and marks the task as done.
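The "first finished copy wins" rule can be modeled in a few lines. `Attempt` and `winner` are hypothetical names, not Spark classes. The model assumes that each attempt's finish time is its launch time plus its duration:

```scala
object FirstCopyWins {
  // Hypothetical model of one attempt of a task (the original or a speculative copy).
  // Times are milliseconds since the stage started.
  final case class Attempt(attemptId: Int, host: String, launchMs: Long, durationMs: Long) {
    def finishMs: Long = launchMs + durationMs
  }

  // The attempt that finishes first supplies the task's result; the others are redundant.
  def winner(attempts: Seq[Attempt]): Attempt = attempts.minBy(_.finishMs)

  def main(args: Array[String]): Unit = {
    val original    = Attempt(0, "slow-host", launchMs = 0L, durationMs = 60000L)
    val speculative = Attempt(1, "fast-host", launchMs = 15000L, durationMs = 10000L)
    val w = winner(Seq(original, speculative))
    println(s"attempt ${w.attemptId} on ${w.host} finishes at ${w.finishMs} ms")
    // attempt 1 on fast-host finishes at 25000 ms
  }
}
```

In this example the speculative copy starts 15 s late on a healthy host and still finishes at 25 s, well before the original's 60 s, so Spark would use its result.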
Speculation Workflow
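Spark's speculation workflow follows directly from these parameters. A background check runs every spark.speculation.interval. Once at least spark.speculation.quantile of a stage's tasks have succeeded, any task that has been running longer than spark.speculation.multiplier times the median successful-task duration is marked as speculatable, and a speculative copy of it is scheduled.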
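The workflow can be simulated in Scala. This is a simplified, hypothetical model rather than Spark's code, and `SpeculationTimeline` and `firstFlagTimeMs` are illustrative names. It assumes the following:

- every task starts at time 0;
- the checker wakes up every `intervalMs`;
- the median is the ordinary median of finished durations;
- `minTimeToSpeculation` and copy bookkeeping are ignored.

The function returns, for each straggler, the first check time at which it would be marked speculatable:

```scala
object SpeculationTimeline {
  // Hypothetical, simplified model: all tasks start at t = 0 and a checker wakes up
  // every intervalMs. Returns task index -> first check time (ms) at which it is flagged.
  def firstFlagTimeMs(
      durationsMs: IndexedSeq[Long],
      intervalMs: Long = 100L,
      quantile: Double = 0.75,
      multiplier: Double = 1.5): Map[Int, Long] = {
    require(durationsMs.nonEmpty, "a stage has at least one task")
    val n = durationsMs.size
    val minFinished = math.floor(quantile * n).toInt
    val end = durationsMs.max
    var flagged = Map.empty[Int, Long]
    var t = intervalMs
    while (t < end) {
      // Durations of tasks that have finished by time t, in ascending order
      val finished = durationsMs.filter(_ <= t).sorted
      if (n > 1 && finished.nonEmpty && finished.size >= minFinished) {
        val m = finished.size
        val median =
          if (m % 2 == 1) finished(m / 2).toDouble
          else (finished(m / 2 - 1) + finished(m / 2)) / 2.0
        val threshold = multiplier * median
        // A task still running at t has been running for exactly t ms
        for (i <- durationsMs.indices) {
          if (durationsMs(i) > t && t > threshold && !flagged.contains(i))
            flagged = flagged.updated(i, t)
        }
      }
      t += intervalMs
    }
    flagged
  }

  def main(args: Array[String]): Unit = {
    // Three 1 s tasks and one 5 s straggler, with the default interval/quantile/multiplier
    println(firstFlagTimeMs(Vector(1000L, 1000L, 1000L, 5000L))) // Map(3 -> 1600)
  }
}
```

Here 3 of the 4 tasks (the 0.75 quantile) have finished at 1000 ms. The threshold therefore becomes 1.5 × 1000 = 1500 ms. The first check strictly after that, at 1600 ms, flags task 3.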

Spark Source code
The TaskScheduler's start() method, which is called during SparkContext initialization:
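```scala
override def start(): Unit = {
  backend.start()

  // Speculation is only started outside local mode, and only if spark.speculation=true
  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    // Run checkSpeculatableTasks() repeatedly, SPECULATION_INTERVAL_MS apart;
    // an uncaught exception in the check stops the SparkContext
    speculationScheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        checkSpeculatableTasks()
      }
    }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}
```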
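The fixed-delay scheduling used by start() can be tried on its own with the JDK's `ScheduledExecutorService`. In this sketch, `FixedDelayChecker` and `runFor` are hypothetical names, and a counter stands in for `checkSpeculatableTasks()`. With `scheduleWithFixedDelay`, the delay is measured from the end of one run to the start of the next. A slow check therefore postpones the following check instead of overlapping with it:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object FixedDelayChecker {
  // Runs `check` every intervalMs (fixed delay) for roughly totalMs, then shuts down.
  def runFor(intervalMs: Long, totalMs: Long)(check: () => Unit): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    try {
      scheduler.scheduleWithFixedDelay(new Runnable {
        override def run(): Unit = check()
      }, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
      Thread.sleep(totalMs)
    } finally {
      scheduler.shutdownNow()
      scheduler.awaitTermination(1, TimeUnit.SECONDS)
    }
  }

  def main(args: Array[String]): Unit = {
    val checks = new AtomicInteger(0)
    // Stand-in for checkSpeculatableTasks(): just count the invocations
    runFor(intervalMs = 20L, totalMs = 200L)(() => { checks.incrementAndGet(); () })
    println(s"speculation check ran ${checks.get} times")
  }
}
```

If one run of a task scheduled this way throws an exception, the JDK suppresses all later runs. The check must therefore handle its own failures. In Spark's start(), the check is wrapped in `Utils.tryOrStopSparkContext`.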
In TaskSetManager, checkSpeculatableTasks detects which tasks need a speculative copy:
```scala
/**
 * Check for tasks to be speculated and return true if there are any. This is called periodically
 * by the TaskScheduler.
 */
override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
  // Can't speculate if we only have one task, and no need to speculate if the task set is a
  // zombie.
  if (isZombie || numTasks == 1) {
    return false
  }
  var foundTasks = false
  // spark.speculation.quantile: how many tasks must succeed before speculation is considered
  val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
  logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)

  if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
    val time = clock.getTimeMillis()
    val medianDuration = successfulTaskDurations.median
    // spark.speculation.multiplier times the median, but never below minTimeToSpeculation
    val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation)
    // TODO: Threshold should also look at standard deviation of task durations and have a lower
    // bound based on that.
    logDebug("Task length threshold for speculation: " + threshold)
    for (tid <- runningTasksSet) {
      val info = taskInfos(tid)
      val index = info.index
      // Only an unfinished task with a single running copy that has exceeded the threshold,
      // and is not already marked, qualifies
      if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
        !speculatableTasks.contains(index)) {
        logInfo(
          "Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms"
            .format(index, taskSet.id, info.host, threshold))
        speculatableTasks += index
        // Tell the DAGScheduler that a speculative copy of this task is needed
        sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
        foundTasks = true
      }
    }
  }
  foundTasks
}
```
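The decision rule above can be restated as a pure function, which makes it easy to try with concrete numbers. This is a simplified sketch, not Spark's API. `SpeculationCheck`, `RunningTask` and `speculatable` are hypothetical names. The `isZombie` check is omitted, and an ordinary sorted-list median stands in for `successfulTaskDurations.median`. The 100 ms default for `minTimeToSpeculationMs` is an assumption for illustration:

```scala
object SpeculationCheck {
  // Hypothetical snapshot of a running task (not Spark's TaskInfo)
  final case class RunningTask(index: Int, host: String, launchTimeMs: Long, copiesRunning: Int)

  // Ordinary median, standing in for successfulTaskDurations.median
  private def median(xs: Seq[Long]): Double = {
    val s = xs.sorted
    val n = s.size
    if (n % 2 == 1) s(n / 2).toDouble else (s(n / 2 - 1) + s(n / 2)) / 2.0
  }

  // Returns the task indices the check would newly mark as speculatable at time nowMs
  def speculatable(
      numTasks: Int,
      successfulDurationsMs: Seq[Long],
      running: Seq[RunningTask],
      successful: Set[Int],
      alreadySpeculatable: Set[Int],
      nowMs: Long,
      quantile: Double = 0.75,
      multiplier: Double = 1.5,
      minTimeToSpeculationMs: Int = 100): Set[Int] = {
    if (numTasks == 1) return Set.empty
    val minFinished = math.floor(quantile * numTasks).toInt
    val tasksSuccessful = successfulDurationsMs.size
    if (tasksSuccessful < minFinished || tasksSuccessful == 0) return Set.empty

    // multiplier * median, but never below the minimum time
    val threshold = math.max(multiplier * median(successfulDurationsMs), minTimeToSpeculationMs.toDouble)
    running
      .filter(t => !successful(t.index) && t.copiesRunning == 1 &&
        nowMs - t.launchTimeMs > threshold && !alreadySpeculatable(t.index))
      .map(_.index)
      .toSet
  }

  def main(args: Array[String]): Unit = {
    val straggler = RunningTask(index = 3, host = "host-d", launchTimeMs = 0L, copiesRunning = 1)
    val done = Seq(1000L, 1200L, 1400L)
    println(speculatable(4, done, Seq(straggler), Set(0, 1, 2), Set.empty, nowMs = 2500L)) // Set(3)
    println(speculatable(4, done, Seq(straggler), Set(0, 1, 2), Set.empty, nowMs = 1700L)) // Set()
  }
}
```

Here is how the example works out:

- With 4 tasks, floor(0.75 × 4) = 3 tasks must succeed before any check happens.
- With successful durations of 1000, 1200 and 1400 ms, the median is 1200 ms.
- The threshold is therefore max(1.5 × 1200, 100) = 1800 ms.
- A task that has been running for 2500 ms is flagged; at 1700 ms it is not.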
When TaskSetManager decides that a task needs a speculative copy, it notifies the DAGScheduler. The DAGScheduler handles this notification through its event mechanism:
```scala
/**
 * Called by the TaskSetManager when it decides a speculative task is needed.
 */
def speculativeTaskSubmitted(task: Task[_]): Unit = {
  eventProcessLoop.post(SpeculativeTaskSubmitted(task))
}
```
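The post-then-handle pattern behind `eventProcessLoop.post` can be sketched with a blocking queue and a single consumer thread. `EventLoopSketch`, `EventLoop`, `SchedulerEvent`, `Stop` and the `stageId`/`taskIndex` fields are hypothetical stand-ins, not Spark's classes. In this pattern, post() only enqueues the event. The caller (here, the speculation check) returns immediately, and the event is handled later on the loop's own thread:

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

object EventLoopSketch {
  sealed trait SchedulerEvent
  final case class SpeculativeTaskSubmitted(stageId: Int, taskIndex: Int) extends SchedulerEvent
  case object Stop extends SchedulerEvent

  // Hypothetical single-threaded event loop: post() enqueues, the loop thread handles events in order
  final class EventLoop(handle: SchedulerEvent => Unit) {
    private val queue = new LinkedBlockingQueue[SchedulerEvent]()
    private val thread = new Thread(new Runnable {
      override def run(): Unit = {
        var running = true
        while (running) {
          queue.take() match {
            case Stop => running = false
            case e    => handle(e)
          }
        }
      }
    })

    def start(): Unit = thread.start()
    def post(e: SchedulerEvent): Unit = queue.put(e)
    // Enqueue a stop marker and wait until every earlier event has been handled
    def stop(): Unit = { queue.put(Stop); thread.join() }
  }

  def main(args: Array[String]): Unit = {
    val handled = ArrayBuffer.empty[SchedulerEvent]
    val loop = new EventLoop(e => handled.synchronized { handled += e; () })
    loop.start()
    loop.post(SpeculativeTaskSubmitted(stageId = 2, taskIndex = 7))
    loop.stop()
    println(handled.toList) // List(SpeculativeTaskSubmitted(2,7))
  }
}
```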
References
https://spark.apache.org/docs/latest/configuration.html