Spark History Server and event log details
2022-06-25 11:21:00 【pyiran】
Preface
How Spark persists events
SHS Launch parameters
SHS Workflow
DiskCache
Reading the event log
References
Preface
This article gives an overall introduction to how Spark History Server and the event log work together. Spark History Server (hereinafter SHS) is a very useful tool for debugging your Spark applications, and anyone who works with Spark is probably familiar with it. SHS is built on the events persisted while an application runs: these predefined events are usually written to a file on HDFS, and that file is called the event log. Note that this log is not the same thing as the runtime stdout/stderr logs.
How Spark persists events
SHS is not responsible for persisting events; the persistence is done by your application while it is running. When you submit a Spark job, you must enable event logging through configuration and make sure the log path you specify is the path that SHS reads event logs from, so that your job can be found by SHS.
spark.eventLog.enabled true
spark.eventLog.dir hdfs://namenode/shared/spark-logs
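These two options can go into spark-defaults.conf or be passed on the command line. A minimal sketch of the latter, where the application class and jar name are only illustrative:

spark-submit \
  --conf spark.eventLog.enabled=true \
  --conf spark.eventLog.dir=hdfs://namenode/shared/spark-logs \
  --class com.example.MyApp \
  myapp.jar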
So which events does Spark record? Let's look at SparkListenerEvent, the base class of all events (though Spark Streaming events do not inherit from SparkListenerEvent directly). Every event that extends this trait and does not set logEvent to false is serialized to JSON by EventLoggingListener's logEvent function and written to the event log, one event per line.
@DeveloperApi
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait SparkListenerEvent {
  /* Whether output this event to the event log */
  protected[spark] def logEvent: Boolean = true
}
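As an aside, an event can opt out of the event log by overriding logEvent. A hypothetical sketch, written as if it lived inside Spark's own scheduler package (MyDebugEvent is made up):

package org.apache.spark.scheduler

// Hypothetical event; because logEvent is false, it is never written to the event log.
case class MyDebugEvent(msg: String) extends SparkListenerEvent {
  override protected[spark] def logEvent: Boolean = false
}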
For example, in EventLoggingListener, when a SparkListenerStageSubmitted event is received, the onStageSubmitted function is executed. It calls logEvent, which converts the SparkListenerStageSubmitted event to JSON and writes it into the event log.
/** Log the event as JSON. */
private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
  val eventJson = JsonProtocol.sparkEventToJson(event)
  // scalastyle:off println
  writer.foreach(_.println(compact(render(eventJson))))
  // scalastyle:on println
  if (flushLogger) {
    writer.foreach(_.flush())
    hadoopDataStream.foreach(_.hflush())
  }
  if (testing) {
    loggedEvents += eventJson
  }
}
// Events that do not trigger a flush
override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = logEvent(event)
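Each logged event therefore becomes one JSON line in the event log, with the event type stored under the "Event" key. A hypothetical, heavily abbreviated line for a stage-submitted event might look like this:

{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":0, ...},"Properties":{...}}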
When SparkContext is initialized, it calls EventLoggingListener's start() function, which creates an event log file for the corresponding application. The file name is usually {appId}_{attemptId}, and while the application is running the file name carries an ".inprogress" suffix. When the application finishes, its stop() function is called and the ".inprogress" suffix is removed. SHS relies on this suffix to judge whether the corresponding application is incomplete or complete.
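As an illustration (the application IDs below are made up), listing the event log directory while one application is still running and another has finished might show:

hdfs dfs -ls hdfs://namenode/shared/spark-logs
  .../application_1656000000000_0001_1.inprogress   (still running)
  .../application_1656000000000_0002_1              (finished)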
SHS Launch parameters
When SHS starts, there is an important parameter, "spark.history.fs.logDirectory", which specifies the path SHS reads event logs from. Another parameter that needs special attention is "spark.history.store.path", the location of SHS's disk cache. If it is not set, SHS serves every request from data held in memory; if it is set, the data is cached under this path and stored with LevelDB. I recommend enabling this parameter, otherwise loading a slightly larger application from SHS will be very slow (I have run into cases where loading took half an hour or even longer). The rest of this article therefore assumes the disk cache is enabled, and the LevelDB mentioned below can be thought of as this disk cache. There are other parameters covering event log deletion, cache size, the number of retained applications, and so on; for details refer to the Monitoring page on the Apache Spark website.
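A minimal sketch of starting SHS with these two parameters set (the local cache path is illustrative):

# spark-defaults.conf read by the history server
spark.history.fs.logDirectory  hdfs://namenode/shared/spark-logs
spark.history.store.path       /var/spark/history-cache

$SPARK_HOME/sbin/start-history-server.sh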
SHS Workflow
[Figure: SHS workflow diagram]
The figure above shows a very simple flow. Here it is worth distinguishing SHS from the application web UI. The simplest way to understand the difference: SHS serves multiple applications and remains available even after an application has ended, while the application web UI only exists while the application is running and is a UI started by SparkContext.
DiskCache
First, let's be clear about one thing: SHS maintains a listing. This listing is a KVStore as defined by Spark, and it stores the application info of every application that should appear in SHS (this is high-level application info). The listing corresponds to "listing.ldb" in LevelDB, and the application list we see on the SHS home page is read from "listing.ldb". Meanwhile, every application's own LevelDB files are placed under "apps", where each application corresponds to one {appId}_{attemptId}.ldb; when you drill down into a specific application, the data is read from there, and it contains stage, executor, task information and so on. The basic idea SHS uses to generate the data for these two kinds of KVStore is the same, so the following text focuses on the listing to explain the details.
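Assuming spark.history.store.path is set to /var/spark/history-cache as in the earlier sketch, the on-disk layout would look roughly like this (application IDs are illustrative):

/var/spark/history-cache/
├── listing.ldb/                                   # application-level listing shown on the SHS home page
└── apps/
    ├── application_1656000000000_0001_1.ldb/      # per-application store: stages, executors, tasks, ...
    └── application_1656000000000_0002_1.ldb/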
Reading the event log
As mentioned above, events are recorded in the event log as JSON, one per line. This section introduces how SHS reads such a file and writes the result into LevelDB.
In FsHistoryProvider there is a function called mergeApplicationListing; only the relevant part of the code is pasted here. As we can see, EventLoggingListener.openEventLog(logPath, fs) opens the event log, and the stream is passed into the replay function of a ReplayListenerBus. ReplayListenerBus inherits from SparkListenerBus, so according to the event type it can invoke the corresponding functions of the listeners registered on it. To update the listing, an AppListingListener is registered. Note that this listener only implements a few specific functions, and only the events it needs (selected by val eventsFilter: ReplayEventsFilter) are posted to it, because this listener only parses application-level information. After these events have been processed, the listing has been updated.
/**
 * Replay the given log file, saving the application in the listing db.
 */
protected def mergeApplicationListing(
    fileStatus: FileStatus,
    scanTime: Long,
    enableOptimizations: Boolean): Unit = {
  val eventsFilter: ReplayEventsFilter = { eventString =>
    eventString.startsWith(APPL_START_EVENT_PREFIX) ||
    eventString.startsWith(APPL_END_EVENT_PREFIX) ||
    eventString.startsWith(LOG_START_EVENT_PREFIX) ||
    eventString.startsWith(ENV_UPDATE_EVENT_PREFIX) ||
    eventString.startsWith(APPL_FINAL_STATUS_UPDATE_EVENT_PREFIX) ||
    eventString.startsWith(Type_UPDATE_EVENT_PREFIX)
  }

  val logPath = fileStatus.getPath()
  val appCompleted = isCompleted(logPath.getName())
  val reparseChunkSize = conf.get(END_EVENT_REPARSE_CHUNK_SIZE)

  // Enable halt support in listener if:
  // - app in progress && fast parsing enabled
  // - skipping to end event is enabled (regardless of in-progress state)
  val shouldHalt = enableOptimizations &&
    ((!appCompleted && fastInProgressParsing) || reparseChunkSize > 0)

  val bus = new ReplayListenerBus()
  val listener = new AppListingListener(fileStatus, clock, shouldHalt)
  bus.addListener(listener)

  logInfo(s"Parsing $logPath for listing data...")
  Utils.tryWithResource(EventLoggingListener.openEventLog(logPath, fs)) { in =>
    bus.replay(in, logPath.toString, !appCompleted, eventsFilter)
  }
  ... ...

To summarize briefly: SHS deserializes each JSON line of the event log into the corresponding event; these events are posted to the listeners registered on the ReplayListenerBus, which update the listing accordingly; once the listing has been updated, it is written to LevelDB by calling the KVStore.write function.
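To make the "one JSON event per line, pre-filtered by string prefix" idea concrete, here is a small standalone sketch. It is not Spark code; the file path, prefix constants, and object name are all illustrative. It scans an event log and keeps only the application start/end lines, the same kind of cheap check eventsFilter performs before full JSON deserialization:

import scala.io.Source

object EventLogScan {
  // Prefix filtering mirrors ReplayEventsFilter: cheap string checks before any JSON parsing.
  private val ApplStartPrefix = "{\"Event\":\"SparkListenerApplicationStart\""
  private val ApplEndPrefix   = "{\"Event\":\"SparkListenerApplicationEnd\""

  def main(args: Array[String]): Unit = {
    val path = args.headOption.getOrElse("eventlog.json") // illustrative default path
    val source = Source.fromFile(path)
    try {
      // Keep only the lines an application-level listener would care about.
      val kept = source.getLines()
        .filter(line => line.startsWith(ApplStartPrefix) || line.startsWith(ApplEndPrefix))
        .toList
      kept.foreach(println)
      println(s"kept ${kept.size} application-level events")
    } finally {
      source.close()
    }
  }
}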
References:
Apache Spark: https://spark.apache.org/docs/latest/monitoring.html