
Spark history server and event log details

2022-06-25 11:21:00 pyiran

Preface
How Spark persists events
SHS launch parameters
SHS workflow
Disk cache
Reading the event log
References

Preface

This article gives an overall introduction to how Spark History Server and the event log work together. Spark History Server (hereinafter SHS) is a very useful tool for debugging Spark applications, and anyone familiar with Spark will know it. SHS is built on events persisted while the application runs: specific events are written to a file, usually on HDFS, called the event log. Note that this log is not the same thing as the stdout/stderr logs produced at runtime.

How Spark persists events

SHS is not responsible for persisting events; that is done by your application itself, from the start of its run to the end. When you submit a Spark job, you must enable the event log in the configuration and point the log path at the same path that SHS reads event logs from, so that SHS can find your job:

spark.eventLog.enabled true 
spark.eventLog.dir hdfs://namenode/shared/spark-logs
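For reference, the same two options can also be set programmatically through SparkConf (a minimal sketch; it assumes spark-core is on the classpath and reuses the example HDFS path above):

```scala
import org.apache.spark.SparkConf

// Same settings as the spark-defaults entries above, set in code
// before the SparkContext is created.
val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs://namenode/shared/spark-logs")
```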

So which events does Spark record? Look at SparkListenerEvent, the base class of all events (although Spark Streaming events do not inherit from SparkListenerEvent directly). Every event that extends this trait and does not override logEvent to false is serialized to JSON by the logEvent function of EventLoggingListener and written to the event log, one event per line.

@DeveloperApi
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait SparkListenerEvent {
    
  /* Whether output this event to the event log */
  protected[spark] def logEvent: Boolean = true
}
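To make the logEvent flag concrete, here is a self-contained sketch (a stand-in with made-up names, not the real trait, which lives in org.apache.spark.scheduler and uses protected[spark] visibility) showing how an event would opt out of the event log:

```scala
// Minimal stand-in for Spark's SparkListenerEvent (illustrative only).
trait ListenerEventSketch {
  // Whether to write this event to the event log; defaults to true.
  protected def logEvent: Boolean = true
  // Helper so the demo below can observe the flag.
  def shouldLog: Boolean = logEvent
}

// A normal event: persisted to the event log by default.
case class StageSubmittedSketch(stageId: Int) extends ListenerEventSketch

// A hypothetical event that opts out of logging.
case class InternalOnlySketch(ts: Long) extends ListenerEventSketch {
  override protected def logEvent: Boolean = false
}
```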

For instance, when EventLoggingListener receives a SparkListenerStageSubmitted event, it executes the onStageSubmitted function, which calls logEvent to convert the SparkListenerStageSubmitted event to JSON and write it to the event log.

/** Log the event as JSON. */
private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
  val eventJson = JsonProtocol.sparkEventToJson(event)
  // scalastyle:off println
  writer.foreach(_.println(compact(render(eventJson))))
  // scalastyle:on println
  if (flushLogger) {
    writer.foreach(_.flush())
    hadoopDataStream.foreach(_.hflush())
  }
  if (testing) {
    loggedEvents += eventJson
  }
}

// Events that do not trigger a flush
override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = logEvent(event)

When SparkContext initializes, it calls the start() function of EventLoggingListener, which creates an event log file for the application. The file name is usually {appId}_{attemptId}, and while the application is running the name carries the suffix ".inprogress". When the application finishes, its stop() function is called, which removes ".inprogress" from the event log name. SHS uses this suffix to decide whether the corresponding application is incomplete or complete.
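The naming convention described above can be sketched as follows (an illustrative helper with made-up names, not Spark's actual implementation):

```scala
// Sketch of the event-log naming convention (illustrative only).
object EventLogName {
  val InProgressSuffix = ".inprogress"

  // Name while the application is running, e.g. "app-123_1.inprogress".
  def inProgress(appId: String, attemptId: Option[String]): String =
    attemptId.fold(appId)(a => s"${appId}_$a") + InProgressSuffix

  // Final name after stop() renames the file, dropping the suffix.
  def completed(name: String): String =
    name.stripSuffix(InProgressSuffix)

  // How SHS decides complete vs. incomplete from the file name alone.
  def isCompleted(name: String): Boolean =
    !name.endsWith(InProgressSuffix)
}
```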

SHS launch parameters

When SHS starts, one important parameter is "spark.history.fs.logDirectory", which specifies the path SHS reads event logs from. Another parameter deserving special attention is "spark.history.store.path", the location of SHS's disk cache. If it is not set, SHS serves every request from data held in memory; if it is set, data is cached at this path and stored in LevelDB. I recommend enabling this parameter, because otherwise loading slightly larger applications from SHS can be very slow (I have seen loads take half an hour or even longer). The introductions below therefore assume the disk cache is enabled, and the LevelDB mentioned below can be thought of as this disk cache. There are other parameters covering event log deletion, cache size, the number of retained applications, and so on; for details see the Monitoring page on the Apache Spark website.
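Put together, a minimal SHS configuration combining these two parameters might look like this (the local cache path is only an example; choose any writable directory):

```
spark.history.fs.logDirectory hdfs://namenode/shared/spark-logs
spark.history.store.path /var/spark/history-cache
```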

SHS workflow

[Figure: SHS workflow chart]

The figure above shows a much simplified flow. It is worth distinguishing SHS from the application web UI. The simplest way to see the difference: SHS serves many applications and keeps working even after an application has finished, whereas the application web UI exists only while the application is running, as a UI started from its SparkContext.

Disk cache

First, let's make one thing clear: SHS maintains a listing, which is a KVStore (an abstraction defined by Spark) holding the info of every application that should appear in SHS (high-level application info). This listing corresponds to "listing.ldb" inside the LevelDB directory; the application list on the SHS home page is read from "listing.ldb". Meanwhile, each application's own LevelDB files are placed under "apps", where each application corresponds to one {appId}_{attemptId}.ldb. When you drill into a specific application, its data, including stages, executors, tasks and so on, is read from there. The basic idea SHS uses to generate data for these two kinds of KVStore is the same, so the text below focuses on the listing to explain it in detail.
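Based on the description above, the on-disk layout under spark.history.store.path looks roughly like this:

```
spark.history.store.path/
├── listing.ldb                  # application list shown on the SHS home page
└── apps/
    ├── {appId}_{attemptId}.ldb  # per-application data: stages, executors, tasks, ...
    └── ...
```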

Reading the event log

As mentioned above, events are recorded to the event log as JSON, one per line. This section covers how SHS reads that file and writes its contents into LevelDB.

FsHistoryProvider has a function mergeApplicationListing; only the relevant part of the code is pasted here. As we can see, EventLoggingListener.openEventLog(logPath, fs) is where the event log is read, and the resulting stream is passed into the replay function of ReplayListenerBus. ReplayListenerBus inherits from SparkListenerBus, so based on the event type it can invoke the corresponding functions of the listeners registered on it. To update the listing, an AppListingListener is registered. Notice that this listener implements only a few specific functions, and only the needed events, selected by val eventsFilter: ReplayEventsFilter, are posted. That is because this listener only parses application-level information; once those events have been processed, the listing has been updated.

/** Replay the given log file, saving the application in the listing db. */
protected def mergeApplicationListing(
    fileStatus: FileStatus,
    scanTime: Long,
    enableOptimizations: Boolean): Unit = {
  val eventsFilter: ReplayEventsFilter = { eventString =>
    eventString.startsWith(APPL_START_EVENT_PREFIX) ||
      eventString.startsWith(APPL_END_EVENT_PREFIX) ||
      eventString.startsWith(LOG_START_EVENT_PREFIX) ||
      eventString.startsWith(ENV_UPDATE_EVENT_PREFIX) ||
      eventString.startsWith(APPL_FINAL_STATUS_UPDATE_EVENT_PREFIX) ||
      eventString.startsWith(Type_UPDATE_EVENT_PREFIX)
  }

  val logPath = fileStatus.getPath()
  val appCompleted = isCompleted(logPath.getName())
  val reparseChunkSize = conf.get(END_EVENT_REPARSE_CHUNK_SIZE)

  // Enable halt support in listener if:
  // - app in progress && fast parsing enabled
  // - skipping to end event is enabled (regardless of in-progress state)
  val shouldHalt = enableOptimizations &&
    ((!appCompleted && fastInProgressParsing) || reparseChunkSize > 0)

  val bus = new ReplayListenerBus()
  val listener = new AppListingListener(fileStatus, clock, shouldHalt)
  bus.addListener(listener)

  logInfo(s"Parsing $logPath for listing data...")
  Utils.tryWithResource(EventLoggingListener.openEventLog(logPath, fs)) { in =>
    bus.replay(in, logPath.toString, !appCompleted, eventsFilter)
  }
  // ... (remainder elided)

AppListingListener

To summarize briefly: SHS deserializes each JSON line in the event log into the corresponding event, posts these events to the listeners registered on ReplayListenerBus to perform the corresponding listing updates, and once the listing has been updated, writes it to LevelDB by calling the KVStore.write function.
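The whole pipeline can be condensed into a self-contained sketch (illustrative stand-ins only; the real code uses JsonProtocol, ReplayListenerBus, and AppListingListener, and the prefixes here are made up): filter raw event-log lines, post each surviving line to a registered listener, and collect the updated application info:

```scala
// Minimal stand-in for a listener replaying an event log (illustrative).
trait ListenerSketch {
  def onEvent(line: String): Unit
}

// Mirrors AppListingListener: it only parses application-level information.
final class AppListingSketch extends ListenerSketch {
  var appId: String = ""
  var ended: Boolean = false
  override def onEvent(line: String): Unit =
    if (line.startsWith("AppStart:")) appId = line.stripPrefix("AppStart:")
    else if (line.startsWith("AppEnd:")) ended = true
}

object ReplayBusSketch {
  // Mirrors ReplayEventsFilter: only these prefixes are parsed at all.
  private val wanted = Seq("AppStart:", "AppEnd:")

  def replay(lines: Seq[String], listener: ListenerSketch): Unit =
    lines
      .filter(l => wanted.exists(p => l.startsWith(p)))
      .foreach(listener.onEvent)
}
```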

References:

Apache Spark: https://spark.apache.org/docs/latest/monitoring.html


Copyright notice
This article was written by [pyiran]; please include the original link when reposting.
https://yzsam.com/2022/02/202202200540010172.html