Flink in practice: ProcessFunction (requirement: raise an alarm if the temperature rises continuously for 30 seconds)
2022-07-26 22:40:00 【不会打球的摄影师不是好程序员】
1. ProcessFunction is Flink's secret weapon (in my opinion)
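Process Functions are used to build event-driven applications and to implement custom business logic that cannot be expressed with the window functions and transformation operators covered earlier.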
Flink provides 8 Process Functions:
ProcessFunction
KeyedProcessFunction
CoProcessFunction
ProcessJoinFunction
BroadcastProcessFunction
KeyedBroadcastProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction
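2. Meeting the requirement with KeyedProcessFunction
1. First, an introduction to KeyedProcessFunction
1. KeyedProcessFunction operates on a KeyedStream (a stream that has gone through keyBy).
KeyedProcessFunction processes every element of the stream and can emit zero, one, or more elements for each one. All Process Functions implement the RichFunction interface, so they all have open(), close(), getRuntimeContext() and related methods. On top of these, KeyedProcessFunction[KEY, IN, OUT] provides two more methods:
(1) processElement(v: IN, ctx: Context, out: Collector[OUT]) is called for every element in the stream, and results are emitted through the Collector. The Context gives access to the element's timestamp, its key, and the TimerService; it can also send results to other streams (side outputs).
(2) onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]) is a callback invoked when a previously registered timer fires. The timestamp parameter is the time at which the timer was set to fire, and the Collector is used to emit results. Like the Context parameter of processElement, OnTimerContext provides context information, for example the time domain of the timer that fired (event time or processing time).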
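To see the RichFunction lifecycle, both methods and a side output in one place, here is a minimal sketch. `TempReading`, `HotReadingSplitter`, the `hot` tag name and the 1 s delay are hypothetical; `open(parameters: Configuration)` is the Flink 1.x signature (newer releases deprecate it in favour of a variant taking an `OpenContext`). Readings above the threshold produce zero main-output elements and one side-output element; normal readings produce one main-output element and request a processing-time callback.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeDomain
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.OutputTag
import org.apache.flink.util.Collector

// Hypothetical input type used only in this sketch
case class TempReading(id: String, temperature: Double)

class HotReadingSplitter(threshold: Double)
    extends KeyedProcessFunction[String, TempReading, String] {

  // Side-output channel for readings above the threshold
  val hotTag: OutputTag[TempReading] = OutputTag[TempReading]("hot")

  // Per-key count of normal readings, obtained in the open() lifecycle hook
  private var normalCount: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    normalCount = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("normal-count", classOf[java.lang.Long]))
  }

  override def processElement(value: TempReading,
                              ctx: KeyedProcessFunction[String, TempReading, String]#Context,
                              out: Collector[String]): Unit = {
    if (value.temperature > threshold) {
      // Nothing on the main output, one element on the side output
      ctx.output(hotTag, value)
    } else {
      // One element on the main output
      out.collect(s"${value.id} normal ${value.temperature}")
      // value() returns null while the key has no state yet
      val previous = Option(normalCount.value()).map(_.longValue).getOrElse(0L)
      normalCount.update(previous + 1)
      // Callback 1 s of processing time from now; Flink keeps at most one timer per key and timestamp
      ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000L)
    }
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, TempReading, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    // timeDomain() reports whether an event-time or a processing-time timer fired
    if (ctx.timeDomain() == TimeDomain.PROCESSING_TIME) {
      out.collect(s"${ctx.getCurrentKey}: ${normalCount.value()} normal readings at $timestamp")
    }
  }
}
```

In a job, the hot readings would be retrieved by calling `getSideOutput(splitter.hotTag)` on the stream returned by `keyBy(_.id).process(splitter)`.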
2. TimerService and timers
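The TimerService object held by Context and OnTimerContext provides the following methods:
(1) currentProcessingTime(): Long returns the current processing time.
(2) currentWatermark(): Long returns the timestamp of the current watermark.
(3) registerProcessingTimeTimer(timestamp: Long): Unit registers a processing-time timer for the current key. The timer fires when the processing time reaches the given timestamp.
(4) registerEventTimeTimer(timestamp: Long): Unit registers an event-time timer for the current key. The timer fires, and the callback runs, once the watermark is greater than or equal to the timer's timestamp.
(5) deleteProcessingTimeTimer(timestamp: Long): Unit deletes a previously registered processing-time timer. If no timer with that timestamp exists, it does nothing.
(6) deleteEventTimeTimer(timestamp: Long): Unit deletes a previously registered event-time timer. If no timer with that timestamp exists, it does nothing.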
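The timer methods above combine into a common pattern: keep exactly one pending event-time timer per key, deleting it and registering a new one on every reading. A minimal sketch, assuming each reading carries its own event time in milliseconds and that timestamps and watermarks are assigned upstream (without watermarks, event-time timers never fire). `TimedReading`, `SilenceDetector` and `timeout` are hypothetical names; the function reports a sensor that has sent nothing for `timeout` ms of event time.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Hypothetical input type: eventTime is in milliseconds
case class TimedReading(id: String, eventTime: Long, temperature: Double)

class SilenceDetector(timeout: Long)
    extends KeyedProcessFunction[String, TimedReading, String] {

  // Timestamp of the pending event-time timer for the current key (reads as 0 when empty)
  lazy val timerState: ValueState[Long] =
    getRuntimeContext.getState(new ValueStateDescriptor[Long]("silence-timer", classOf[Long]))

  override def processElement(value: TimedReading,
                              ctx: KeyedProcessFunction[String, TimedReading, String]#Context,
                              out: Collector[String]): Unit = {
    val pending = timerState.value()
    // Cancel the previous deadline for this key
    if (pending != 0L) ctx.timerService().deleteEventTimeTimer(pending)
    // New deadline: fires once the watermark reaches eventTime + timeout
    val deadline = value.eventTime + timeout
    ctx.timerService().registerEventTimeTimer(deadline)
    timerState.update(deadline)
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, TimedReading, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    val watermark = ctx.timerService().currentWatermark()
    out.collect(s"${ctx.getCurrentKey} silent since ${timestamp - timeout}, watermark $watermark")
    timerState.clear()
  }
}
```

This assumes readings for a key arrive roughly in event-time order; an out-of-order reading would move the deadline backwards.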
3. When a timer fires, the callback onTimer() is executed. Note that timers can only be used on keyed streams.
2. Implementation
```scala
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector

import java.util.Properties

// One sensor reading: sensor id, timestamp (seconds), temperature
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object ProcessFunctionTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop101:9092")
    val data = env.addSource(new FlinkKafkaConsumer[String]("sensor", new SimpleStringSchema(), properties))

    // Parse each line "id, timestamp, temperature" into a SensorReading
    val dataStream = data.map(x => {
      val arr = x.split(",")
      SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
    })

    // Raise an alarm if the temperature keeps rising for 30 seconds
    val warningStream = dataStream
      .keyBy(_.id)
      .process(new TempIncreWarning(30000L))

    warningStream.print("warningStream")
    env.execute()
  }
}

class TempIncreWarning(interval: Long) extends KeyedProcessFunction[String, SensorReading, String] {
  // State: the previous temperature (for comparison) and the timestamp of the registered timer (so it can be deleted)
  lazy val lastTempState: ValueState[Double] =
    getRuntimeContext.getState(new ValueStateDescriptor[Double]("valueState", classOf[Double]))
  lazy val timerTsState: ValueState[Long] =
    getRuntimeContext.getState(new ValueStateDescriptor[Long]("timeState", classOf[Long]))

  override def processElement(value: SensorReading,
                              ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
                              out: Collector[String]): Unit = {
    // Read the state first; empty state reads as 0, so the first reading is compared with 0.0
    val lastTemp = lastTempState.value()
    val timerTs = timerTsState.value()
    lastTempState.update(value.temperature)

    // Compare the current temperature with the previous one
    if (value.temperature > lastTemp && timerTs == 0) {
      // Temperature rose and no timer is pending: register a processing-time timer `interval` ms from now
      val ts = ctx.timerService().currentProcessingTime() + interval
      ctx.timerService().registerProcessingTimeTimer(ts)
      timerTsState.update(ts)
    } else if (value.temperature < lastTemp) {
      // Temperature dropped: cancel the pending timer and reset its state
      ctx.timerService().deleteProcessingTimeTimer(timerTs)
      timerTsState.clear()
    }
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    // The timer was never cancelled, so the temperature did not drop during the interval
    out.collect("Sensor " + ctx.getCurrentKey + ": temperature rose continuously for " + interval / 1000 + " seconds")
    timerTsState.clear()
  }
}
```
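Because the job depends on Kafka and on processing time, it is awkward to replay. The plain-Scala model below (no Flink; `TempRiseModel`, `Event`, `alarms` and `until` are hypothetical names) mirrors the per-key branches of TempIncreWarning so you can check by hand which sequences raise an alarm. It assumes a single key, readings sorted by arrival time, and that a timer due at or before a reading's arrival fires before that reading is processed.

```scala
import scala.collection.mutable.ArrayBuffer

// Plain-Scala model of TempIncreWarning for a single key; not Flink code
object TempRiseModel {
  // A reading that arrives at processing time `at` (ms) with temperature `temp`
  final case class Event(at: Long, temp: Double)

  // Processing times at which an alarm would be emitted, observing the clock up to `until`
  def alarms(events: Seq[Event], interval: Long, until: Long): List[Long] = {
    var lastTemp = 0.0 // like an empty ValueState[Double], which reads as 0.0
    var timerTs = 0L   // 0 means "no timer registered", as in timerTsState
    val fired = ArrayBuffer.empty[Long]

    for (e <- events) {
      // A timer due before (or when) this reading arrives fires first, like onTimer
      if (timerTs != 0L && timerTs <= e.at) {
        fired += timerTs
        timerTs = 0L
      }
      // Same branches as processElement
      if (e.temp > lastTemp && timerTs == 0L) timerTs = e.at + interval
      else if (e.temp < lastTemp) timerTs = 0L
      lastTemp = e.temp
    }
    // A timer still pending fires if the clock reaches it before `until`
    if (timerTs != 0L && timerTs <= until) fired += timerTs
    fired.toList
  }
}
```

With `import TempRiseModel._` in scope, `alarms(Seq(Event(0L, 35.5), Event(10000L, 35.6), Event(20000L, 35.7)), 30000L, 35000L)` returns `List(30000)`, while inserting a drop such as `Event(10000L, 35.4)` cancels the first timer. The model also makes one property of the original visible: since the first reading is compared with 0.0, any positive first reading starts the timer.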
3. Results
Input:
>"sensor_1", 1547718199, 35.5
>"sensor_1", 1547718199, 35.6
>"sensor_1", 1547718199, 35.7
>"sensor_1", 1547718199, 35.8
>"sensor_1", 1547718199, 35.9
>"sensor_1", 1547718199, 36.1
>"sensor_1", 1547718199, 36.2
>"sensor_1", 1547718199, 36.3
>"sensor_1", 1547718199, 36.4
>"sensor_1", 1547718199, 36.5
>
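The sample input wraps the sensor id in double quotes, and the job's `map` only trims whitespace, so the key becomes `"sensor_1"` with the quotes included; a blank or malformed line would also make the `map` throw and fail the job. A more defensive parser could look like this sketch (`SensorParser` and `parse` are hypothetical names; it returns a tuple so it does not clash with the job's `SensorReading`):

```scala
import scala.util.Try

object SensorParser {
  // Parses lines like: "sensor_1", 1547718199, 35.5  (quotes around the id are optional)
  def parse(line: String): Option[(String, Long, Double)] =
    line.split(",").map(_.trim) match {
      case Array(id, ts, temp) =>
        // Try turns a NumberFormatException into None instead of failing the job
        Try((id.stripPrefix("\"").stripSuffix("\""), ts.toLong, temp.toDouble)).toOption
      case _ => None // wrong number of fields, e.g. an empty line
    }
}
```

In the job it could replace the `map`, for example `data.flatMap(line => SensorParser.parse(line).toList).map { case (id, ts, temp) => SensorReading(id, ts, temp) }`, which silently drops lines that do not parse.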
Output: