当前位置:网站首页>【源码解读】| LiveListenerBus源码解读
【源码解读】| LiveListenerBus源码解读
2022-07-07 15:34:00 【857技术社区】
导读
/**
* Asynchronously passes SparkListenerEvents to registered SparkListeners.
* 异步将SparkListenerEvents传递给已注册的SparkListeners。
*
* Until `start()` is called, all posted events are only buffered. Only after this listener bus
* has started will events be actually propagated to all attached listeners. This listener bus
* is stopped when `stop()` is called, and it will drop further events after stopping.
* 在调用“ start()”之前,所有已发布的事件仅被缓冲。
* 仅在此侦听器总线启动之后,事件才会实际传播到所有连接的侦听器。
* 当调用`stop()`时,此侦听器总线停止,并且停止后它将丢弃其他事件。
*/
为什么要使用事件监听机制? 设想如果Spark事件通知采用Scala函数调用方式,随着集群规模的增加,会对函数调用的越来越多,最终会受到JVM线程数量的限制而影响监控数据的更新,甚至出现无法提供监控数据给用户。函数调用多数情况是同步调用,这样还会导致线程阻塞,并被长时间占用。 使用事件监听机制的好处是什么? 会将函数调用更换成事件发送或者事件投递,事件的处理是异步的,当前线程可以继续执行后续逻辑,线程池中的线程还可以被重用,整个系统的并发将会大大的增加。发送的事件会进入缓存,由定时调度取出,分配给监听此事件的监听器对监控数据更新。
队列
异步事件队列
异步事件列队主要由LinkedBlockingQueue[SparkListenerEvent]
构建,默认大小为10000
事件监听线程会不断从LinkedBlockingQueue
中获取事件。任何事件都会在LinkedBlockingQueue
中存放一段时间,当线程处理完这个事件后,会将其清除。
// LiveListenerBus.scala
private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
// AsyncEventQueue.scala
// Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if
// it's perpetually being added to more quickly than it's being drained.
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) //默认值10000
监听器队列
/** Add a listener to queue shared by all non-internal listeners. */
/**
* 主要由SparkContext调用,即用户可以在代码中增加Listener,
* 或从配单中增加Listener并反射调用[实现在SparkContext中的setupAndStartListenerBus()]
* */
def addToSharedQueue(listener: SparkListenerInterface): Unit = {
addToQueue(listener, SHARED_QUEUE)
}
/** Add a listener to the executor management queue. */
/**
* 分别可增加HeartbeatReceiver(用于监听Executor的Add和Remove,并使用线程定期判断各Executor的心跳时间,超时则Kill
* Executor),另外可通过ExecutorAllocationManager增加ExecutorAllocationListener
* (通过计算总task数和Excutor并行度进行匹配,动态增加、减少Executor,需要配置,默认关闭)
* */
def addToManagementQueue(listener: SparkListenerInterface): Unit = {
addToQueue(listener, EXECUTOR_MANAGEMENT_QUEUE)
}
/** Add a listener to the application status queue. */
/**
* 主要增加了AppStatusListener,为AppStatusStore提供Job、Stage、Task的UI展示数据,
* 以及增加了SQLAppStatusListener,为SQLAppStatesStore提供SQLUI展示数据
* */
def addToStatusQueue(listener: SparkListenerInterface): Unit = {
addToQueue(listener, APP_STATUS_QUEUE)
}
/** Add a listener to the event log queue. */
/**将监听到的事件以Json方式写出到日志存储,需要配置,默认为关闭*/
def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
addToQueue(listener, EVENT_LOG_QUEUE)
}
/**
* Add a listener to a specific queue, creating a new queue if needed. Queues are independent
* of each other (each one uses a separate thread for delivering events), allowing slower
* listeners to be somewhat isolated from others.
* 前面几个方法内部均调用此方法
* 另外:spark structured streaming流式计算对应的StreamingQueryListenerBus通过addToQueue()方法加入"streams"队列
* (用于监听流的start、process、terminate时间,其中process事件能获取到流处理的详细进度,包括流名称、id、水印时间、
* source offsets、sink offsets等)
*/
private[spark] def addToQueue(
listener: SparkListenerInterface,
queue: String): Unit = synchronized {
if (stopped.get()) {
throw new IllegalStateException("LiveListenerBus is stopped.")
}
queues.asScala.find(_.name == queue) match {
case Some(queue) =>
queue.addListener(listener)
case None =>
val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
newQueue.addListener(listener)
if (started.get()) {
newQueue.start(sparkContext)
}
queues.add(newQueue)
}
}
事件投递
SparkListenerEvent事件类型
SparkListenerEvent
是一个特质,如下是一些子类,可以用于事件的展示、记录。
@DeveloperApi
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait SparkListenerEvent {
/* Whether output this event to the event log */
protected[spark] def logEvent: Boolean = true
}
@DeveloperApi
case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
extends SparkListenerEvent
@DeveloperApi
case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent
@DeveloperApi
case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
extends SparkListenerEvent
@DeveloperApi
case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent
@DeveloperApi
case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent
外部公用事件投递接口POST
外部事件投递接口,SparkContext、DAGScheduler 、CoarseGrainedSchedulerBackend
等都通过post,提交事件到总线。
投递过程:
- 总线启动,调用postToQueues()方法将事件投入到对应的命名队列中。
- 总线未启动,将事件保存到ListBuffer[SparkListenerEvent]队列中,等待总线启动时投递事件,清空缓存
事件投递过程代码如下
// 在SparkContext中会调用事件的start方法启动总线
def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = synchronized {
// 标记总线为已启动
if (!started.compareAndSet(false, true)) {
throw new IllegalStateException("LiveListenerBus already started.")
}
this.sparkContext = sc
// 总线启动后,将queuedEvents缓存队列投递后清空
queues.asScala.foreach { q =>
q.start(sc)
queuedEvents.foreach(q.post)
}
queuedEvents = null
metricsSystem.registerSource(metrics)
}
//在post方法中,会判断总线是否启动及投递
def post(event: SparkListenerEvent): Unit = {
if (stopped.get()) {
return
}
metrics.numEventsPosted.inc()
// If the event buffer is null, it means the bus has been started and we can avoid
// synchronization and post events directly to the queues. This should be the most
// common case during the life of the bus.
// 总线已经启动,缓存队列queuedEvents已置为null,则直接投递
if (queuedEvents == null) {
postToQueues(event)
return
}
// Otherwise, need to synchronize to check whether the bus is started, to make sure the thread
// calling start() picks up the new event.
synchronized {
if (!started.get()) {
// 总线未启动,则将事件先放入缓存队列
queuedEvents += event
return
}
}
// If the bus was already started when the check above was made, just post directly to the queues.
// 投递事件
postToQueues(event)
}
DAGScheduler投递事件分析
更新监控指标
def executorHeartbeatReceived(
execId: String,
// (taskId, stageId, stageAttemptId, accumUpdates)
accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
blockManagerId: BlockManagerId): Boolean = {
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates))
blockManagerMaster.driverEndpoint.askSync[Boolean](
BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
}
Task执行启动及获取Result
private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
// Note that there is a chance that this task is launched after the stage is cancelled.
// In that case, we wouldn't have the stage anymore in stageIdToStage.
val stageAttemptId =
stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
}
private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId))
}
private[scheduler] def handleGetTaskResult(taskInfo: TaskInfo) {
listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
}
.........................................
Stage的启动停止
/** Called when stage's parents are available and we can now do its task.
* 在stages父类有空闲的时候,就可以去执行task
* */
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
// First figure out the indexes of partition ids to compute.
//1. 当前Stage没有计算完的分区对应的索引
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
// Use the scheduling pool, job group, description, etc. from an ActiveJob associated
// with this Stage
//2. 关联ActiveJob中的调度池,作业组,描述等
val properties = jobIdToActiveJob(jobId).properties
//3. 将当前stage加入runningStages集合
runningStages += stage
// SparkListenerStageSubmitted should be posted before testing whether tasks are
// serializable. If tasks are not serializable, a SparkListenerStageCompleted event
// will be posted, which should always come after a corresponding SparkListenerStageSubmitted
// event.
//4. 根据Stage类别,计算分区位置
stage match {
case s: ShuffleMapStage =>
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
case s: ResultStage =>
outputCommitCoordinator.stageStart(
stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
}
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
} catch {
case NonFatal(e) =>
stage.makeNewStageAttempt(partitionsToCompute.size)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
Job启动停止
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_], //
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) //1.创建最终FinalStage(ResultStage)
} catch {
case e: BarrierJobSlotsNumberCheckFailed =>
logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
"than the total number of slots in the cluster currently.")
// If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically.
val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
new BiFunction[Int, Int, Int] {
override def apply(key: Int, value: Int): Int = value + 1
})
if (numCheckFailures <= maxFailureNumTasksCheck) {
messageScheduler.schedule(
new Runnable {
override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
partitions, callSite, listener, properties))
},
timeIntervalNumTasksCheck,
TimeUnit.SECONDS
)
return
} else {
// Job failed, clear internal data.....
...
private[scheduler] def cleanUpAfterSchedulerStop() {
for (job <- activeJobs) {
val error =
new SparkException(s"Job ${job.jobId} cancelled because SparkContext was shut down")
job.listener.jobFailed(error)
// Tell the listeners that all of the running stages have ended. Don't bother
// cancelling the stages because if the DAG scheduler is stopped, the entire application
// is in the process of getting stopped.
val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
// The `toArray` here is necessary so that we don't iterate over `runningStages` while
// mutating it.
runningStages.toArray.foreach { stage =>
markStageAsFinished(stage, Some(stageFailedMessage))
}
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
}
}
AsyncEventQueue 异步事件处理
AsyncEventQueue类图继承结构
AsyncEventQueue方法列表
AsyncEventQueue 功能点
- dispatchThread AsyncEventQueue内部具有一个单一线程的dispatchThread,调用
dispatch()–>postToAll()–>doPostEvent()
方法持续处理eventQueue中事件,让所有注册的listener响应事件
AsyncEventQueue 父类doPostEvent方法实现
StreamingListenerBus及StreamingQueryListenerBus重写了doPostEvent(),只关注和处理流相关的事件。
从方法中看出,除了事件匹配还用了SparkListenerInterface
protected override def doPostEvent(
listener: SparkListenerInterface,
event: SparkListenerEvent): Unit = {
event match {
case stageSubmitted: SparkListenerStageSubmitted =>
listener.onStageSubmitted(stageSubmitted)
case stageCompleted: SparkListenerStageCompleted =>
listener.onStageCompleted(stageCompleted)
case jobStart: SparkListenerJobStart =>
listener.onJobStart(jobStart)
case jobEnd: SparkListenerJobEnd =>
listener.onJobEnd(jobEnd)
case taskStart: SparkListenerTaskStart =>
listener.onTaskStart(taskStart)
case taskGettingResult: SparkListenerTaskGettingResult =>
listener.onTaskGettingResult(taskGettingResult)
case taskEnd: SparkListenerTaskEnd =>
listener.onTaskEnd(taskEnd)
case environmentUpdate: SparkListenerEnvironmentUpdate =>
listener.onEnvironmentUpdate(environmentUpdate)
case blockManagerAdded: SparkListenerBlockManagerAdded =>
listener.onBlockManagerAdded(blockManagerAdded)
case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
listener.onBlockManagerRemoved(blockManagerRemoved)
case unpersistRDD: SparkListenerUnpersistRDD =>
listener.onUnpersistRDD(unpersistRDD)
case applicationStart: SparkListenerApplicationStart =>
listener.onApplicationStart(applicationStart)
case applicationEnd: SparkListenerApplicationEnd =>
listener.onApplicationEnd(applicationEnd)
case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
listener.onExecutorMetricsUpdate(metricsUpdate)
case executorAdded: SparkListenerExecutorAdded =>
listener.onExecutorAdded(executorAdded)
case executorRemoved: SparkListenerExecutorRemoved =>
listener.onExecutorRemoved(executorRemoved)
case executorBlacklistedForStage: SparkListenerExecutorBlacklistedForStage =>
listener.onExecutorBlacklistedForStage(executorBlacklistedForStage)
case nodeBlacklistedForStage: SparkListenerNodeBlacklistedForStage =>
listener.onNodeBlacklistedForStage(nodeBlacklistedForStage)
case executorBlacklisted: SparkListenerExecutorBlacklisted =>
listener.onExecutorBlacklisted(executorBlacklisted)
case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>
listener.onExecutorUnblacklisted(executorUnblacklisted)
case nodeBlacklisted: SparkListenerNodeBlacklisted =>
listener.onNodeBlacklisted(nodeBlacklisted)
case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>
listener.onNodeUnblacklisted(nodeUnblacklisted)
case blockUpdated: SparkListenerBlockUpdated =>
listener.onBlockUpdated(blockUpdated)
case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
case _ => listener.onOtherEvent(event)
}
}
AsyncEventQueue事件处理流程
SparkListenerInterface分析
Streaming 后续会详细分析
AppStatusListener
Spark UI中Job、Stage、Task页面,调用AppStatusStore提供的方法,读取kvstore中存储的rdd任务相关信息。
**
* A Spark listener that writes application information to a data store. The types written to the
* store are defined in the `storeTypes.scala` file and are based on the public REST API.
* Spark监听器,将应用程序信息写入数据存储。写入的类型
* store定义在' storeTypes中。scala '文件,并且基于公共REST API。
* @param lastUpdateTime When replaying logs, the log's last update time, so that the duration of
* unfinished tasks can be more accurately calculated (see SPARK-21922).
*/
private[spark] class AppStatusListener(
kvstore: ElementTrackingStore,
conf: SparkConf,
live: Boolean,
lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {
SQLAppStatusListener
Spark UI中SQL页面,调用SQLAppStatusStore提供的方法,读取kvstore中存储的SparkPlan物理计划(SQL真实执行流程)相关信息。
class SQLAppStatusListener(
conf: SparkConf,
kvstore: ElementTrackingStore,
live: Boolean) extends SparkListener with Logging {
KVStore后续更新~~
晚安~~
边栏推荐
- 【图像传感器】相关双采样CDS
- 爬虫(17) - 面试(2) | 爬虫面试题库
- [image sensor] correlated double sampling CDs
- 最新高频Android面试题目分享,带你一起探究Android事件分发机制
- 【DesignMode】享元模式(Flyweight Pattern)
- [designmode] flyweight pattern
- LeetCode 1043. Separate the array to get the maximum and daily questions
- 应用在温度检测仪中的温度传感芯片
- LeetCode 403. 青蛙过河 每日一题
- 字节跳动Android面试,知识点总结+面试题解析
猜你喜欢
随机推荐
【DesignMode】模板方法模式(Template method pattern)
Localstorage and sessionstorage
数据中台落地实施之法
直接上干货,100%好评
【Seaborn】组合图表:PairPlot和JointPlot
[designmode] flyweight pattern
【DesignMode】代理模式(proxy pattern)
Temperature sensor chip used in temperature detector
LeetCode 1626. The best team without contradiction
作为Android开发程序员,android高级面试
【PHP】PHP接口继承及接口多继承原理与实现方法
最新阿里P7技术体系,妈妈再也不用担心我找工作了
深度监听 数组深度监听 watch
LeetCode 1626. 无矛盾的最佳球队 每日一题
Skimage learning (1)
字节跳动高工面试,轻松入门flutter
LeetCode 120. 三角形最小路径和 每日一题
水平垂直居中 方法 和兼容
SqlServer2014+: 创建表的同时创建索引
Seaborn数据可视化