Spark source code - task submission process - 6.2 - SparkContext initialization - TaskScheduler (task scheduler)
2022-08-05 06:12:00 【zdaiqing】
TaskScheduler
1. Entry point
The TaskScheduler is initialized as part of SparkContext initialization.
The specific entry point is shown in the code snippet below: it sits in the try-catch block of the SparkContext class.
Call chain: try-catch block -> createTaskScheduler method.
In this code, taskScheduler, schedulerBackend and dagScheduler are initialized, and finally taskScheduler.start is called to start the taskScheduler (a minimal user-side usage sketch follows the code below).
class SparkContext(config: SparkConf) extends Logging {
  // unrelated code omitted...

  // Create and start the scheduler
  try {
    // instantiate taskScheduler and schedulerBackend
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    // instantiate dagScheduler
    _dagScheduler = new DAGScheduler(this)
    // notify heartbeatReceiver that the taskScheduler instance has been set
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    // start the taskScheduler
    _taskScheduler.start()

    // unrelated code omitted...
  } catch {
    case NonFatal(e) =>
      // error handling omitted...
  }
}
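To make the entry point concrete, here is a minimal user-side sketch (the master URL, thread count and app name are arbitrary example values): constructing a SparkContext is what drives the try block above, and the master string chosen here is what createTaskScheduler will match on below.

import org.apache.spark.{SparkConf, SparkContext}

object TaskSchedulerEntryDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")            // becomes the `master` argument of createTaskScheduler
      .setAppName("task-scheduler-demo")
    // createTaskScheduler, the DAGScheduler constructor and taskScheduler.start() all run inside this call
    val sc = new SparkContext(conf)
    sc.stop()
  }
}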
2. createTaskScheduler
2.1. Parameters
sc: SparkContext
master:
Specified by the spark.master parameter in the configuration;
The cluster manager to connect to;
The master URL passed to Spark (see the sketch after this parameter list):
local, local[K], local[K,F], local[*], local[*,F], local-cluster[N,C,M], spark://HOST:PORT, spark://HOST1:PORT1,HOST2:PORT2, yarn
deployMode:
Specified by the spark.submit.deployMode parameter in the configuration; defaults to client if not specified;
The deploy mode of the Spark driver, either "client" or "cluster", i.e. whether the driver program is launched locally ("client") or remotely ("cluster") on a node inside the cluster.
For the possible values of the master parameter, refer to the 【master URLs】 documentation.
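To illustrate how the master strings listed above are told apart, here is a small, self-contained sketch; the regular expressions are written from the URL formats above purely for illustration (Spark keeps its own patterns in the internal SparkMasterRegex object, which createTaskScheduler imports below).

object MasterUrlDemo {
  // illustrative patterns only, derived from the master URL formats listed above
  val LOCAL_N = """local\[([0-9]+|\*)\]""".r
  val LOCAL_N_FAILURES = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
  val SPARK = """spark://(.*)""".r

  def describe(master: String): String = master match {
    case "local"                      => "single-threaded local mode"
    case LOCAL_N(threads)             => s"local mode with $threads threads"
    case LOCAL_N_FAILURES(threads, f) => s"local mode with $threads threads, at most $f task failures"
    case SPARK(hostPorts)             => s"standalone cluster at $hostPorts"
    case other                        => s"handled by an external cluster manager: $other"
  }

  def main(args: Array[String]): Unit = {
    Seq("local", "local[*]", "local[4,2]", "spark://host1:7077,host2:7077", "yarn")
      .foreach(m => println(s"$m -> ${describe(m)}"))
  }
}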
2.2. Creation flow
1. Depending on the deployment mode (specified by the master parameter), a different implementation is chosen for instantiation:
local mode: TaskSchedulerImpl, LocalSchedulerBackend;
local[N] or local[*] mode: TaskSchedulerImpl, LocalSchedulerBackend;
the degree of parallelism is given by N; if the parameter is *, it is the number of CPU cores available to the JVM;
local[K,F] mode: TaskSchedulerImpl, LocalSchedulerBackend;
the degree of parallelism is given by K (or the number of CPU cores available to the JVM if *); F is the maximum number of task failures;
spark://(.*) mode: TaskSchedulerImpl, StandaloneSchedulerBackend;
local-cluster[N, cores, memory] mode: TaskSchedulerImpl, StandaloneSchedulerBackend;
other cases (yarn, mesos://HOST:PORT, k8s://HOST:PORT, etc.): the taskScheduler and schedulerBackend are created by an external cluster manager.
2. The scheduler's initialize method is called, which binds the backend reference to the scheduler and creates the root scheduling pool (rootPool) according to the scheduling policy.
3. (backend, scheduler) is returned.
object SparkContext extends Logging {

  private def createTaskScheduler(
      sc: SparkContext,
      master: String,
      deployMode: String): (SchedulerBackend, TaskScheduler) = {
    import SparkMasterRegex._

    // When running locally, don't try to re-execute tasks on failure.
    // maximum number of task failures (no retries in local mode)
    val MAX_LOCAL_TASK_FAILURES = 1

    // match a taskScheduler / schedulerBackend implementation according to the master (deployment mode)
    master match {
      // local mode: TaskSchedulerImpl, LocalSchedulerBackend
      case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)

      // local[N] or local[*] mode: TaskSchedulerImpl, LocalSchedulerBackend
      // the degree of parallelism is given by N; if the parameter is *, it is the
      // number of CPU cores available to the JVM
      case LOCAL_N_REGEX(threads) =>
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        if (threadCount <= 0) {
          throw new SparkException(s"Asked to run locally with $threadCount threads")
        }
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

      // local[K,F] mode: TaskSchedulerImpl, LocalSchedulerBackend
      // the degree of parallelism is given by K (or the number of CPU cores available
      // to the JVM if *); F is the maximum number of task failures
      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*, M] means the number of cores on the computer with M failures
        // local[N, M] means exactly N threads with M failures
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

      // spark://(.*) mode: TaskSchedulerImpl, StandaloneSchedulerBackend
      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        // there may be multiple HOST:PORT combinations, separated by commas
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)

      // local-cluster[N, cores, memory] mode: TaskSchedulerImpl, StandaloneSchedulerBackend
      // the local cluster is shut down through a callback function
      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
        val memoryPerSlaveInt = memoryPerSlave.toInt
        if (sc.executorMemory > memoryPerSlaveInt) {
          throw new SparkException(
            "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
              memoryPerSlaveInt, sc.executorMemory))
        }
        val scheduler = new TaskSchedulerImpl(sc)
        val localCluster = new LocalSparkCluster(
          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
        val masterUrls = localCluster.start()
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
          localCluster.stop()
        }
        (backend, scheduler)

      // other cases (yarn, mesos://HOST:PORT, k8s://HOST:PORT, etc.):
      // taskScheduler and schedulerBackend are created by an external cluster manager
      case masterUrl =>
        val cm = getClusterManager(masterUrl) match {
          case Some(clusterMgr) => clusterMgr
          case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
        }
        try {
          val scheduler = cm.createTaskScheduler(sc, masterUrl)
          val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
          cm.initialize(scheduler, backend)
          (backend, scheduler)
        } catch {
          case se: SparkException => throw se
          case NonFatal(e) =>
            throw new SparkException("External scheduler cannot be instantiated", e)
        }
    }
  }
}
2.3. The scheduler.initialize method
The initialize method binds the backend reference to the scheduler and creates the root scheduling pool (rootPool) according to the scheduling policy.
The scheduling mode is specified by the spark.scheduler.mode parameter; the default scheduling mode is FIFO (a configuration sketch follows the code below).
private[spark] class TaskSchedulerImpl(
    val sc: SparkContext,
    val maxTaskFailures: Int,
    isLocal: Boolean = false)
  extends TaskScheduler with Logging {
  // unrelated code omitted...

  val conf = sc.conf

  // default scheduling mode: FIFO
  // can be overridden by the spark.scheduler.mode parameter
  private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString)
  val schedulingMode: SchedulingMode =
    try {
      SchedulingMode.withName(schedulingModeConf.toUpperCase(Locale.ROOT))
    } catch {
      case e: java.util.NoSuchElementException =>
        throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: $schedulingModeConf")
    }

  // root scheduling pool
  val rootPool: Pool = new Pool("", schedulingMode, 0, 0)

  def initialize(backend: SchedulerBackend) {
    // bind the backend reference to the scheduler
    this.backend = backend
    // create scheduling pools according to the scheduling policy
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
            s"$schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
  }
}
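For reference, a minimal sketch of how the scheduling mode might be switched from the default FIFO to FAIR through spark.scheduler.mode; the master, app name and allocation-file path are placeholder example values, and the allocation file itself is optional.

import org.apache.spark.{SparkConf, SparkContext}

object FairModeDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[4]")                // example master
      .setAppName("fair-scheduling-demo")   // example app name
      // any value other than FIFO or FAIR would hit the "Unrecognized" branch above
      .set("spark.scheduler.mode", "FAIR")
      // optional pool definitions for the fair scheduler (example path):
      // .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
    val sc = new SparkContext(conf)
    sc.stop()
  }
}

With FAIR selected, initialize builds a FairSchedulableBuilder, so buildPools constructs the pools from the fair-scheduler configuration instead of the single FIFO queue.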
3. Instantiating the DAGScheduler
During instantiation, the references to taskScheduler, listenerBus, mapOutputTracker, blockManagerMaster and env are obtained through the sparkContext object and bound to the dagScheduler.
private[spark] class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging {

  def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
    this(
      sc,
      taskScheduler,
      sc.listenerBus,
      sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
      sc.env.blockManager.master,
      sc.env)
  }

  def this(sc: SparkContext) = this(sc, sc.taskScheduler)
}
4. taskScheduler.start
The start method first starts the schedulerBackend; in non-local mode it then decides, based on the spark.speculation parameter, whether to start a periodic speculative-execution check.
override def start() {
  // call the schedulerBackend's start method
  backend.start()

  // in non-local mode, the spark.speculation parameter determines whether a
  // speculative-execution thread needs to be started
  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    speculationScheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        checkSpeculatableTasks()
      }
    }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}
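As a sketch of the configuration side of this branch, the snippet below enables speculative execution; the tuning values are arbitrary examples, and since the check only runs when isLocal is false, the master URL is assumed to be supplied by spark-submit rather than set in code.

import org.apache.spark.{SparkConf, SparkContext}

object SpeculationDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("speculation-demo")              // master is expected to come from spark-submit
      .set("spark.speculation", "true")            // makes start() schedule checkSpeculatableTasks()
      .set("spark.speculation.interval", "100ms")  // example: how often to check for stragglers
      .set("spark.speculation.multiplier", "1.5")  // example: how much slower than the median counts as slow
      .set("spark.speculation.quantile", "0.75")   // example: fraction of tasks that must finish before checking
    val sc = new SparkContext(conf)
    // ... run jobs ...
    sc.stop()
  }
}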
5. References
Spark Source Code Reading (4): SparkContext initialization - creating the TaskScheduler
Spark Source Code Analysis (3): SparkContext initialization - TaskScheduler creation and startup
[Spark Kernel Source Code] Interpretation of some SparkContext methods
边栏推荐
- Getting Started Document 01 series in order
- 入门文档06 向流(stream)中添加文件
- Unity huatuo 革命性热更系列1.2 huatuo热更环境安装与示例项目
- spark算子-coalesce算子
- SSL证书提示过期或者无效,该怎么处理呢?
- spark source code - task submission process - 2-YarnClusterApplication
- [Day1] (Super detailed steps) Build a soft RAID disk array
- 洞察互联网大趋势,读完这篇文章你就彻底了解中文域名
- The problem of redirecting to the home page when visiting a new page in dsf5.0
- 图片压缩失效问题
猜你喜欢
技术分享杂七杂八技术
Autoware中安装Yolo3目标检测模块遇到的问题
[Day5] Soft and hard links File storage, deletion, directory management commands
阿里云视频点播
[Day8] Commands involved in using LVM to expand
Getting Started Documentation 12 webserve + Hot Updates
入门文档06 向流(stream)中添加文件
TensorFlow ObjecDetectionAPI在win10系统Anaconda3下的配置
Mongodb查询分析器解析
云计算——osi七层与TCP\IP协议
随机推荐
The problem of calling ds18b20 through a single bus
Getting Started 03 Distinguish between development and production environments ("hot update" is performed only in the production environment)
入门文档01 series按顺序执行
Introductory document 05-2 use return instructions the current task has been completed
【Day1】VMware软件安装
入门文档08 条件插件
markdown编辑器模板
TensorFlow ObjecDetectionAPI在win10系统Anaconda3下的配置
入门文档11 自动添加版本号
Unity3D中的ref、out、Params三种参数的使用
不吹不黑,这的确是我看过微服务架构最好的文章!
【Day8】磁盘及磁盘的分区有关知识
什么是全栈设计师?
Why can't I add a new hard disk to scan?How to solve?
错误类型:reflection.ReflectionException: Could not set property ‘xxx‘ of ‘class ‘xxx‘ with value ‘xxx‘
[Paper Intensive Reading] The relationship between Precision-Recall and ROC curves
spark算子-repartition算子
Getting Started Document 09 Standalone watch
Cocos Creator开发中的事件响应
Dsf5.0 bounced points determine not return a value