当前位置:网站首页>Flink时间和窗口
Flink时间和窗口
2022-08-02 14:05:00 【大学生爱编程】
代码完成,本地(提交任务的地方)构建数据流程图,将图提交给jobManager并拆分多个task,进行任务调度
不需要依赖任何的框架,独立运行
1.上传解压修改环境变量
设置flink任务的并行度,在代码中设置,在提交任务时设置(-p 加上设置的并行度)(源码优先级高),但是socket的并行度只能是1
env.setParallelism(2)
一个并行度占用一个资源槽,和task无关,task可以共享资源
可以对每一个算子设置名字 ,id,和并行度
val kvDS=wordsDS.map((_,1))
.setParallelism(3) //当前算子的并行度
.name("转换成kv格式")
.uid("3") //唯一标识
1. Time
1.1 processing 数据的处理时间
滑动的处理时间窗口
val linesDS: DataStream[String] =env.socketTextStream("master",8888)
val wordsDS: DataStream[String] =linesDS.flatMap(line=>line.split(","))
val kvDS: DataStream[(String, Int)] =wordsDS.map(word=>(word,1))
val keyByDS: KeyedStream[(String, Int), String] =kvDS.keyBy(kv=>kv._1)
//窗口的大小是最近10秒 窗口内的数据任然是当初调用生成窗口的DS的类型,所以直接进行统计
//处理时间,就是现实时间
val windowsDS: WindowedStream[(String, Int), String, TimeWindow] =keyByDS.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
val countDS: DataStream[(String, Int)] =windowsDS.sum(1)
countDS.print()
env.execute()
1.2 event time 事件时间
当时间顺序错乱时,会出现大问题(报错)
滑动的事件事件窗口
数据发生的时间,会附带一个时间字段,比如车辆经过道路的时间戳
kcDS.assignAscendingTimestamps(kv=>kv._2) 告诉flink哪一个字段是事件时间
val windowDS: WindowedStream[(String, Int), String, TimeWindow]
=keyByEdDS.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5))) 指定窗口类型
object Demo3Event {
def main(args: Array[String]): Unit = {
/**
* 事件时间
* 利用数据中自带的时间字段进行统计
*/
val env: StreamExecutionEnvironment =StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val dataDS: DataStream[String] =env.socketTextStream("master",8888)
val kvDS: DataStream[(String, Long)] =dataDS.map(line=>{
val split: Array[String] =line.split(",")
val roadId: String =split(1) //道路编号
val ts: Long =split(2).toLong //时间戳 字符串转long
(roadId,ts)
})
/**
* 告诉flink程序哪一个时间是时间字段
* 指定好下面就不需要该字段了
*/
val assDS: DataStream[(String, Long)] =kvDS.assignAscendingTimestamps(kv=>kv._2)
val roadKvDS: DataStream[(String, Int)] =assDS.map(kv=>(kv._1,1))
val keyByEdDS: KeyedStream[(String, Int), String] =roadKvDS.keyBy(_._1)
/**
* 指定窗口
* 滑动的事件时间窗口 统计每个道路的车流量,每隔5秒统计一次,统计最近10秒的车辆
*/
val windowDS: WindowedStream[(String, Int), String, TimeWindow]
=keyByEdDS.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))
val countDS: DataStream[(String, Int)] =windowDS.sum(1)
/**
* 指定时间字段和窗口,其余的都是正常计算
*/
countDS.print()
env.execute()
}
}
1.3 Watermark(引出一个问题)
基于事件时间的滚动窗口
窗口触发的条件:水位线大于等于窗口结束时间,且窗口内有数据,会触发前一个窗口的计算
.......指定时间字段
val roadKvDS: DataStream[(String, Int)] =assDS.map(kv=>(kv._1,1))
val keyByEdDS: KeyedStream[(String, Int), String] =roadKvDS.keyBy(_._1)
//每隔5秒产生一个窗口,然后进行计算
val windowDS: WindowedStream[(String, Int), String, TimeWindow]
=keyByEdDS.window(TumblingEventTimeWindows.of(Time.seconds(5)))
val countDS: DataStream[(String, Int)] =windowDS.sum(1)
问题:时间乱序产生数据丢失引出水位线
水位线的概念:默认为最新的数据的时间戳,水位线只能增加不能减少
1.4 水位线的前移:
水位线的前移:窗口计算时间延迟(延迟根据乱序程度设置),相当于时间减x,等一会再触发,等一部分乱序的数据过来了再触发(指定最大数据乱序时间),数据是含头不含尾的
//设置水位线的生成策略,前移5秒
val assDS: DataStream[(String, Long)] =kvDS.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(5))
//设置时间字段
.withTimestampAssigner(new SerializableTimestampAssigner[(String,Long)] {
override def extractTimestamp(t: (String, Long), l: Long): Long = {
t._2
}
})
)
val roadKvDS: DataStream[(String, Int)] =assDS.map(kv=>(kv._1,1))
val keyByEdDS: KeyedStream[(String, Int), String] =roadKvDS.keyBy(_._1)
/**
* 指定窗口
* 滑动的事件时间窗口 统计每个道路的车流量,每隔5秒统计一次,统计最近10秒的车辆
*/
val windowDS: WindowedStream[(String, Int), String, TimeWindow]
=keyByEdDS.window(TumblingEventTimeWindows.of(Time.seconds(5)))
val countDS: DataStream[(String, Int)] =windowDS.sum(1)
/**
* 指定时间字段和窗口,其余的都是正常计算
*/
CheckPoint
State
Window
ingestion 数据到达flink端的接收时间
2.Windows(3大类8小类)
2.1Time window 时间窗口
时间窗口,固定时间就计算:
SlidingEventTimeWindows 滑动的事件时间窗口
SlidingProcessingTimeWindows 滑动的处理时间窗口
TumblingEventTimeWindows 滚动的事件时间窗口
TumblingProcessingTimeWindows 滚动的处理时间窗口
滑动 窗口存在交叉部分
滚动 窗口没有交叉
事件时间 数据自带一个时间字段 需要设置时间字段和水位线
处理时间 数据被处理的时间
2.2 Count Window 统计窗口
当一个单词达到十条的时候进行计算
滚动窗口:(没有交叉)
val countWindowDS: WindowedStream[(String, Int), String, GlobalWindow] =keyByDS.countWindow(10)
countWindowDS.sum(1).print()
滑动窗口:(有交叉),参数加一个即可
每产生5条数据,计算最近10条的数据
val countWindowDS: WindowedStream[(String, Int), String, GlobalWindow] =keyByDS.countWindow(10,5)
2.3 Session Window 会话窗口
处理时间的会话窗口:
如果一段时间(现实时间)同一个key没有数据则生成窗口对前面数据进行计算
val windows: WindowedStream[(String, Int), String, TimeWindow] =keyByEdDS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
事件时间的会话窗口: 指定时间字段
事件之间的时间相差过5秒才会触发上一窗口的计算
val windows: WindowedStream[(String, Int), String, TimeWindow] =
keyByEdDS.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
边栏推荐
猜你喜欢
What's wrong with running yolov5 (1) p, r, map are all 0
Deep learning framework pytorch rapid development and actual combat chapter4
Deep learning framework pytorch rapid development and actual combat chapter3
重新学习编程day1 【初始c语言】【c语言编写出计算两个数之和的代码】
主存储器(二)
宏定义问题记录day2
c语言三子棋详解!!! (电脑智能下棋)(附上完整代码)
十分钟带你入门Nodejs
VS Code远程开发及免密配置
MySQL知识总结 (八) InnoDB的MVCC实现机制
随机推荐
C语言字符串——关于指针
Linux: CentOS 7 install MySQL5.7
uni-app页面、组件视图数据无法刷新问题的解决办法
浏览器报错数字代表的大概意思
宝塔面板搭建小说CMS管理系统源码实测 - ThinkPHP6.0
【c】小游戏---五子棋之井字棋雏形
鼠标右键菜单栏太长如何减少
Programming Specifications - LiteOS
MongoDB安装流程心得:
十分钟带你入门Nodejs
binlog与iptables防nmap扫描
[ROS] (02) Create & compile ROS package Package
C语言初级—水仙花数
加减法运算及其溢出处理
主存储器(二)
yolov5,yolov4,yolov3 mess
A little thought about password encryption
[ROS] Introduction to common tools in ROS (to be continued)
(ROS) (03) CMakeLists. TXT, rounding
宝塔搭建DM企业建站系统源码实测