Spark Source Code - Task Submission Flow - 6.2 - SparkContext Initialization: The TaskScheduler
2022-08-05 05:18:00 [zdaiqing]
TaskScheduler
1. Entry Point
The TaskScheduler is initialized as part of SparkContext initialization.
The entry point is shown in the code below: the try-catch block inside the SparkContext class.
Call chain: try-catch block -> createTaskScheduler method.
This block instantiates the taskScheduler, schedulerBackend, and dagScheduler, and finally calls taskScheduler.start to start the taskScheduler.
class SparkContext(config: SparkConf) extends Logging {
  // irrelevant code omitted ...
  // Create and start the scheduler
  try {
    // Instantiate the taskScheduler and schedulerBackend
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    // Instantiate the dagScheduler
    _dagScheduler = new DAGScheduler(this)
    // Notify the heartbeatReceiver that the taskScheduler has been set
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    // Start the taskScheduler
    _taskScheduler.start()
    // irrelevant code omitted ...
  } catch {
    // error handling omitted ...
  }
}
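The in-code comment above ("start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's constructor") refers to the last statement of DAGScheduler's primary constructor body, which wires the new DAGScheduler back into the taskScheduler before start() runs. A paraphrased excerpt, with the surrounding initialization omitted:

private[spark] class DAGScheduler( /* ... */ ) extends Logging {
  // ... initialization omitted ...
  // Bind this DAGScheduler to the TaskScheduler so that task lifecycle
  // events can be routed back to the DAG layer.
  taskScheduler.setDAGScheduler(this)
}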
2. createTaskScheduler
2.1. Parameters
sc: SparkContext
master:
Specified by the spark.master configuration parameter;
The cluster manager to connect to;
The master URL passed to Spark:
local, local[K], local[K,F], local[*], local[*,F], local-cluster[N,C,M], spark://HOST:PORT, spark://HOST1:PORT1,HOST2:PORT2, yarn
deployMode:
Specified by the spark.submit.deployMode configuration parameter; defaults to client when unset;
The deploy mode of the Spark driver program, "client" or "cluster": the driver is launched either locally ("client") or remotely on a node inside the cluster ("cluster").
For the possible values of the master parameter, see the Spark documentation on master URLs.
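For concreteness, here is a minimal sketch of how these two values are usually supplied before they reach createTaskScheduler; the application name and host names are hypothetical:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical app name and master hosts, for illustration only.
val conf = new SparkConf()
  .setAppName("demo-app")
  .setMaster("spark://host1:7077,host2:7077")  // becomes the `master` argument
  .set("spark.submit.deployMode", "client")    // becomes the `deployMode` argument
val sc = new SparkContext(conf)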
2.2. Creation Flow
1. Depending on the deploy mode (given by the master parameter), matching implementations are instantiated, as shown in the code below:
local mode: TaskSchedulerImpl, LocalSchedulerBackend;
local[N] or local[*] mode: TaskSchedulerImpl, LocalSchedulerBackend;
the parallelism is the given N; if the parameter is *, it is the number of CPU cores available to the JVM;
local[K,F] mode: TaskSchedulerImpl, LocalSchedulerBackend;
the parallelism is the given K; if the parameter is *, it is the number of CPU cores available to the JVM; F is the maximum number of task failures;
spark://(.*) mode: TaskSchedulerImpl, StandaloneSchedulerBackend;
local-cluster[N, cores, memory] mode: TaskSchedulerImpl, StandaloneSchedulerBackend;
other cases (yarn, mesos://HOST:PORT, k8s://HOST:PORT, etc.): the taskScheduler and schedulerBackend are created through an external cluster manager.
2. Call the taskScheduler's initialize method, which binds the backend reference to the scheduler and creates the root scheduling pool (rootPool) according to the scheduling policy.
3. Return (backend, scheduler).
object SparkContext extends Logging {
  private def createTaskScheduler(
      sc: SparkContext,
      master: String,
      deployMode: String): (SchedulerBackend, TaskScheduler) = {
    import SparkMasterRegex._
    // When running locally, don't try to re-execute tasks on failure.
    // Maximum number of task failures
    val MAX_LOCAL_TASK_FAILURES = 1
    // Match the master URL (deploy mode) to the corresponding
    // TaskScheduler and SchedulerBackend implementations
    master match {
      // local mode: TaskSchedulerImpl, LocalSchedulerBackend
      case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)
      // local[N] or local[*] mode: TaskSchedulerImpl, LocalSchedulerBackend
      // Parallelism is the given N; for *, the number of CPU cores available to the JVM
      case LOCAL_N_REGEX(threads) =>
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        if (threadCount <= 0) {
          throw new SparkException(s"Asked to run locally with $threadCount threads")
        }
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)
      // local[K,F] mode: TaskSchedulerImpl, LocalSchedulerBackend
      // Parallelism is the given K; for *, the number of CPU cores available to the JVM
      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*, M] means the number of cores on the computer with M failures
        // local[N, M] means exactly N threads with M failures
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)
      // spark://(.*) mode: TaskSchedulerImpl, StandaloneSchedulerBackend
      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        // There may be several comma-separated HOST:PORT pairs
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)
      // local-cluster[N, cores, memory] mode: TaskSchedulerImpl, StandaloneSchedulerBackend
      // The local cluster is shut down through a callback
      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
        val memoryPerSlaveInt = memoryPerSlave.toInt
        if (sc.executorMemory > memoryPerSlaveInt) {
          throw new SparkException(
            "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
              memoryPerSlaveInt, sc.executorMemory))
        }
        val scheduler = new TaskSchedulerImpl(sc)
        val localCluster = new LocalSparkCluster(
          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
        val masterUrls = localCluster.start()
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
          localCluster.stop()
        }
        (backend, scheduler)
      // Other cases (yarn, mesos://HOST:PORT, k8s://HOST:PORT, etc.):
      // create the taskScheduler and schedulerBackend via an external cluster manager
      case masterUrl =>
        val cm = getClusterManager(masterUrl) match {
          case Some(clusterMgr) => clusterMgr
          case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
        }
        try {
          val scheduler = cm.createTaskScheduler(sc, masterUrl)
          val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
          cm.initialize(scheduler, backend)
          (backend, scheduler)
        } catch {
          case se: SparkException => throw se
          case NonFatal(e) =>
            throw new SparkException("External scheduler cannot be instantiated", e)
        }
    }
  }
}
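The LOCAL_N_REGEX, LOCAL_N_FAILURES_REGEX, LOCAL_CLUSTER_REGEX, and SPARK_REGEX extractors used above come from the SparkMasterRegex object. A paraphrased excerpt; the exact patterns may differ slightly across Spark versions:

private object SparkMasterRegex {
  // Regular expression for local[N] and local[*] master formats
  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
  // Regular expression for local[N, maxRetries], used in tests with failing tasks
  val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
  // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
  val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*\]""".r
  // Regular expression for connecting to Spark deploy clusters
  val SPARK_REGEX = """spark://(.*)""".r
}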
2.3. scheduler.initialize
The initialize method binds the backend reference to the scheduler and creates the root scheduling pool (rootPool) according to the scheduling policy.
The scheduling mode is specified by the spark.scheduler.mode parameter and defaults to FIFO.
private[spark] class TaskSchedulerImpl( /* constructor parameters omitted ... */ )
  extends TaskScheduler with Logging {
  // Default scheduling mode: FIFO
  // Can be overridden with the spark.scheduler.mode parameter
  private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString)
  val schedulingMode: SchedulingMode =
    try {
      SchedulingMode.withName(schedulingModeConf.toUpperCase(Locale.ROOT))
    } catch {
      case e: java.util.NoSuchElementException =>
        throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: $schedulingModeConf")
    }
  // Root scheduling pool
  val rootPool: Pool = new Pool("", schedulingMode, 0, 0)

  def initialize(backend: SchedulerBackend) {
    // Bind the backend reference to the scheduler
    this.backend = backend
    // Create the scheduling-pool builder for the configured scheduling policy
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
            s"$schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
  }
}
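As a usage note, switching to FAIR scheduling is purely a configuration change; a minimal sketch, where the allocation-file path is a hypothetical example:

// Select the FAIR scheduling policy; pool definitions live in an
// XML allocation file (the path below is hypothetical).
val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")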
3. Instantiating DAGScheduler
During instantiation, references to the taskScheduler, listenerBus, mapOutputTracker, blockManagerMaster, and env objects are bound to the dagScheduler through the SparkContext object.
private[spark] class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging {

  def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
    this(
      sc,
      taskScheduler,
      sc.listenerBus,
      sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
      sc.env.blockManager.master,
      sc.env)
  }

  def this(sc: SparkContext) = this(sc, sc.taskScheduler)
}
4. taskScheduler.start
override def start() {
  // Call the schedulerBackend's start method
  backend.start()
  // In non-local mode, the spark.speculation parameter decides
  // whether to start the speculative-execution thread
  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    speculationScheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        checkSpeculatableTasks()
      }
    }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}
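For reference, speculative execution is controlled entirely by configuration; a minimal sketch, where the tuning values are illustrative rather than recommendations:

// Enable speculative execution; the interval/quantile/multiplier
// values below are illustrative only.
val conf = new SparkConf()
  .set("spark.speculation", "true")
  .set("spark.speculation.interval", "100ms")  // how often to check for speculatable tasks
  .set("spark.speculation.quantile", "0.75")   // fraction of tasks that must finish first
  .set("spark.speculation.multiplier", "1.5")  // how much slower than the median counts as slow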