Problems and recovery of spark streaming checkpoint
2022-06-22 16:57:00 【ZH519080】
Checkpointing saves Spark Streaming's runtime metadata and the state of each batch's RDDs to a reliable storage system, which is the basis for high availability. It comes with real drawbacks, though.

If the streaming code or configuration changes, you stop the old Spark Streaming program, package and compile the new one, and run it again. One of two things then happens:

1. Startup fails with a deserialization exception.
2. Startup succeeds, but the job may still be executing the previous program's code.

Why? When the checkpoint is first persisted, the entire related program in the jar package is serialized into binary files. Every restart recovers from the checkpoint directory, so even a freshly packaged and serialized program still loads the old serialized binaries, which either raises an error or keeps running the old code.

A tempting workaround is to delete only the checkpoint metadata files and keep the data files:

hadoop dfs -rmr /checkpoint/checkpoint*

The new program will then restart, but it will not read the remaining data files; it starts counting from scratch, so the accumulated state is still lost.

If instead the whole old checkpoint is deleted, the newly started program can only consume from Kafka's smallest or largest offset (the default is the latest): configured as smallest, data is duplicated; configured as largest, data is lost.

There are two solutions to the problems above:

1. Do not stop the old program; start the new one and let the two coexist and consume for a period before retiring the old one.
2. Record the offsets when the old program shuts down, and have the new program consume directly from those offsets (sketched just below).

An open question remains: if the checkpoint feature is not used, how can stateful operators such as updateStateByKey be used at all?

Enabling the write-ahead log. With the write-ahead log (WAL) mechanism enabled, all data received by a Receiver is also written into the configured checkpoint directory, so the driver avoids data loss when recovering. To turn it on, call StreamingContext.checkpoint to configure a checkpoint directory, then set spark.streaming.receiver.writeAheadLog.enable to true (a configuration sketch follows the recovery example below).

Note that after a Spark Streaming application dies, recompiling it and running it again does not resume from where it stopped, because recompilation leaves the old checkpoint non-deserializable.

Conditions for restarting and recovering the application after a driver failure:

1. If the application starts for the first time, create a new StreamingContext instance and set a directory for saving checkpoint data.
2. If restarting and recovering from a driver failure, the checkpoint data must be loaded from the checkpoint directory to recreate the StreamingContext instance.

Both cases are handled by StreamingContext.getOrCreate, as the recovery example below shows (after the offset-management sketch).
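Solution 2 can be sketched with the spark-streaming-kafka-0-10 direct API. This is a minimal illustration, not the article's original code: the broker address, topic name, and group id are hypothetical, and ssc is assumed to be an existing StreamingContext. Offsets are committed back to Kafka only after a batch is processed, so a repackaged program using the same group.id resumes from the committed offsets instead of from a checkpoint:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "zhumaster:9092",           // hypothetical broker address
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "checkpoint-demo",                    // hypothetical group id
  "auto.offset.reset" -> "earliest",                  // "smallest"/"largest" in the old consumer API
  "enable.auto.commit" -> (false: java.lang.Boolean)  // we commit manually below
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, // an existing StreamingContext, assumed in scope
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("demo-topic"), kafkaParams)
)

stream.foreachRDD { rdd =>
  // Read the offset ranges before any transformation loses the Kafka RDD type.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd here ...
  // Commit only after processing succeeds: on restart, the same group.id
  // picks up from the committed offsets, avoiding both the duplication of
  // "smallest" and the loss of "largest".
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

commitAsync stores the offsets in Kafka itself; for stricter exactly-once semantics they can instead be written transactionally alongside the results.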
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, StreamingContext}

def exeStreamSV(): Unit = {
  // getOrCreate either rebuilds the StreamingContext from the checkpoint
  // directory (driver restart) or creates a fresh one via streamingSV
  // (first start). Recovery works only from the checkpoint data, not from
  // an arbitrary position.
  val ssc = StreamingContext.getOrCreate(checkpointDir, streamingSV _)
  ssc.start()
  ssc.awaitTermination()
  ssc.stop()
}
val checkpointDir = "hdfs://zhumaster:8020/data/checkpoint"
def stateValues(values: Seq[Int], state: Option[Int]): Option[Int] = {
  // Add this batch's counts to the running total carried over in `state`.
  // (values.sum rather than values.size, so the function stays correct even
  // if the upstream pairs ever carry counts other than 1.)
  Some(values.sum + state.getOrElse(0))
}
def streamingSV: StreamingContext = {
  val sparkConf = new SparkConf()
    .setAppName("checkpoint recover")
    .setMaster("local[*]")
  val ssc = new StreamingContext(sparkConf, Duration(500))
  ssc.checkpoint(checkpointDir)
  ssc.socketTextStream("zhumaster", 9999)
    .flatMap(_.split(" "))
    .map((_, 1))
    .updateStateByKey(stateValues _)
    // The DStream checkpoint interval must be a multiple of the batch
    // interval (the original Duration(200) would be rejected at runtime);
    // 5-10x the batch interval is the usual recommendation.
    .checkpoint(Duration(5000))
    .print()
  ssc
}
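Finally, the write-ahead log described earlier needs only one configuration flag on top of an existing checkpoint directory. A minimal sketch, assuming the same receiver-based socket source as above:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, StreamingContext}

val walConf = new SparkConf()
  .setAppName("wal demo")
  .setMaster("local[*]")
  // Write all receiver-ingested data to the WAL before acknowledging it.
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val walSsc = new StreamingContext(walConf, Duration(500))
// The WAL segments are stored under the checkpoint directory, so a
// checkpoint directory is mandatory for the WAL to take effect.
walSsc.checkpoint(checkpointDir)

With both in place, data already received by a Receiver survives a driver failure instead of being lost with the Receiver's memory.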