Stateful streams in Spark Streaming: mapWithState
2022-07-26 16:06:00 【InfoQ】
Background

Spark Streaming offers two operators for maintaining state across batches: updateStateByKey and mapWithState. This article walks through the workflow of mapWithState.

Compared with updateStateByKey, which touches the state of every key in every batch, mapWithState only processes the keys that receive new data in the current batch (plus timed-out keys), so it performs better when the state is large.

The classes behind mapWithState are MapWithStateDStreamImpl, InternalMapWithStateDStream, MapWithStateRDD and MapWithStateRDDRecord; the analysis below traces a call to mapWithState through these classes and their relationships.
Source code analysis

Start from a complete WordCount example built on mapWithState:
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setAppName("WordCount").setMaster("local")
val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.checkpoint("d:\\tmp")
val params = Map("bootstrap.servers" -> "master:9092", "group.id" -> "scala-stream-group")
val topic = Set("test")
val initialRDD = ssc.sparkContext.parallelize(List[(String, Int)]())
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, params, topic)
val word = messages.flatMap(_._2.split(" ")).map { x => (x, 1) }
// Custom mappingFunc: add this batch's count for the word to its previous count and update the state
val mappingFunc = (word: String, count: Option[Int], state: State[Int]) => {
  val sum = count.getOrElse(0) + state.getOption.getOrElse(0)
  val output = (word, sum)
  state.update(sum)
  output
}
// Call mapWithState to maintain per-key state across batches
val stateDstream = word.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))
stateDstream.print()
ssc.start()
ssc.awaitTermination()
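StateSpec has more knobs than the two used above. As a sketch of the commonly used ones (the values here are arbitrary), the spec can also pin the number of partitions and set an idle timeout, which matters for the timeout branch analyzed later:

val spec = StateSpec.function(mappingFunc)
  .initialState(initialRDD)
  .numPartitions(2)     // fixes the partitioner of the state RDD
  .timeout(Seconds(60)) // keys idle for 60s are passed to mappingFunc one last time
word.mapWithState(spec).print()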
mapWithState is not defined on DStream itself but in PairDStreamFunctions; a DStream[(K, V)] acquires it through the implicit conversion toPairDStreamFunctions:

implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null):
  PairDStreamFunctions[K, V] = {
  new PairDStreamFunctions[K, V](stream)
}
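This is Scala's standard enrich-my-library pattern. A minimal self-contained sketch of the same mechanism, with illustrative names (not Spark's):

import scala.language.implicitConversions

// Seq[(K, V)] gains an extra method, just as DStream[(K, V)] gains mapWithState
class PairSeqFunctions[K, V](seq: Seq[(K, V)]) {
  def keysOnly: Seq[K] = seq.map(_._1)
}
implicit def toPairSeqFunctions[K, V](seq: Seq[(K, V)]): PairSeqFunctions[K, V] =
  new PairSeqFunctions(seq)

Seq("a" -> 1, "b" -> 2).keysOnly // List(a, b): the compiler inserts the conversion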
PairDStreamFunctions.mapWithState() is marked @Experimental; it simply wraps the source DStream and the StateSpec into a MapWithStateDStreamImpl:

@Experimental
def mapWithState[StateType: ClassTag, MappedType: ClassTag](
    spec: StateSpec[K, V, StateType, MappedType]
  ): MapWithStateDStream[K, V, StateType, MappedType] = {
  new MapWithStateDStreamImpl[K, V, StateType, MappedType](
    self, // self is the DStream that mapWithState is called on
    spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
  )
}
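One detail worth noting: the internal mapping function seen later in updateRecordWithData has the shape (Time, K, Option[V], State[S]) => Option[E], while the WordCount mappingFunc only takes three arguments. StateSpec.function bridges the gap by wrapping the user function; a simplified sketch of that wrapping (not the exact Spark code):

import org.apache.spark.streaming.{State, Time}

// Lift a 3-argument user function into the internal 4-argument form
def wrapUserFunction[K, V, S, E](
    f: (K, Option[V], State[S]) => E): (Time, K, Option[V], State[S]) => Option[E] =
  (_: Time, k: K, v: Option[V], s: State[S]) => Some(f(k, v, s))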
The StateSpec passed to mapWithState describes how the state is to be managed: the mapping function, the initial state, the partitioner, the timeout, and so on. Inside MapWithStateDStreamImpl the real work is delegated to an InternalMapWithStateDStream:

private val internalStream =
  new InternalMapWithStateDStream[KeyType, ValueType, StateType, MappedType](dataStream, spec)
......
// This method only extracts the data to be returned; the main logic sits in internalStream's compute
override def compute(validTime: Time): Option[RDD[MappedType]] = {
  internalStream.getOrCompute(validTime).map { _.flatMap[MappedType] { _.mappedData } }
}
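Each element of the internal RDD is a MapWithStateRDDRecord, and its mappedData field carries the values produced by the mapping function; the flatMap above merely flattens them. The same operation on plain collections, with illustrative types:

case class RecordSketch[E](mappedData: Seq[E])

val records = Seq(
  RecordSketch(Seq(("hello", 4))),
  RecordSketch(Seq(("world", 1), ("spark", 2)))
)
records.flatMap(_.mappedData) // List((hello,4), (world,1), (spark,2))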
MapWithStateDStreamImpl.compute calls internalStream.getOrCompute, which is DStream.getOrCompute and eventually invokes InternalMapWithStateDStream.compute (line 132 of the source):
/** Method that generates a RDD for the given time */
override def compute(validTime: Time): Option[RDD[MapWithStateRDDRecord[K, S, E]]] = {
// Get the state RDD produced by the previous batch
val prevStateRDD = getOrCompute(validTime - slideDuration) match {
case Some(rdd) =>
// This rdd is of type RDD[MapWithStateRDDRecord[K, S, E]]; it is actually a MapWithStateRDD
if (rdd.partitioner != Some(partitioner)) {
// If the RDD is not partitioned the right way, let us repartition it using the
// partition index as the key. This is to ensure that state RDD is always partitioned
// before creating another state RDD using it
// _.stateMap.getAll() returns all the state entries (explained below); a new MapWithStateRDD is created from them
MapWithStateRDD.createFromRDD[K, V, S, E](
rdd.flatMap { _.stateMap.getAll() }, partitioner, validTime)
} else {
rdd
}
case None =>
// The first batch of data will enter this branch
MapWithStateRDD.createFromPairRDD[K, V, S, E](
spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)),
partitioner,
validTime
)
}
// dataRDD holds the stream data of this batch
val dataRDD = parent.getOrCompute(validTime).getOrElse {
context.sparkContext.emptyRDD[(K, V)]
}
val partitionedDataRDD = dataRDD.partitionBy(partitioner) // Repartition
val timeoutThresholdTime = spec.getTimeoutInterval().map { interval =>
(validTime - interval).milliseconds
}
// Construct the MapWithStateRDD
Some(new MapWithStateRDD(
prevStateRDD, partitionedDataRDD, mappingFunction, validTime, timeoutThresholdTime))
}
}
- If the previous state MapWithStateRDD can be obtained:
- Check whether that RDD is partitioned the right way. The subsequent computation combines, partition by partition, the old state with the newly arrived data; if the two are partitioned differently, records are matched against the wrong state and the wrong state gets updated. If the partitioning is correct the RDD is returned as-is; otherwise MapWithStateRDD.createFromRDD repartitions the data with the specified partitioner, as illustrated after this list.
- If the previous state MapWithStateRDD cannot be obtained:
- One is created through MapWithStateRDD.createFromPairRDD. If the WordCount code set an initialStateRDD on the StateSpec, the MapWithStateRDD is built on top of initialStateRDD; otherwise an empty MapWithStateRDD is created. Note that the same partitioner is used here, again to keep the partitioning consistent.
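Why the insistence on one partitioner? State partition i is combined with data partition i, which is only correct if every key hashes to the same partition index on both sides. A quick illustration with HashPartitioner:

import org.apache.spark.HashPartitioner

val p = new HashPartitioner(4)
val stateIdx = p.getPartition("hello") // partition holding the state of "hello"
val dataIdx  = p.getPartition("hello") // partition receiving new records for "hello"
assert(stateIdx == dataIdx) // guaranteed only because both sides use the same partitioner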
The result is new MapWithStateRDD(prevStateRDD, partitionedDataRDD, mappingFunction, validTime, timeoutThresholdTime). Next, look at MapWithStateRDD.compute, which processes one partition at a time:
override def compute(
partition: Partition, context: TaskContext): Iterator[MapWithStateRDDRecord[K, S, E]] = {
val stateRDDPartition = partition.asInstanceOf[MapWithStateRDDPartition]
val prevStateRDDIterator = prevStateRDD.iterator(
stateRDDPartition.previousSessionRDDPartition, context)
val dataIterator = partitionedDataRDD.iterator(
stateRDDPartition.partitionedDataRDDPartition, context)
// prevRecord is the MapWithStateRDDRecord of the corresponding partition of prevStateRDD
val prevRecord = if (prevStateRDDIterator.hasNext) Some(prevStateRDDIterator.next()) else None
val newRecord = MapWithStateRDDRecord.updateRecordWithData(
prevRecord,
dataIterator,
mappingFunction,
batchTime,
timeoutThresholdTime,
removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled
)
Iterator(newRecord)
}
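Note that compute returns Iterator(newRecord): each partition of a MapWithStateRDD holds exactly one MapWithStateRDDRecord, carrying the state of every key in that partition plus this batch's output. Conceptually (a simplified sketch, not the real class):

// One record per partition: all per-key state plus the mapped output of this batch
case class MapWithStateRDDRecordSketch[K, S, E](
  stateMap: Map[K, S], // state of every key that hashes to this partition
  mappedData: Seq[E]   // values returned by the mapping function in this batch
)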
The core of the state update is MapWithStateRDDRecord.updateRecordWithData:

def updateRecordWithData[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
prevRecord: Option[MapWithStateRDDRecord[K, S, E]],
dataIterator: Iterator[(K, V)],
mappingFunction: (Time, K, Option[V], State[S]) => Option[E],
batchTime: Time,
timeoutThresholdTime: Option[Long],
removeTimedoutData: Boolean
): MapWithStateRDDRecord[K, S, E] = {
// Copy the historical state from prevRecord (held in the stateMap field of MapWithStateRDDRecord)
val newStateMap = prevRecord.map { _.stateMap.copy() }.getOrElse { new EmptyStateMap[K, S]() }
// Save the data to be returned
val mappedData = new ArrayBuffer[E]
val wrappedState = new StateImpl[S]() // subclass of State, representing the state of one key
// Iterate over all records (key-value pairs) of this batch
dataIterator.foreach { case (key, value) =>
// Look up this key's historical state in the state map
wrappedState.wrap(newStateMap.get(key))
// Apply mappingFunction to each record of this batch
val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
if (wrappedState.isRemoved) {
newStateMap.remove(key)
} else if (wrappedState.isUpdated
|| (wrappedState.exists && timeoutThresholdTime.isDefined)) {
newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
}
mappedData ++= returned
} // Depending on whether the state was removed or updated, maintain newStateMap accordingly and collect the returned data into mappedData
// If a timeout is configured, timed-out keys get some extra processing
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
wrappedState.wrapTimingOutState(state)
val returned = mappingFunction(batchTime, key, None, wrappedState)
mappedData ++= returned
newStateMap.remove(key)
}
}
// Still returns a MapWithStateRDDRecord; only its contents have changed
MapWithStateRDDRecord(newStateMap, mappedData)
}
# word="hello",value=1,state=("hello",3)
val mappingFunc = (word: String, value: Option[Int], state: State[Int]) => {
val sum = value.getOrElse(0) + state.getOption.getOrElse(0) #sum=4
val output = (word, sum) #("hello",4)
state.update(sum) # to update key="hello" The status of is 4
output # return ("hello",4)
}
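If a timeout was set on the StateSpec, mappingFunc is also invoked for expiring keys, with value = None and state.isTimingOut() == true; calling state.update in that case throws an exception. A sketch of a timeout-aware variant of the WordCount function, under that assumption:

val mappingFuncWithTimeout = (word: String, value: Option[Int], state: State[Int]) => {
  if (state.isTimingOut()) {
    // The key is expiring: value is None here and the state must not be updated
    (word, state.get()) // emit the final count
  } else {
    val sum = value.getOrElse(0) + state.getOption.getOrElse(0)
    state.update(sum)
    (word, sum)
  }
}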
So where does the state itself live? In a StateMap; its companion object shows the default implementation:

/** Companion object for [[StateMap]], with utility methods */
private[streaming] object StateMap {
def empty[K, S]: StateMap[K, S] = new EmptyStateMap[K, S]
def create[K: ClassTag, S: ClassTag](conf: SparkConf): StateMap[K, S] = {
val deltaChainThreshold = conf.getInt("spark.streaming.sessionByKey.deltaChainThreshold",
DELTA_CHAIN_LENGTH_THRESHOLD)
new OpenHashMapBasedStateMap[K, S](deltaChainThreshold)
}
}
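The deltaChainThreshold parameter hints at how OpenHashMapBasedStateMap makes the copy() seen in updateRecordWithData cheap: a copy references its parent map and records only its own changes, and the chain is compacted once it grows past the threshold. A simplified sketch of the idea (not the real implementation):

// Copy-on-write delta chain: copy() is O(1); lookups walk up the chain
class DeltaMapSketch[K, V](parent: Option[DeltaMapSketch[K, V]] = None) {
  private val delta = scala.collection.mutable.Map.empty[K, V]
  def put(k: K, v: V): Unit = delta(k) = v
  def get(k: K): Option[V] = delta.get(k).orElse(parent.flatMap(_.get(k)))
  def copy(): DeltaMapSketch[K, V] = new DeltaMapSketch(Some(this))
}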
OpenHashMapBasedStateMap stores its entries in an org.apache.spark.util.collection.OpenHashMap, Spark's memory-efficient open-addressing hash map.