A complete guide to window functions in the Flink framework
2022-07-02 10:34:00 【InfoQ】
Overview

Window types
Time Window
Count Window
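Tumbling windows
- Taking time windows as an example (count windows are analogous), a tumbling window slices the data at a fixed time interval.
- Its windows are aligned in time, have a fixed length, and do not overlap.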
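The alignment property can be illustrated with a little arithmetic. The following is a plain-Java sketch, not Flink code; it assumes non-negative millisecond timestamps and a zero window offset, and the class and method names are hypothetical.

```java
/** Plain-Java sketch of how a timestamp maps to a tumbling window (not Flink code). */
final class TumblingWindowSketch {
    /** Start of the window containing the timestamp (non-negative timestamps, zero offset). */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Exclusive end of the window containing the timestamp. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long size = 15_000L; // a 15-second window
        for (long ts : new long[] {0L, 14_999L, 15_000L, 31_000L}) {
            System.out.println(ts + " -> [" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Every timestamp falls into exactly one window, and window boundaries are multiples of the window size regardless of when the data arrives.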

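Sliding windows
- Taking time windows as an example (count windows are analogous), a sliding window generalizes the fixed window: it is defined by a fixed window length and a slide interval.
- The window length is fixed, and windows may overlap (whenever the slide interval is shorter than the window length).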
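Because windows overlap, one element can belong to several windows at once. The sketch below, in plain Java rather than Flink code, lists the start times of all windows that contain a given timestamp. It assumes non-negative millisecond timestamps and a zero offset; the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of sliding-window membership (not Flink code). */
final class SlidingWindowSketch {
    /** Start times of every window [start, start + size) that contains the timestamp. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // The most recent window start at or before the timestamp.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // A 20-second window sliding every 10 seconds: each element lands in two windows.
        System.out.println(windowStarts(25_000L, 20_000L, 10_000L));
    }
}
```

With a window length of 20 seconds and a slide of 10 seconds, each element is counted in length / slide = 2 windows.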

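Session windows
- Session windows exist only for time windows; there is no count-based session window.
- Their windows are not aligned in time.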
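The lack of alignment follows from how sessions are formed: boundaries depend on the data itself. As a rough plain-Java sketch (not Flink code), the method below groups ascending timestamps into sessions and starts a new session whenever the pause since the previous element exceeds the gap. The names are hypothetical, and the treatment of a pause exactly equal to the gap is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of session grouping by inactivity gap (not Flink code). */
final class SessionWindowSketch {
    /** Groups ascending timestamps into sessions separated by pauses longer than gapMs. */
    static List<List<Long>> sessions(long[] sortedTimestampsMs, long gapMs) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestampsMs) {
            // A pause longer than the gap closes the current session.
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMs) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 5_000L, 12_000L, 40_000L, 43_000L};
        System.out.println(sessions(timestamps, 15_000L));
    }
}
```

The sessions start and end wherever the data happens to pause, so two keys generally have sessions of different lengths and different start times.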

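Using the Window API
A window is defined with .window(), which must be called after keyBy. The window() method takes a WindowAssigner as its argument; the WindowAssigner is responsible for assigning each incoming element to the correct window.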
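import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// SensorReading is not shown in this article; it is assumed to be a POJO
// with an id, a timestamp, and a temperature field.
public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        // Build the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });
        // Window test: specify the window assigner.
        // window() returns a WindowedStream; it only becomes a DataStream again after an aggregation.
        WindowedStream<SensorReading, Tuple, TimeWindow> windowedStream = dataStream.keyBy("id")
                // A 15-second tumbling window
                .window(TumblingProcessingTimeWindows.of(Time.seconds(15)));
                // Session window
                //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))
                // Sliding window
                //.window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(10)))
        env.execute();
    }
}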
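Flink also provides the shorthand methods .timeWindow and .countWindow for defining time windows and count windows. Window lengths are given with Time.milliseconds(x), Time.seconds(x), Time.minutes(x), and so on.
- Tumbling time window: .timeWindow(Time.seconds(15))
- Sliding time window: .timeWindow(Time.seconds(15),Time.seconds(5))
- Session window: .window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))
- Tumbling count window: .countWindow(10)
- Sliding count window: .countWindow(10,2)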
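The two-argument count window is the least intuitive of these. As far as I understand Flink's behaviour, .countWindow(size, slide) fires every slide elements and evaluates the most recent size elements (fewer at the start of the stream); treat that as an assumption to verify against the Flink documentation. The plain-Java simulation below (not Flink code, hypothetical names) computes the average emitted at each firing.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Plain-Java simulation of a sliding count window (not Flink code). */
final class CountWindowSketch {
    /** Average emitted at each firing of a window of `size` elements that slides by `slide`. */
    static List<Double> slidingAverages(double[] values, int size, int slide) {
        Deque<Double> buffer = new ArrayDeque<>();
        List<Double> out = new ArrayList<>();
        int sinceLastFire = 0;
        for (double v : values) {
            buffer.addLast(v);
            if (buffer.size() > size) {
                buffer.removeFirst(); // keep only the most recent `size` elements
            }
            if (++sinceLastFire == slide) {
                sinceLastFire = 0;
                double sum = 0.0;
                for (double b : buffer) {
                    sum += b;
                }
                out.add(sum / buffer.size());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Window of 4 elements sliding by 2.
        System.out.println(slidingAverages(new double[] {1, 2, 3, 4, 5, 6}, 4, 2));
    }
}
```

Calling slidingAverages with slide equal to size reproduces the tumbling case, where each element is evaluated exactly once.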
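Window functions define the computation applied to the data in a window. They fall into two categories.
- Incremental aggregation functions: the computation runs as each record arrives, and only a running state is kept. The available functions are ReduceFunction and AggregateFunction.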
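import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        // Build the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });
        // Window test: count the readings per sensor in each window
        DataStream<Integer> resultStream = dataStream.keyBy("id")
                // Incremental aggregation over a 15-second window
                .timeWindow(Time.seconds(15))
                .aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {
                    // Create the accumulator
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }
                    // Called once per incoming element
                    @Override
                    public Integer add(SensorReading sensorReading, Integer accumulator) {
                        return accumulator + 1;
                    }
                    @Override
                    public Integer getResult(Integer accumulator) {
                        return accumulator;
                    }
                    // Combine two partial counts (used when windows are merged, e.g. session windows)
                    @Override
                    public Integer merge(Integer a, Integer b) {
                        return a + b;
                    }
                });
        resultStream.print();
        env.execute();
    }
}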
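The four methods of an AggregateFunction form a small contract: start from an empty accumulator, fold in one element at a time, and combine partial accumulators when needed. The plain-Java sketch below (not Flink code, hypothetical names) mirrors that contract for a counter and shows the property a merge implementation should satisfy: merging the counts of two halves gives the count of the whole.

```java
/** Plain-Java sketch of the accumulator contract behind an incremental count (not Flink code). */
final class CountAccumulatorSketch {
    static int createAccumulator() {
        return 0;
    }

    /** Folds one element into the accumulator; only the running count is kept. */
    static int add(String element, int accumulator) {
        return accumulator + 1;
    }

    /** Combines two partial accumulators. */
    static int merge(int a, int b) {
        return a + b;
    }

    static int getResult(int accumulator) {
        return accumulator;
    }

    /** Runs the full lifecycle over a batch of elements. */
    static int countAll(String[] elements) {
        int acc = createAccumulator();
        for (String e : elements) {
            acc = add(e, acc);
        }
        return getResult(acc);
    }

    public static void main(String[] args) {
        String[] left = {"a", "b"};
        String[] right = {"c", "d", "e"};
        System.out.println(merge(countAll(left), countAll(right)));
    }
}
```

The state held between elements is a single integer, however many elements the window receives.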

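- Full window functions: all of the window's data is collected first, and the function iterates over every element when the window is evaluated. The corresponding functions are ProcessWindowFunction and WindowFunction.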

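import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        // Build the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });
        // Full window function: emits (sensor id, window end, element count)
        DataStream<Tuple3<String,Long,Integer>> resultStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .apply(new WindowFunction<SensorReading, Tuple3<String,Long,Integer>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<SensorReading> input, Collector<Tuple3<String,Long,Integer>> out) throws Exception {
                        // The key, plus window metadata that incremental functions cannot see
                        String id = tuple.getField(0);
                        Long windowEnd = window.getEnd();
                        // All buffered elements of the window are available here
                        Integer count = IteratorUtils.toList(input.iterator()).size();
                        out.collect(new Tuple3<>(id, windowEnd, count));
                    }
                });
        resultStream.print();
        env.execute();
    }
}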
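The trade-off between the two kinds of window function can be sketched in plain Java (not Flink code, hypothetical names). An incremental aggregate such as an average needs only a constant-size state, while a computation such as a median needs every element of the window, which is the situation a full window function is suited to.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Plain-Java contrast of incremental and full-window computation (not Flink code). */
final class AggregationStyleSketch {
    /** Incremental style: only (sum, count) is kept, updated once per element. */
    static final class RunningAverage {
        private double sum;
        private long count;

        void add(double value) {
            sum += value;
            count++;
        }

        double result() {
            return count == 0 ? 0.0 : sum / count;
        }
    }

    /** Full-window style: needs all buffered elements before it can produce a result. */
    static double median(List<Double> buffered) {
        if (buffered.isEmpty()) {
            throw new IllegalArgumentException("window must not be empty");
        }
        List<Double> sorted = new ArrayList<>(buffered);
        Collections.sort(sorted);
        int n = sorted.size();
        return n % 2 == 1
                ? sorted.get(n / 2)
                : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
    }

    public static void main(String[] args) {
        double[] readings = {21.0, 35.0, 22.0};
        RunningAverage avg = new RunningAverage();
        List<Double> buffer = new ArrayList<>();
        for (double r : readings) {
            avg.add(r);    // incremental: state stays two numbers
            buffer.add(r); // full window: state grows with the window
        }
        System.out.println("average=" + avg.result() + ", median=" + median(buffer));
    }
}
```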

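Other optional APIs
- Trigger: .trigger() defines when the window closes, triggering the computation and emitting the result.
- Evictor: .evictor() defines the logic for removing certain elements from the window.
- .allowedLateness(): allows late data to be processed.
- .sideOutputLateData(): puts late data into a side output stream.
- .getSideOutput(): retrieves the side output stream.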
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class WindowTest2_CountWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] field = line.split(",");
            return new SensorReading(field[0], Long.parseLong(field[1]), Double.parseDouble(field[2]));
        });
        // Count window test: average temperature over a sliding count window
        DataStream<Double> resultStream = dataStream.keyBy("id")
                .countWindow(10, 2)
                .aggregate(new MyAvgTemp());
        // Other optional APIs: handling late data
        OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {};
        SingleOutputStreamOperator<SensorReading> sumStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                //.trigger()
                //.evictor()
                .allowedLateness(Time.minutes(1))
                // Send late data to the side output
                .sideOutputLateData(outputTag)
                .sum("temperature");
        sumStream.getSideOutput(outputTag).print("late");
        resultStream.print();
        env.execute();
    }
    // Accumulator is (sum of temperatures, number of readings)
    public static class MyAvgTemp implements AggregateFunction<SensorReading, Tuple2<Double,Integer>,Double>{
        @Override
        public Tuple2<Double, Integer> createAccumulator() {
            return new Tuple2<>(0.0,0);
        }
        @Override
        public Tuple2<Double, Integer> add(SensorReading sensorReading, Tuple2<Double, Integer> accumulator) {
            return new Tuple2<>(accumulator.f0+sensorReading.getTemperature(),accumulator.f1+1);
        }
        @Override
        public Double getResult(Tuple2<Double, Integer> accumulator) {
            return accumulator.f0 / accumulator.f1;
        }
        @Override
        public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
            return new Tuple2<>(a.f0+b.f0,a.f1+b.f1);
        }
    }
}
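How allowedLateness and sideOutputLateData interact can be sketched as a three-way decision. Lateness is only meaningful with event time, where a watermark tracks progress. The plain-Java sketch below (not Flink code, hypothetical names) classifies an element by comparing the current watermark with the end of the element's window. The exact boundary conditions follow my understanding of Flink's behaviour and should be treated as assumptions.

```java
/** Plain-Java sketch of how late elements are routed (not Flink code). */
final class LatenessSketch {
    enum Outcome { ON_TIME, LATE_BUT_ACCEPTED, SIDE_OUTPUT }

    /**
     * Classifies an element that belongs to the window ending (exclusively) at windowEndMs,
     * given the watermark at the moment the element arrives.
     */
    static Outcome classify(long windowEndMs, long allowedLatenessMs, long watermarkMs) {
        long maxTimestamp = windowEndMs - 1; // last timestamp that belongs to the window
        if (watermarkMs < maxTimestamp) {
            return Outcome.ON_TIME; // the window has not fired yet
        }
        if (watermarkMs < maxTimestamp + allowedLatenessMs) {
            return Outcome.LATE_BUT_ACCEPTED; // the window state is still kept; the result is updated
        }
        return Outcome.SIDE_OUTPUT; // the window state is gone; route to the side output
    }

    public static void main(String[] args) {
        long windowEnd = 15_000L;
        long lateness = 60_000L; // one minute, as in Time.minutes(1)
        for (long watermark : new long[] {10_000L, 20_000L, 80_000L}) {
            System.out.println(watermark + " -> " + classify(windowEnd, lateness, watermark));
        }
    }
}
```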

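- In Flink, a window defined with .window() must come after a keyBy operation (non-keyed streams use windowAll() instead).
- A window definition must always be followed by an aggregation or another window function.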