Spark Streaming Stateful Streams: mapWithState
2022-07-26 16:06:00 【InfoQ】
Background
Spark Streaming provides two operators for maintaining state across batches: updateStateByKey and mapWithState. This article focuses on mapWithState and walks through its workflow.
mapWithState scales better than updateStateByKey: in each batch it only touches the keys that actually receive new data (plus keys that time out), whereas updateStateByKey re-applies the update function to every key in the state on every batch.
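For comparison, a minimal updateStateByKey version of the same word count might look like this (a sketch; note that the update function runs for every key in the state, even keys with no new data in the batch):

```scala
// updateStateByKey: the update function is applied to EVERY key in the state,
// so per-batch cost grows with the total number of keys ever seen.
val updateFunc = (newValues: Seq[Int], oldState: Option[Int]) =>
  Some(newValues.sum + oldState.getOrElse(0))

val stateDstream = word.updateStateByKey[Int](updateFunc)
```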
Four classes do the real work behind mapWithState: MapWithStateDStreamImpl, InternalMapWithStateDStream, MapWithStateRDD, and MapWithStateRDDRecord.
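The source walkthrough below follows this chain; as a rough map of how the classes nest (simplified, not the exact Spark source):

```scala
// MapWithStateDStreamImpl[K, V, S, E]            // the DStream returned by mapWithState
//   └─ InternalMapWithStateDStream[K, V, S, E]   // checkpointed DStream that carries the state
//        └─ MapWithStateRDD[K, V, S, E]          // one per batch; an RDD[MapWithStateRDDRecord[K, S, E]]
//             └─ MapWithStateRDDRecord[K, S, E]  // one record per partition: (stateMap, mappedData)
```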
Source code analysis
```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

val sparkConf = new SparkConf().setAppName("WordCount").setMaster("local")
val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.checkpoint("d:\\tmp")
val params = Map("bootstrap.servers" -> "master:9092", "group.id" -> "scala-stream-group")
val topic = Set("test")
val initialRDD = ssc.sparkContext.parallelize(List[(String, Int)]())
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, params, topic)
val word = messages.flatMap(_._2.split(" ")).map { x => (x, 1) }
// Custom mappingFunction: add up the occurrences of each word and update its state
val mappingFunc = (word: String, count: Option[Int], state: State[Int]) => {
  val sum = count.getOrElse(0) + state.getOption.getOrElse(0)
  val output = (word, sum)
  state.update(sum)
  output
}
// Call mapWithState to manage the state of the stream
val stateDstream = word.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))
stateDstream.print()
ssc.start()
ssc.awaitTermination()
```
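As a quick sanity check of the semantics (hypothetical input), suppose the first 10-second batch carries the message "hello spark" and the second carries "hello"; the printed output would look roughly like:

```
(hello,1)   <- batch 1
(spark,1)   <- batch 1
(hello,2)   <- batch 2: the count carries over via the state
```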
mapWithState is not defined on DStream itself but on PairDStreamFunctions; the implicit conversion toPairDStreamFunctions enriches any DStream[(K, V)] so that mapWithState can be called on it:

```scala
implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null):
  PairDStreamFunctions[K, V] = {
  new PairDStreamFunctions[K, V](stream)
}
```
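This is the standard "enrich my library" pattern: the compiler sees word.mapWithState(...), finds no such method on DStream, and applies the in-scope implicit conversion. A minimal standalone sketch of the same mechanism (the Pairs class and mapWithStateDemo method are hypothetical, not Spark code):

```scala
import scala.language.implicitConversions

// Hypothetical enrichment class, mirroring what PairDStreamFunctions does for DStream[(K, V)]
class Pairs[K, V](stream: List[(K, V)]) {
  def mapWithStateDemo[R](f: ((K, V)) => R): List[R] = stream.map(f)
}
// With this implicit in scope, mapWithStateDemo appears to be a method of List[(K, V)]
implicit def toPairs[K, V](stream: List[(K, V)]): Pairs[K, V] = new Pairs(stream)

List(("hello", 1)).mapWithStateDemo { case (k, v) => s"$k -> $v" } // List("hello -> 1")
```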
PairDStreamFunctions.mapWithState() is still marked @Experimental:

```scala
@Experimental
def mapWithState[StateType: ClassTag, MappedType: ClassTag](
    spec: StateSpec[K, V, StateType, MappedType]
  ): MapWithStateDStream[K, V, StateType, MappedType] = {
  new MapWithStateDStreamImpl[K, V, StateType, MappedType](
    self, // self is the DStream of the current batch
    spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
  )
}
```
mapWithState takes a StateSpec argument; the StateSpec bundles the mapping function together with optional configuration such as the initial state, partitioner, and timeout. The MapWithStateDStreamImpl returned by mapWithState delegates the real work to an internal stream:

```scala
private val internalStream =
  new InternalMapWithStateDStream[KeyType, ValueType, StateType, MappedType](dataStream, spec)
......
// This method only reshapes the data to be returned; the main logic lives in internalStream's compute
override def compute(validTime: Time): Option[RDD[MappedType]] = {
  internalStream.getOrCompute(validTime).map { _.flatMap[MappedType] { _.mappedData } }
}
```
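StateSpec is a builder: besides function and initialState used in the WordCount example, it can also configure partitioning and a timeout. A sketch of typical usage (these builder methods exist in the Spark Streaming API; the values are illustrative):

```scala
import org.apache.spark.streaming.{Minutes, StateSpec}

val spec = StateSpec
  .function(mappingFunc)    // mandatory: the mapping function
  .initialState(initialRDD) // optional: seed state
  .numPartitions(8)         // optional: controls the partitioner of the state RDDs
  .timeout(Minutes(30))     // optional: idle keys time out (see timeoutThresholdTime below)
```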
In MapWithStateDStreamImpl, internalStream.getOrCompute is plain DStream.getOrCompute, which eventually calls InternalMapWithStateDStream.compute (InternalMapWithStateDStream, line 132):
```scala
/** Method that generates a RDD for the given time */
override def compute(validTime: Time): Option[RDD[MapWithStateRDDRecord[K, S, E]]] = {
  // Compute the state RDD of the previous batch
  val prevStateRDD = getOrCompute(validTime - slideDuration) match {
    case Some(rdd) =>
      // This rdd is of type RDD[MapWithStateRDDRecord[K, S, E]], i.e. it is really a MapWithStateRDD
      if (rdd.partitioner != Some(partitioner)) {
        // If the RDD is not partitioned the right way, let us repartition it using the
        // partition index as the key. This is to ensure that state RDD is always partitioned
        // before creating another state RDD using it
        // _.stateMap.getAll() fetches all the state data (this will become clear further down);
        // a new MapWithStateRDD is created on top of it
        MapWithStateRDD.createFromRDD[K, V, S, E](
          rdd.flatMap { _.stateMap.getAll() }, partitioner, validTime)
      } else {
        rdd
      }
    case None =>
      // The first batch of data enters this branch
      MapWithStateRDD.createFromPairRDD[K, V, S, E](
        spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)),
        partitioner,
        validTime
      )
  }
  // The RDD holding this batch of stream data: dataRDD
  val dataRDD = parent.getOrCompute(validTime).getOrElse {
    context.sparkContext.emptyRDD[(K, V)]
  }
  val partitionedDataRDD = dataRDD.partitionBy(partitioner) // repartition
  val timeoutThresholdTime = spec.getTimeoutInterval().map { interval =>
    (validTime - interval).milliseconds
  }
  // Construct the MapWithStateRDD
  Some(new MapWithStateRDD(
    prevStateRDD, partitionedDataRDD, mappingFunction, validTime, timeoutThresholdTime))
}
```
- If the previous MapWithStateRDD can be obtained:
  - Check whether that RDD is partitioned correctly. The computation that follows pairs, partition by partition, the old state with the newly arrived data, so if the two are partitioned differently the result is wrong (state would be updated for the wrong keys; see the sketch after this list). If the partitioning is correct the RDD is returned as-is; otherwise MapWithStateRDD.createFromRDD repartitions the data with the given partitioner.
- If no previous MapWithStateRDD exists (the very first batch):
  - One is created via MapWithStateRDD.createFromPairRDD. If the WordCount code set an initialStateRDD on the StateSpec, the MapWithStateRDD is built on top of initialStateRDD; otherwise an empty MapWithStateRDD is created. Note that the same partitioner is used here, again to guarantee consistent partitioning.
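A minimal sketch of that co-partitioning guarantee (assuming an existing SparkContext sc): two RDDs sharing the same partitioner place any given key at the same partition index, which is exactly what lets compute pair old state with new data partition by partition.

```scala
import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(4)
// State from the previous batch and the new data, co-partitioned the same way:
val stateRDD = sc.parallelize(Seq(("hello", 3), ("spark", 1))).partitionBy(partitioner)
val dataRDD  = sc.parallelize(Seq(("hello", 1))).partitionBy(partitioner)
// "hello" lives at the same partition index in both RDDs, so a per-partition
// computation sees its old state and its new records together.
```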
Both branches end with the construction of a MapWithStateRDD:

```scala
new MapWithStateRDD(
  prevStateRDD, partitionedDataRDD, mappingFunction, validTime, timeoutThresholdTime)
```

Its compute does the real per-partition work:
```scala
override def compute(
    partition: Partition, context: TaskContext): Iterator[MapWithStateRDDRecord[K, S, E]] = {
  val stateRDDPartition = partition.asInstanceOf[MapWithStateRDDPartition]
  val prevStateRDDIterator = prevStateRDD.iterator(
    stateRDDPartition.previousSessionRDDPartition, context)
  val dataIterator = partitionedDataRDD.iterator(
    stateRDDPartition.partitionedDataRDDPartition, context)
  // prevRecord holds the data of one partition of prevStateRDD
  val prevRecord = if (prevStateRDDIterator.hasNext) Some(prevStateRDDIterator.next()) else None
  val newRecord = MapWithStateRDDRecord.updateRecordWithData(
    prevRecord,
    dataIterator,
    mappingFunction,
    batchTime,
    timeoutThresholdTime,
    removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled
  )
  Iterator(newRecord)
}
```
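Note that compute returns Iterator(newRecord): every partition of a MapWithStateRDD holds exactly one record, which bundles that partition's whole state map with the mapped output of the current batch. Simplified (the real class in the Spark source is private[streaming] and mutable):

```scala
case class MapWithStateRDDRecord[K, S, E](stateMap: StateMap[K, S], mappedData: Seq[E])
```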
The heavy lifting happens in MapWithStateRDDRecord.updateRecordWithData:

```scala
def updateRecordWithData[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
    prevRecord: Option[MapWithStateRDDRecord[K, S, E]],
    dataIterator: Iterator[(K, V)],
    mappingFunction: (Time, K, Option[V], State[S]) => Option[E],
    batchTime: Time,
    timeoutThresholdTime: Option[Long],
    removeTimedoutData: Boolean
  ): MapWithStateRDDRecord[K, S, E] = {
  // Copy the historical state data from prevRecord (kept in the stateMap field of MapWithStateRDDRecord)
  val newStateMap = prevRecord.map { _.stateMap.copy() }.getOrElse { new EmptyStateMap[K, S]() }
  // Collects the data to be returned
  val mappedData = new ArrayBuffer[E]
  val wrappedState = new StateImpl[S]() // a subclass of State; represents one key's state
  // Loop over all records (key-value pairs) of this batch
  dataIterator.foreach { case (key, value) =>
    // Look up this key's historical state in the state data
    wrappedState.wrap(newStateMap.get(key))
    // Apply mappingFunction to every record of this batch
    val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
    if (wrappedState.isRemoved) {
      newStateMap.remove(key)
    } else if (wrappedState.isUpdated
        || (wrappedState.exists && timeoutThresholdTime.isDefined)) {
      newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
    }
    mappedData ++= returned
  } // decide whether to remove, update, etc., maintaining the state in newStateMap
    // and collecting the returned data in mappedData
  // If a timeout is configured, some extra update operations are performed
  if (removeTimedoutData && timeoutThresholdTime.isDefined) {
    newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
      wrappedState.wrapTimingOutState(state)
      val returned = mappingFunction(batchTime, key, None, wrappedState)
      mappedData ++= returned
      newStateMap.remove(key)
    }
  }
  // Still returns a MapWithStateRDDRecord; only the data inside has changed
  MapWithStateRDDRecord(newStateMap, mappedData)
}
```
# word="hello",value=1,state=("hello",3)
val mappingFunc = (word: String, value: Option[Int], state: State[Int]) => {
val sum = value.getOrElse(0) + state.getOption.getOrElse(0) #sum=4
val output = (word, sum) #("hello",4)
state.update(sum) # to update key="hello" The status of is 4
output # return ("hello",4)
}
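One detail worth noting: when a key times out, the mappingFunction is invoked one final time with value = None and a timing-out state, and calling state.update or state.remove in that invocation throws. A sketch of a timeout-aware variant of the function above (isTimingOut, get, getOption, and update are all part of the State API):

```scala
val timeoutAwareFunc = (word: String, value: Option[Int], state: State[Int]) => {
  if (state.isTimingOut()) {
    // Final call for this key: value is None, and update()/remove() are not allowed here
    (word, state.get()) // emit the last known count before the state is dropped
  } else {
    val sum = value.getOrElse(0) + state.getOption.getOrElse(0)
    state.update(sum)
    (word, sum)
  }
}
```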
The state itself is held in a StateMap; its companion object picks the implementation:

```scala
/** Companion object for [[StateMap]], with utility methods */
private[streaming] object StateMap {
  def empty[K, S]: StateMap[K, S] = new EmptyStateMap[K, S]

  def create[K: ClassTag, S: ClassTag](conf: SparkConf): StateMap[K, S] = {
    val deltaChainThreshold = conf.getInt("spark.streaming.sessionByKey.deltaChainThreshold",
      DELTA_CHAIN_LENGTH_THRESHOLD)
    new OpenHashMapBasedStateMap[K, S](deltaChainThreshold)
  }
}
```
OpenHashMapBasedStateMap stores the state in an org.apache.spark.util.collection.OpenHashMap. Its copy() is cheap: instead of duplicating all entries, the new map keeps a reference to its parent, forming a delta chain that is compacted once it grows past deltaChainThreshold.
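A minimal, self-contained sketch of the delta-chain idea (illustrative only; the names and the simplified logic here are not Spark's actual implementation, which also tracks deletions and compacts long chains):

```scala
import scala.collection.mutable

// Illustrative delta map: writes go to the local delta, reads fall back to the parent chain.
class DeltaMap[K, V](parent: Option[DeltaMap[K, V]] = None) {
  private val delta = mutable.Map.empty[K, V]
  def put(k: K, v: V): Unit = delta.update(k, v)
  def get(k: K): Option[V] = delta.get(k).orElse(parent.flatMap(_.get(k)))
  // copy() is O(1): the new map simply records this one as its parent
  def copy(): DeltaMap[K, V] = new DeltaMap(Some(this))
}

val gen0 = new DeltaMap[String, Int]()
gen0.put("hello", 3)
val gen1 = gen0.copy() // what newStateMap = prevRecord.stateMap.copy() does, conceptually
gen1.put("spark", 1)
gen1.get("hello") // Some(3), resolved through the parent chain
```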