Problems and recovery of Spark Streaming checkpoints

2022-06-22 16:57:00 ZH519080

/** Checkpointing saves Spark Streaming's runtime metadata and the state of each
  * batch's RDDs to a persistent store, which gives the application high availability.
  *
  * Checkpointing nevertheless has a drawback:
  *   If the streaming program's code or configuration changes, you stop the old
  *   Spark Streaming program, then compile, package, and run the new one.
  *   One of two things then happens:
  *   1. Startup fails with a deserialization exception.
  *   2. Startup succeeds, but the code that runs may still be the old program's code.
  *
  *   Why does this happen?
  *   The first time a checkpoint is persisted, the whole program, as packaged in its
  *   jar, is serialized into a binary file. Every restart restores from the checkpoint
  *   directory, so even after the new program has been packaged, the old serialized
  *   binary is what gets loaded. The result is either an error or the old code running.
  *
  *   A modified and repackaged program may therefore fail to start. If you then delete
  *   the files whose names begin with "checkpoint" and keep only the data files:
  *     hadoop dfs -rmr /checkpoint/checkpoint*
  *   the new program can start, but it does not read the previous data files. It starts
  *   counting from scratch, so data is still lost.
  *
  *   If the previous checkpoint is deleted, the new program can only start consuming
  *   from Kafka's smallest or largest offset (the default is largest, i.e. the latest).
  *   With smallest, data is processed twice; with largest, data is lost.
  *
  *   There are two ways to deal with this:
  *   1. Do not stop the old program. Start the new one and let the two run side by
  *      side for a while.
  *   2. Record the old program's offsets when it shuts down, and have the new program
  *      start consuming directly from those offsets.
  *
  *   But without checkpointing, how can stateful functions such as updateStateByKey
  *   be used? */
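
The difference between restarting from smallest, from largest, and from a recorded offset can be illustrated without Spark or Kafka. The sketch below is only a simulation under stated assumptions: a partition is modelled as a `Vector` of messages, an offset is an index into it, and the names `OffsetResumeSketch`, `consumeFrom`, and the `m0`..`m5` messages are hypothetical, not part of any Kafka API.

```scala
// Hypothetical, Spark-free simulation of the three restart choices.
// A partition is a Vector of messages; an offset is an index into it.
object OffsetResumeSketch {
  // Everything a consumer sees when it starts reading at offset `from`.
  def consumeFrom(log: Vector[String], from: Int): Vector[String] =
    log.drop(from)

  // The old program processed m0..m3 before it was stopped.
  val logAtStop: Vector[String] = Vector("m0", "m1", "m2", "m3")
  // Offset recorded at shutdown: the next message to read.
  val savedOffset: Int = logAtStop.size

  // Two more messages arrive while the new program is being deployed.
  val logAtRestart: Vector[String] = logAtStop ++ Vector("m4", "m5")

  // smallest: start at 0, so m0..m3 are processed a second time.
  val fromSmallest: Vector[String] = consumeFrom(logAtRestart, 0)
  // largest: start at the end, so m4 and m5 are never seen.
  val fromLargest: Vector[String] = consumeFrom(logAtRestart, logAtRestart.size)
  // recorded offset: exactly the unprocessed messages.
  val fromSaved: Vector[String] = consumeFrom(logAtRestart, savedOffset)

  def main(args: Array[String]): Unit = {
    println(s"smallest: $fromSmallest")
    println(s"largest:  $fromLargest")
    println(s"saved:    $fromSaved")
  }
}
```

Only the recorded offset yields neither duplicates nor gaps, which is why the second solution above depends on storing offsets somewhere outside the checkpoint.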


/** Enabling the write-ahead log
  * With the write-ahead log (WAL) enabled, all data received by a Receiver is also
  * written to the configured checkpoint directory, so that no data is lost when the
  * driver recovers.
  * To enable it, call StreamingContext.checkpoint to configure a checkpoint directory,
  * then set spark.streaming.receiver.writeAheadLog.enable to true. */
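
The two steps just described might look like the following configuration sketch. It assumes Spark Streaming is on the classpath and that the job is launched with spark-submit (so no master is set here); the object name, application name, and one-second batch interval are illustrative assumptions, and the directory simply reuses the path from this article.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical wrapper; only the config key and the checkpoint call come from the text.
object WalConfigSketch {
  // Assumed directory, same path as the checkpoint directory used in this article.
  val walCheckpointDir = "hdfs://zhumaster:8020/data/checkpoint"

  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("wal sketch") // illustrative name
      // Step 2: switch on the receiver write-ahead log.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(1)) // illustrative batch interval
    // Step 1: the WAL is written under the checkpoint directory.
    ssc.checkpoint(walCheckpointDir)
    ssc
  }
}
```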


// After a Spark Streaming application crashes, recompiling it and running it again
// does not resume from the point of the crash, because after recompilation the
// Checkpoint can no longer be deserialized.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, StreamingContext}

/** Conditions for restarting and recovering the application after a Driver failure:
  * 1. On the first start, create a new StreamingContext instance and set a directory
  *    in which to save Checkpoint data.
  * 2. On a restart after a Driver failure, the Checkpoint data must be loaded from the
  *    Checkpoint directory to recreate the StreamingContext instance.
  *
  * StreamingContext.getOrCreate covers both cases. */
  def exeStreamSV: Unit = {
    /** Recovery this way cannot resume from an arbitrary position. */
    // Rebuild the context from checkpointDir if a checkpoint exists; otherwise call streamingSV.
    val ssc = StreamingContext.getOrCreate(checkpointDir, streamingSV _)
    ssc.start()
    ssc.awaitTermination()

    ssc.stop()
  }

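  // HDFS directory that holds the checkpoint metadata and state data.
  val checkpointDir = "hdfs://zhumaster:8020/data/checkpoint"

  // Update function for updateStateByKey: adds the number of new values for a key
  // to its previous count. Every value is 1 here, so values.size equals values.sum.
  def stateValues(values: Seq[Int], state: Option[Int]): Option[Int] = {
    Some(values.size + state.getOrElse(0))
  }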

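  // Builds a fresh StreamingContext; getOrCreate calls this only when no checkpoint exists.
  def streamingSV: StreamingContext = {
    val sparkConf = new SparkConf().setAppName("checkpoint recover")
      .setMaster("local[*]")

    // Batch interval of 500 ms.
    val ssc = new StreamingContext(sparkConf, Duration(500))

    ssc.checkpoint(checkpointDir)

    // Running word count over a socket stream. The DStream checkpoint interval must be
    // a multiple of the batch interval; 2500 ms is 5 batches.
    ssc.socketTextStream("zhumaster", 9999).flatMap(_.split(" ")).map((_, 1))
      .updateStateByKey(stateValues _)
      .checkpoint(Duration(2500))
      .print()

    ssc
  }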
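
To make the role of the update function concrete, the sketch below replays a few batches for a single key in plain Scala, with no Spark involved. `stateValues` is copied from the code above; the object name and the batch contents are hypothetical. The running count it accumulates is the state that checkpointing persists between batches.

```scala
// Spark-free replay of how updateStateByKey would call the update function for one key.
object StateValuesSketch {
  // Copy of the update function used above.
  def stateValues(values: Seq[Int], state: Option[Int]): Option[Int] =
    Some(values.size + state.getOrElse(0))

  // Hypothetical batches for one word: each occurrence contributes a 1.
  // The third batch contains no occurrence of the word at all.
  val batches: Seq[Seq[Int]] = Seq(Seq(1, 1, 1), Seq(1), Seq.empty, Seq(1, 1))

  // Thread the state through the batches: previous state in, new state out.
  // scanLeft also yields the initial None, which .tail drops.
  val history: Seq[Option[Int]] =
    batches.scanLeft(Option.empty[Int])((state, values) => stateValues(values, state)).tail

  def main(args: Array[String]): Unit =
    history.foreach(println)
}
```

An empty batch leaves the count unchanged, and the first batch starts from `None`, which is why the function falls back to `state.getOrElse(0)`.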

 

Copyright notice
This article was written by [ZH519080]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/173/202206221523254305.html