A Complete Guide to Window Functions in the Flink Framework
2022-07-02 10:34:00 【InfoQ】
Overview

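Window Types
Time Window
Count Window
- Taking time windows as an example (count windows are similar), a tumbling window splits the data at fixed time intervals.
- Its windows are aligned in time, have a fixed length, and never overlap.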
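The fixed, non-overlapping slicing described above can be sketched in plain Java without any Flink dependency. This is only an illustration: the class and method names are hypothetical, windows are assumed to be aligned to the epoch with no offset, and timestamps are in milliseconds.

```java
final class TumblingWindowSketch {
    // Start (inclusive) of the tumbling window that contains the timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // End (exclusive) of the same window.
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long sizeMs = 15_000L; // a 15-second window
        for (long ts : new long[] {0L, 14_999L, 15_000L, 31_234L}) {
            System.out.println(ts + " -> [" + windowStart(ts, sizeMs)
                    + ", " + windowEnd(ts, sizeMs) + ")");
        }
    }
}
```

Every timestamp maps to exactly one window, which is why tumbling windows never overlap.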

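- Taking time windows as an example (count windows are similar), a sliding window is a generalization of the fixed window: it is defined by a fixed window length and a slide interval.
- The window length is fixed, but consecutive windows may overlap.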
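To make the overlap concrete, the following plain-Java sketch lists the start of every sliding window that contains a given timestamp. The names are hypothetical, and windows are assumed to be aligned to the epoch with no offset.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowSketch {
    // Returns the start of every window [start, start + sizeMs) that contains timestampMs,
    // from the most recent window to the oldest one.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Window length 20 s, slide 10 s: each element falls into two windows.
        System.out.println(windowStarts(25_000L, 20_000L, 10_000L));
    }
}
```

With a length of 20 seconds and a slide of 10 seconds, an element belongs to length / slide = 2 windows.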

- Session windows exist only for time windows; there is no count-based session window.
- Their windows are not aligned in time.
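Because session boundaries depend on the data rather than on the clock, a small plain-Java sketch may help. It groups timestamps into sessions separated by a gap of inactivity. The names are hypothetical, and the handling of a pause exactly equal to the gap is an assumption of this sketch rather than a statement about Flink's behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SessionWindowSketch {
    // Splits timestamps into sessions; a new session starts when the pause
    // since the previous element is longer than gapMs.
    static List<List<Long>> sessions(long[] timestampsMs, long gapMs) {
        long[] sorted = timestampsMs.clone();
        Arrays.sort(sorted);
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sorted) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMs) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] events = {0L, 5_000L, 30_000L, 31_000L, 60_000L};
        System.out.println(sessions(events, 15_000L));
    }
}
```

The resulting sessions start and end wherever the data happens to pause, which is what "not aligned in time" means here.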

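Using the Window API
A window is defined with .window(). The window() method can only be called after keyBy, and it takes a WindowAssigner as its argument; the WindowAssigner is responsible for assigning each incoming element to the correct window.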
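import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        // Build the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        // SensorReading is a POJO defined elsewhere (not shown in this article)
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });
        // Open a window by specifying a window assigner.
        // window() returns a WindowedStream, not a DataStream; a window function still has to be applied to it.
        WindowedStream<SensorReading, Tuple, TimeWindow> windowedStream = dataStream.keyBy("id")
                // A 15-second tumbling window
                .window(TumblingProcessingTimeWindows.of(Time.seconds(15)));
                // Session window
                // .window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))
                // Sliding window
                // .window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10)))
        env.execute();
    }
}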
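Flink also provides the shorthand methods .timeWindow and .countWindow for defining time windows and count windows. Durations are expressed with Time.milliseconds(x), Time.seconds(x), Time.minutes(x), and so on.
- Tumbling time window: .timeWindow(Time.seconds(15))
- Sliding time window: .timeWindow(Time.seconds(15),Time.seconds(5))
- Session window: .window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))
- Tumbling count window: .countWindow(10)
- Sliding count window: .countWindow(10,2)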
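The two-argument form of countWindow is the least intuitive of these, so here is a plain-Java simulation of a sliding count window for a single key. It is a sketch with hypothetical names, not Flink code: it assumes the window fires once every slide elements and sees at most the last size elements.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class CountWindowSketch {
    // Returns the window contents at each firing for one key.
    static List<List<Integer>> slidingCountWindows(int[] elements, int size, int slide) {
        Deque<Integer> buffer = new ArrayDeque<>();
        List<List<Integer>> fired = new ArrayList<>();
        int seen = 0;
        for (int element : elements) {
            buffer.addLast(element);
            if (buffer.size() > size) {
                buffer.removeFirst(); // keep only the most recent `size` elements
            }
            seen++;
            if (seen % slide == 0) {
                fired.add(new ArrayList<>(buffer)); // fire every `slide` elements
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(slidingCountWindows(new int[] {1, 2, 3, 4, 5, 6}, 4, 2));
    }
}
```

Note that the first firings contain fewer than size elements, because the window fires on the slide count rather than waiting for a full window.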
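- Incremental aggregation functions: each element is processed as soon as it arrives, and the window keeps only a running state. The available functions are ReduceFunction and AggregateFunction.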
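import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        // Build the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });
        // Open a window and apply an incremental aggregation that counts elements per key
        DataStream<Integer> resultStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {
                    // Create the accumulator (the initial count)
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }
                    // Called once per element: increment the count
                    @Override
                    public Integer add(SensorReading sensorReading, Integer accumulator) {
                        return accumulator + 1;
                    }
                    // Called when the window fires: emit the count
                    @Override
                    public Integer getResult(Integer accumulator) {
                        return accumulator;
                    }
                    // Combine two partial counts (used when windows are merged, e.g. session windows)
                    @Override
                    public Integer merge(Integer a, Integer b) {
                        return a + b;
                    }
                });
        resultStream.print();
        env.execute();
    }
}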

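- Full window functions: all elements of the window are collected first, and the function iterates over them when the window is evaluated. The corresponding functions are ProcessWindowFunction and WindowFunction.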

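import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        // Build the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });
        // Full window function: emits (key, window end, element count) per window
        DataStream<Tuple3<String, Long, Integer>> resultStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .apply(new WindowFunction<SensorReading, Tuple3<String, Long, Integer>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<SensorReading> input,
                                      Collector<Tuple3<String, Long, Integer>> out) throws Exception {
                        String id = tuple.getField(0);
                        Long windowEnd = window.getEnd();
                        // Count the buffered elements by iterating over the whole window
                        int count = 0;
                        for (SensorReading ignored : input) {
                            count++;
                        }
                        out.collect(new Tuple3<>(id, windowEnd, count));
                    }
                });
        resultStream.print();
        env.execute();
    }
}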
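The difference between the incremental and full window styles described above comes down to what is kept in state. The plain-Java sketch below, with hypothetical names and no Flink dependency, computes the same average both ways: once by updating a small accumulator per element, and once by buffering every element and iterating at the end.

```java
import java.util.ArrayList;
import java.util.List;

final class AggregationStyleSketch {
    // Incremental style: the state is only (sum, count), updated as each element arrives.
    static double incrementalAverage(double[] values) {
        double sum = 0.0;
        int count = 0;
        for (double v : values) {
            sum += v;
            count++;
        }
        return sum / count;
    }

    // Full window style: every element is buffered, then processed in one pass at the end.
    static double fullWindowAverage(double[] values) {
        List<Double> buffer = new ArrayList<>();
        for (double v : values) {
            buffer.add(v); // state grows with the number of elements in the window
        }
        double sum = 0.0;
        for (double v : buffer) {
            sum += v;
        }
        return sum / buffer.size();
    }

    public static void main(String[] args) {
        double[] temperatures = {20.0, 22.0, 24.0};
        System.out.println(incrementalAverage(temperatures));
        System.out.println(fullWindowAverage(temperatures));
    }
}
```

Both give the same result; the incremental form uses constant state, while the full window form has access to all elements at once, which is needed for computations such as sorting or a median.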

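- Trigger: .trigger() defines when a window closes, that is, when it triggers the computation and emits the result.
- Evictor: .evictor() defines the logic for removing certain elements from a window.
- .allowedLateness(): allows late data to be processed.
- .sideOutputLateData(): routes late data to a side output stream.
- .getSideOutput(): retrieves the side output stream.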
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class WindowTest2_CountWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] field = line.split(",");
            return new SensorReading(field[0], Long.valueOf(field[1]), Double.valueOf(field[2]));
        });
        // Count window test: average temperature over the last 10 elements, evaluated every 2 elements
        DataStream<Double> resultStream = dataStream.keyBy("id")
                .countWindow(10, 2)
                .aggregate(new MyAvgTemp());
        // Other optional APIs: handling of late data
        OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {};
        SingleOutputStreamOperator<SensorReading> sumStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                //.trigger()
                //.evictor()
                .allowedLateness(Time.minutes(1))
                // Send late data to the side output
                .sideOutputLateData(outputTag)
                .sum("temperature");
        sumStream.getSideOutput(outputTag).print("late");
        resultStream.print();
        env.execute();
    }

    // Accumulator is (sum of temperatures, number of readings)
    public static class MyAvgTemp implements AggregateFunction<SensorReading, Tuple2<Double, Integer>, Double> {
        @Override
        public Tuple2<Double, Integer> createAccumulator() {
            return new Tuple2<>(0.0, 0);
        }
        @Override
        public Tuple2<Double, Integer> add(SensorReading sensorReading, Tuple2<Double, Integer> accumulator) {
            return new Tuple2<>(accumulator.f0 + sensorReading.getTemperature(), accumulator.f1 + 1);
        }
        @Override
        public Double getResult(Tuple2<Double, Integer> accumulator) {
            return accumulator.f0 / accumulator.f1;
        }
        @Override
        public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
            return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
        }
    }
}
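How allowedLateness and sideOutputLateData interact can be sketched as a simple decision in plain Java. This is a simplified model with hypothetical names, and it assumes event-time windows driven by a watermark; the example above uses processing-time windows, where lateness does not arise in the same way.

```java
final class LatenessSketch {
    enum Outcome { ON_TIME, LATE_BUT_ACCEPTED, SIDE_OUTPUT }

    // Classifies an element that belongs to the window ending at windowEndMs (exclusive),
    // given the current watermark and the configured allowed lateness.
    static Outcome classify(long windowEndMs, long allowedLatenessMs, long watermarkMs) {
        long maxTimestamp = windowEndMs - 1; // last timestamp that still belongs to the window
        if (watermarkMs < maxTimestamp) {
            return Outcome.ON_TIME;            // the window has not fired yet
        }
        if (watermarkMs < maxTimestamp + allowedLatenessMs) {
            return Outcome.LATE_BUT_ACCEPTED;  // the window fired, but its state is still kept
        }
        return Outcome.SIDE_OUTPUT;            // the window state is gone; route to the side output
    }

    public static void main(String[] args) {
        long windowEnd = 15_000L;
        long lateness = 60_000L; // one minute, as in the example above
        System.out.println(classify(windowEnd, lateness, 10_000L));
        System.out.println(classify(windowEnd, lateness, 20_000L));
        System.out.println(classify(windowEnd, lateness, 80_000L));
    }
}
```

Without sideOutputLateData, elements in the last category would simply be dropped.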

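- In Flink, .window() is only available on a keyed stream, so a window must be defined after a keyBy operation (non-keyed streams use windowAll instead).
- A window definition must always be followed by a window function, such as an aggregation.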