[Source Code Reading] | A walkthrough of LiveListenerBus
2022-07-07 17:07:00 【857 technology community】
Reading guide
/**
 * Asynchronously passes SparkListenerEvents to registered SparkListeners.
 *
 * Until `start()` is called, all posted events are only buffered. Only after this listener bus
 * has started will events be actually propagated to all attached listeners. This listener bus
 * is stopped when `stop()` is called, and it will drop further events after stopping.
 */
Why use event listening? Imagine that Spark delivered event notifications through plain Scala function calls. As the cluster grows, more and more functions would be invoked, eventually running into the JVM's thread limit and delaying monitoring-data updates, or even making them unavailable to users. Function calls are mostly synchronous, which also blocks threads and keeps them occupied for long periods. What does an event-listening mechanism buy us? Function calls are replaced by event sending or delivery, events are processed asynchronously, the current thread can continue with its subsequent logic, and threads in the thread pool can be reused, so the overall concurrency of the system increases greatly. A posted event goes into a buffer, from which scheduled dispatching hands it to the listeners registered for that event, which then update the monitoring data.
Queues
Asynchronous event queue
The asynchronous event queue is built around a LinkedBlockingQueue[SparkListenerEvent] with a default capacity of 10000. The event-dispatch thread keeps taking events from the LinkedBlockingQueue; each event stays in the queue until the thread has finished processing it, at which point it is removed.
// LiveListenerBus.scala
private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
// AsyncEventQueue.scala
// Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if
// it's perpetually being added to more quickly than it's being drained.
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) // The default value is 10000
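To make the drain pattern concrete, here is a small self-contained sketch (not the actual AsyncEventQueue code) of a bounded LinkedBlockingQueue paired with a single dispatch thread; the Event/Message/Stop types are made up for the example:
import java.util.concurrent.LinkedBlockingQueue

// Minimal stand-ins for SparkListenerEvent in this sketch.
sealed trait Event
case class Message(body: String) extends Event
case object Stop extends Event

object DispatchSketch {
  // Bounded queue: posting fails fast instead of growing without limit.
  private val eventQueue = new LinkedBlockingQueue[Event](10000)

  // post() returns false when the queue is full, mirroring the "dropped events" behaviour.
  def post(event: Event): Boolean = eventQueue.offer(event)

  // A single dispatch thread keeps taking events and handing them to the handling logic.
  private val dispatchThread = new Thread("dispatch-sketch") {
    override def run(): Unit = {
      var running = true
      while (running) {
        eventQueue.take() match {          // blocks until an event is available
          case Message(body) => println(s"handled: $body")
          case Stop          => running = false
        }
      }
    }
  }
  dispatchThread.setDaemon(true)

  def main(args: Array[String]): Unit = {
    dispatchThread.start()
    post(Message("hello"))
    post(Stop)
    dispatchThread.join()
  }
}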
Listener queue
/** Add a listener to queue shared by all non-internal listeners. */
/**
 * Mainly called by SparkContext, i.e. users can add a Listener directly,
 * or add a Listener via reflection (see setupAndStartListenerBus() in SparkContext).
 */
def addToSharedQueue(listener: SparkListenerInterface): Unit = {
addToQueue(listener, SHARED_QUEUE)
}
/** Add a listener to the executor management queue. */
/**
 * HeartbeatReceiver is added here (it watches Executor add/remove events and uses a thread to
 * periodically check each Executor's heartbeat, killing Executors that time out). In addition,
 * ExecutorAllocationManager adds an ExecutorAllocationListener here (it dynamically adds and
 * removes Executors by matching the total number of tasks against Executor parallelism;
 * this requires configuration and is off by default).
 */
def addToManagementQueue(listener: SparkListenerInterface): Unit = {
addToQueue(listener, EXECUTOR_MANAGEMENT_QUEUE)
}
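For reference, ExecutorAllocationListener only comes into play once dynamic allocation is switched on; a hedged configuration sketch (values are illustrative, check the properties against your Spark version):
import org.apache.spark.SparkConf

// Sketch: enabling dynamic allocation so that ExecutorAllocationManager
// (and its ExecutorAllocationListener) becomes active; off by default.
val conf = new SparkConf()
  .setAppName("dynamic-allocation-demo")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "10")
  .set("spark.shuffle.service.enabled", "true")  // usually required alongside dynamic allocation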
/** Add a listener to the application status queue. */
/**
 * Mainly adds AppStatusListener, which provides the Job/Stage/Task UI data for AppStatusStore,
 * and SQLAppStatusListener, which provides the SQL UI data for SQLAppStatusStore.
 */
def addToStatusQueue(listener: SparkListenerInterface): Unit = {
addToQueue(listener, APP_STATUS_QUEUE)
}
/** Add a listener to the event log queue. */
/** Writes the listened events out to log storage as JSON; requires configuration and is off by default. */
def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
addToQueue(listener, EVENT_LOG_QUEUE)
}
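For reference, the event-log listener only does anything once event logging is enabled; a hedged configuration sketch (the log directory is just an example path):
import org.apache.spark.SparkConf

// Sketch: enabling the event log so that the listener on the EVENT_LOG_QUEUE
// writes events out as JSON; off by default.
val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///tmp/spark-events")  // adjust to your cluster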
/**
 * Add a listener to a specific queue, creating a new queue if needed. Queues are independent
 * of each other (each one uses a separate thread for delivering events), allowing slower
 * listeners to be somewhat isolated from others.
 * This method is called internally by the methods above.
 * In addition, Spark Structured Streaming's StreamingQueryListenerBus adds a "streams" queue
 * through addToQueue() (used to listen for stream start/progress/terminate events; the progress
 * event exposes detailed streaming progress, including stream name, id, watermark time,
 * source offsets, sink offsets, etc.).
 */
private[spark] def addToQueue(
listener: SparkListenerInterface,
queue: String): Unit = synchronized {
if (stopped.get()) {
throw new IllegalStateException("LiveListenerBus is stopped.")
}
queues.asScala.find(_.name == queue) match {
case Some(queue) =>
queue.addListener(listener)
case None =>
val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
newQueue.addListener(listener)
if (started.get()) {
newQueue.start(sparkContext)
}
queues.add(newQueue)
}
}
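From user code, the usual way onto the shared queue is SparkContext.addSparkListener (or the spark.extraListeners configuration); a minimal sketch, with the listener class name made up for the example:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}

// Illustrative listener; the class name is not part of Spark.
class AppEndLogger extends SparkListener {
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit =
    println(s"application ended at ${applicationEnd.time}")
}

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("listener-demo"))

// addSparkListener registers the listener on the shared queue (addToSharedQueue internally).
sc.addSparkListener(new AppEndLogger)

// Alternatively, the spark.extraListeners configuration can name listener classes that
// setupAndStartListenerBus() instantiates via reflection when SparkContext starts.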
Event delivery
SparkListenerEvent event types
SparkListenerEvent is a trait. Some of its subclasses are listed below; events are used both for display and for logging.
@DeveloperApi
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait SparkListenerEvent {
/* Whether output this event to the event log */
protected[spark] def logEvent: Boolean = true
}
@DeveloperApi
case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
extends SparkListenerEvent
@DeveloperApi
case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent
@DeveloperApi
case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
extends SparkListenerEvent
@DeveloperApi
case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent
@DeveloperApi
case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent
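Since SparkListenerEvent is a public trait, custom event types can also be defined; any event the built-in pattern match does not recognise ends up in a listener's onOtherEvent. A small sketch with a made-up event type:
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}

// CheckpointFinished is a made-up event type for illustration; it is not part of Spark.
// Within Spark internals such events reach listeners through LiveListenerBus.post,
// which routes unknown types to onOtherEvent.
case class CheckpointFinished(path: String) extends SparkListenerEvent

class CheckpointTracker extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case CheckpointFinished(path) => println(s"checkpoint written to $path")
    case _ => // ignore everything else
  }
}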
The external public event delivery interface: post
SparkContext, DAGScheduler, CoarseGrainedSchedulerBackend and other components call post() to submit events to the bus.
Delivery process:
- If the bus has started, postToQueues() is called to put the event into the queues with matching names.
- If the bus has not started, the event is saved into a ListBuffer[SparkListenerEvent] buffer; when the bus starts, the buffered events are delivered and the buffer is cleared.
The delivery code is as follows:
// SparkContext calls this start method to start the bus
def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = synchronized {
// Mark the bus as started
if (!started.compareAndSet(false, true)) {
throw new IllegalStateException("LiveListenerBus already started.")
}
this.sparkContext = sc
// After the bus starts, deliver the buffered queuedEvents, then drop the buffer
queues.asScala.foreach { q =>
q.start(sc)
queuedEvents.foreach(q.post)
}
queuedEvents = null
metricsSystem.registerSource(metrics)
}
// post() checks whether the bus has started and delivers accordingly
def post(event: SparkListenerEvent): Unit = {
if (stopped.get()) {
return
}
metrics.numEventsPosted.inc()
// If the event buffer is null, it means the bus has been started and we can avoid
// synchronization and post events directly to the queues. This should be the most
// common case during the life of the bus.
// The bus has started: queuedEvents has been set to null, so post directly
if (queuedEvents == null) {
postToQueues(event)
return
}
// Otherwise, need to synchronize to check whether the bus is started, to make sure the thread
// calling start() picks up the new event.
synchronized {
if (!started.get()) {
// The bus has not started yet, so buffer the event first
queuedEvents += event
return
}
}
// If the bus was already started when the check above was made, just post directly to the queues.
// Deliver the event
postToQueues(event)
}
DAGScheduler event delivery analysis
Updating monitoring metrics
def executorHeartbeatReceived(
execId: String,
// (taskId, stageId, stageAttemptId, accumUpdates)
accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
blockManagerId: BlockManagerId): Boolean = {
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates))
blockManagerMaster.driverEndpoint.askSync[Boolean](
BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
}
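On the receiving side, a listener can pick these heartbeat-driven updates up in onExecutorMetricsUpdate; a small illustrative sketch (class name made up):
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

// Illustrative listener: logs which executors report accumulator updates via heartbeats.
class HeartbeatMetricsLogger extends SparkListener {
  override def onExecutorMetricsUpdate(update: SparkListenerExecutorMetricsUpdate): Unit = {
    val taskCount = update.accumUpdates.size
    println(s"executor ${update.execId} sent accumulator updates for $taskCount task(s)")
  }
}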
Task start and result fetching
private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
// Note that there is a chance that this task is launched after the stage is cancelled.
// In that case, we wouldn't have the stage anymore in stageIdToStage.
val stageAttemptId =
stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
}
private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId))
}
private[scheduler] def handleGetTaskResult(taskInfo: TaskInfo) {
listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
}
.........................................
Stage start and stop
/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
// First figure out the indexes of partition ids to compute.
//1. Indices of the partitions of the current stage that have not been computed yet
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
// Use the scheduling pool, job group, description, etc. from an ActiveJob associated
// with this Stage
//2. Scheduling pool, job group, description, etc. from the ActiveJob associated with this stage
val properties = jobIdToActiveJob(jobId).properties
//3. Add the current stage to the runningStages set
runningStages += stage
// SparkListenerStageSubmitted should be posted before testing whether tasks are
// serializable. If tasks are not serializable, a SparkListenerStageCompleted event
// will be posted, which should always come after a corresponding SparkListenerStageSubmitted
// event.
//4. Compute the preferred partition locations according to the stage type
stage match {
case s: ShuffleMapStage =>
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
case s: ResultStage =>
outputCommitCoordinator.stageStart(
stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
}
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
} catch {
case NonFatal(e) =>
stage.makeNewStageAttempt(partitionsToCompute.size)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
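For the listener side of the SparkListenerStageSubmitted / SparkListenerStageCompleted pair posted here, a rough sketch that times stages (class name made up; Spark's own StageInfo also carries submission and completion times):
import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerStageSubmitted}

// Illustrative listener that measures wall-clock stage duration from the two events.
class StageTimingListener extends SparkListener {
  private val submittedAt = mutable.Map[Int, Long]()

  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit =
    submittedAt(event.stageInfo.stageId) = System.currentTimeMillis()

  override def onStageCompleted(event: SparkListenerStageCompleted): Unit =
    submittedAt.remove(event.stageInfo.stageId).foreach { start =>
      println(s"stage ${event.stageInfo.stageId} took ${System.currentTimeMillis() - start} ms")
    }
}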
Job start and stop
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) // 1. Create the final ResultStage
} catch {
case e: BarrierJobSlotsNumberCheckFailed =>
logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
"than the total number of slots in the cluster currently.")
// If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically.
val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
new BiFunction[Int, Int, Int] {
override def apply(key: Int, value: Int): Int = value + 1
})
if (numCheckFailures <= maxFailureNumTasksCheck) {
messageScheduler.schedule(
new Runnable {
override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
partitions, callSite, listener, properties))
},
timeIntervalNumTasksCheck,
TimeUnit.SECONDS
)
return
} else {
// Job failed, clear internal data.....
...
private[scheduler] def cleanUpAfterSchedulerStop() {
for (job <- activeJobs) {
val error =
new SparkException(s"Job ${job.jobId} cancelled because SparkContext was shut down")
job.listener.jobFailed(error)
// Tell the listeners that all of the running stages have ended. Don't bother
// cancelling the stages because if the DAG scheduler is stopped, the entire application
// is in the process of getting stopped.
val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
// The `toArray` here is necessary so that we don't iterate over `runningStages` while
// mutating it.
runningStages.toArray.foreach { stage =>
markStageAsFinished(stage, Some(stageFailedMessage))
}
listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
}
}
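The SparkListenerJobEnd posted here carries the job result, so a listener can tell failed jobs from successful ones; a hedged sketch (note that JobFailed is private[spark], so user code can only match on JobSucceeded):
import org.apache.spark.scheduler.{JobSucceeded, SparkListener, SparkListenerJobEnd}

// Illustrative listener reporting how each job finished.
class JobOutcomeLogger extends SparkListener {
  override def onJobEnd(event: SparkListenerJobEnd): Unit = event.jobResult match {
    case JobSucceeded => println(s"job ${event.jobId} succeeded")
    case other        => println(s"job ${event.jobId} did not succeed: $other")
  }
}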
AsyncEventQueue: asynchronous event handling
AsyncEventQueue class diagram / inheritance structure
AsyncEventQueue method list
Key points of AsyncEventQueue:
- dispatchThread: AsyncEventQueue runs a single internal thread, dispatchThread, which calls dispatch() -> postToAll() -> doPostEvent() to keep processing the events in eventQueue, so that every registered listener responds to them (a standalone sketch of the postToAll step follows below).
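A standalone sketch of that postToAll step (not Spark's actual ListenerBus code) looks roughly like this:
import java.util.concurrent.CopyOnWriteArrayList
import scala.util.control.NonFatal

// Sketch of the postToAll pattern: iterate over registered listeners, deliver the event
// to each, and isolate failures so one misbehaving listener does not block the rest.
trait Listener { def onEvent(event: Any): Unit }

class ListenerBusSketch {
  private val listeners = new CopyOnWriteArrayList[Listener]()

  def addListener(listener: Listener): Unit = listeners.add(listener)

  def postToAll(event: Any): Unit = {
    val it = listeners.iterator()
    while (it.hasNext) {
      val listener = it.next()
      try listener.onEvent(event)
      catch { case NonFatal(e) => println(s"listener threw an exception: ${e.getMessage}") }
    }
  }
}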
Implementation of doPostEvent in AsyncEventQueue's parent class
StreamingListenerBus and StreamingQueryListenerBus override doPostEvent() and only handle the events relevant to stream processing.
As the method shows, apart from matching on the event type it relies only on the SparkListenerInterface callbacks.
protected override def doPostEvent(
listener: SparkListenerInterface,
event: SparkListenerEvent): Unit = {
event match {
case stageSubmitted: SparkListenerStageSubmitted =>
listener.onStageSubmitted(stageSubmitted)
case stageCompleted: SparkListenerStageCompleted =>
listener.onStageCompleted(stageCompleted)
case jobStart: SparkListenerJobStart =>
listener.onJobStart(jobStart)
case jobEnd: SparkListenerJobEnd =>
listener.onJobEnd(jobEnd)
case taskStart: SparkListenerTaskStart =>
listener.onTaskStart(taskStart)
case taskGettingResult: SparkListenerTaskGettingResult =>
listener.onTaskGettingResult(taskGettingResult)
case taskEnd: SparkListenerTaskEnd =>
listener.onTaskEnd(taskEnd)
case environmentUpdate: SparkListenerEnvironmentUpdate =>
listener.onEnvironmentUpdate(environmentUpdate)
case blockManagerAdded: SparkListenerBlockManagerAdded =>
listener.onBlockManagerAdded(blockManagerAdded)
case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
listener.onBlockManagerRemoved(blockManagerRemoved)
case unpersistRDD: SparkListenerUnpersistRDD =>
listener.onUnpersistRDD(unpersistRDD)
case applicationStart: SparkListenerApplicationStart =>
listener.onApplicationStart(applicationStart)
case applicationEnd: SparkListenerApplicationEnd =>
listener.onApplicationEnd(applicationEnd)
case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
listener.onExecutorMetricsUpdate(metricsUpdate)
case executorAdded: SparkListenerExecutorAdded =>
listener.onExecutorAdded(executorAdded)
case executorRemoved: SparkListenerExecutorRemoved =>
listener.onExecutorRemoved(executorRemoved)
case executorBlacklistedForStage: SparkListenerExecutorBlacklistedForStage =>
listener.onExecutorBlacklistedForStage(executorBlacklistedForStage)
case nodeBlacklistedForStage: SparkListenerNodeBlacklistedForStage =>
listener.onNodeBlacklistedForStage(nodeBlacklistedForStage)
case executorBlacklisted: SparkListenerExecutorBlacklisted =>
listener.onExecutorBlacklisted(executorBlacklisted)
case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>
listener.onExecutorUnblacklisted(executorUnblacklisted)
case nodeBlacklisted: SparkListenerNodeBlacklisted =>
listener.onNodeBlacklisted(nodeBlacklisted)
case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>
listener.onNodeUnblacklisted(nodeUnblacklisted)
case blockUpdated: SparkListenerBlockUpdated =>
listener.onBlockUpdated(blockUpdated)
case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
case _ => listener.onOtherEvent(event)
}
}
AsyncEventQueue event handling flow
SparkListenerInterface analysis
Streaming will be analyzed in detail in a later post.
AppStatusListener
The Job, Stage, and Task pages in the Spark UI call methods provided by AppStatusStore to read the RDD/task-related information stored in the kvstore.
/**
 * A Spark listener that writes application information to a data store. The types written to the
 * store are defined in the `storeTypes.scala` file and are based on the public REST API.
 * @param lastUpdateTime When replaying logs, the log's last update time, so that the duration of
 *                       unfinished tasks can be more accurately calculated (see SPARK-21922).
 */
private[spark] class AppStatusListener(
kvstore: ElementTrackingStore,
conf: SparkConf,
live: Boolean,
lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {
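The data AppStatusListener maintains ultimately backs both the UI and the public status API; from user code the closest public view onto it is SparkContext.statusTracker. A small example, assuming an active SparkContext named sc:
// Assumes an active SparkContext `sc`. SparkStatusTracker is the public, read-only
// window over the job/stage state that AppStatusListener keeps in the status store.
val tracker = sc.statusTracker
tracker.getActiveJobIds().foreach { jobId =>
  tracker.getJobInfo(jobId).foreach { info =>
    println(s"job $jobId: status=${info.status} stages=${info.stageIds.mkString(",")}")
  }
}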
SQLAppStatusListener
The SQL page in the Spark UI calls methods provided by SQLAppStatusStore to read the SparkPlan physical-plan information (the actual SQL execution process) stored in the kvstore.
class SQLAppStatusListener(
conf: SparkConf,
kvstore: ElementTrackingStore,
live: Boolean) extends SparkListener with Logging {
KVStore will be covered in a follow-up update.
Good night!