The most complete analysis of Flink window functions
2022-07-02 14:12:00 【InfoQ】

Window types
Time window (Time Window)
Count window (Count Window)
Tumbling window (Tumbling Window)
- Taking the time window as an example (count windows work the same way), a tumbling window splits the data into segments of a fixed length.
- Characteristics: windows are aligned in time, have a fixed length, and do not overlap, so every element belongs to exactly one window.
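The tumbling assignment rule can be illustrated in plain Java. This is a minimal sketch, not Flink code: it assumes millisecond timestamps and an optional offset, and computes the single [start, end) window an element falls into. Flink aligns its tumbling windows in a similar way (compare TimeWindow.getWindowStartWithOffset), but the class and method names below are hypothetical.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Flink: aligns timestamps to fixed, non-overlapping windows.
class TumblingWindowSketch {
    // Start of the window containing `timestamp`; windows are [start, start + size).
    static long windowStart(long timestamp, long offset, long size) {
        // floorMod keeps the result correct for negative timestamps as well
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    // Returns {start, end} of the one window the element belongs to.
    static long[] assign(long timestamp, long offset, long size) {
        long start = windowStart(timestamp, offset, size);
        return new long[] {start, start + size};
    }

    public static void main(String[] args) {
        long size = 15_000L; // 15-second windows, no offset
        for (long ts : new long[] {1_000L, 14_999L, 15_000L, 31_000L}) {
            System.out.println(ts + " -> " + Arrays.toString(assign(ts, 0L, size)));
        }
    }
}
```

Because the start depends only on the timestamp, size and offset, all keys share the same window boundaries, which is what "aligned in time" means above.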

Sliding window (Sliding Window)
- Taking the time window as an example (count windows work the same way), a sliding window is a more general form of fixed window, defined by a fixed window length and a slide interval.
- The window length is fixed, but windows can overlap: when the slide is smaller than the length, one element belongs to several windows.
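To make the overlap concrete, here is a plain-Java sketch (not Flink code; the class name is hypothetical) that lists every sliding window containing a given timestamp. It assumes millisecond timestamps and no offset; when the length is a multiple of the slide, each element lands in length / slide windows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: lists every sliding window that contains a timestamp.
class SlidingWindowSketch {
    // Returns windows as {start, end} pairs, most recent window first.
    static List<long[]> assign(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window that begins at or before the timestamp
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Step back one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // 10-second windows sliding every 5 seconds
        for (long[] w : assign(12_000L, 10_000L, 5_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

For timestamp 12000 this yields [10000, 20000) and [5000, 15000), i.e. 10 / 5 = 2 windows.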

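Session window (Session Window)
- Session windows exist only as time windows; there is no count-based session window.
- A session window closes once no data has arrived for a configured period (the session gap), so windows are not aligned in time and their lengths vary.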
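The session-gap behavior can be sketched in plain Java. Flink gives each element its own window [ts, ts + gap) and merges overlapping windows; the sketch below imitates that idea on a sorted list of timestamps. It is not Flink code, the names are hypothetical, and treating windows that merely touch as one session is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: groups event timestamps into session windows.
class SessionWindowSketch {
    // Each element opens a window [ts, ts + gap); windows that overlap or touch are merged.
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts <= last[1]) {
                // Still within the gap of the current session: extend it
                last[1] = Math.max(last[1], ts + gap);
            } else {
                // First element, or the gap was exceeded: start a new session
                result.add(new long[] {ts, ts + gap});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (long[] s : sessions(new long[] {1, 2, 5, 20, 22}, 3)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

With a gap of 3, the timestamps 1, 2, 5 form one session [1, 8) and 20, 22 form another [20, 25), showing why session boundaries depend on the data rather than on the clock.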

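Using the window API
window()
Calling window() on a keyed stream takes a window assigner, which decides which window(s) each incoming element belongs to.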
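import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.*;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        // Build a stream processing environment and set parallelism to 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        // Parse "id,timestamp,temperature" lines into SensorReading POJOs (class defined elsewhere)
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });
        // Windowing test: specify the window assigner
        WindowedStream<SensorReading, Tuple, TimeWindow> windowedStream = dataStream.keyBy("id")
                // A 15-second tumbling window
                .window(TumblingProcessingTimeWindows.of(Time.seconds(15)));
                // Session window (the 1-minute gap is only an example value)
                // .window(ProcessingTimeSessionWindows.withGap(Time.minutes(1)));
                // Sliding window (30 s long, sliding every 5 s; example values)
                // .window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(5)));
        // A window function (see the next sections) must be applied before the job produces output
    }
}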
- Incremental aggregation functions: each element is processed as soon as it arrives, and only a small state (the accumulator) is kept. Functions of this kind: ReduceFunction and AggregateFunction.
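import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // run with parallelism 1 for readable output
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });
        // Windowing test: specify the window assigner (15-second tumbling window)
        DataStream<Integer> resultStream = dataStream.keyBy("id")
                .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
                // Aggregate the window: incremental window operation counting readings per key
                .aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {
                    // Create the accumulator
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }
                    // Called for every element as it arrives
                    @Override
                    public Integer add(SensorReading sensorReading, Integer accumulator) {
                        return accumulator + 1;
                    }
                    // Called when the window fires
                    @Override
                    public Integer getResult(Integer accumulator) {
                        return accumulator;
                    }
                    // Combines two partial counts (needed by merging windows such as session windows)
                    @Override
                    public Integer merge(Integer a, Integer b) {
                        return a + b;
                    }
                });
        resultStream.print();
        env.execute();
    }
}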
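To show why an incremental function keeps only a small state, the following plain-Java sketch drives a simplified stand-in for the four-method AggregateFunction contract over one window's elements. The interface SimpleAggregate and the other names are hypothetical; this is not the real Flink interface or runtime, only the call pattern (createAccumulator once, add per element, getResult on firing, merge for combining partial results).

```java
import java.util.List;

// Hypothetical, simplified stand-in for Flink's AggregateFunction contract
// (IN = input, ACC = accumulator, OUT = result). Not the real Flink interface.
interface SimpleAggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Counts elements, like the anonymous AggregateFunction in the example above.
class CountAggregate implements SimpleAggregate<Double, Integer, Integer> {
    public Integer createAccumulator() { return 0; }
    public Integer add(Double value, Integer acc) { return acc + 1; }
    public Integer getResult(Integer acc) { return acc; }
    public Integer merge(Integer a, Integer b) { return a + b; }
}

class IncrementalWindowSketch {
    // Feeds elements one by one; only the accumulator is kept, never the elements themselves.
    static <IN, ACC, OUT> OUT runWindow(SimpleAggregate<IN, ACC, OUT> fn, List<IN> elements) {
        ACC acc = fn.createAccumulator();
        for (IN e : elements) {
            acc = fn.add(e, acc); // state is updated as each element arrives
        }
        return fn.getResult(acc); // emitted when the window fires
    }

    public static void main(String[] args) {
        CountAggregate count = new CountAggregate();
        System.out.println(runWindow(count, List.of(35.8, 36.1, 34.9))); // prints 3
        // merge combines partial accumulators, e.g. when two session windows merge
        System.out.println(count.getResult(count.merge(2, 3))); // prints 5
    }
}
```

Note that merge must combine its arguments; returning null, as the original listing did, would break merging windows.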

- Full window functions: all data belonging to the window is collected first and iterated over when the window is evaluated. Functions of this kind: ProcessWindowFunction and WindowFunction.

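import org.apache.flink.api.java.tuple.*;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // run with parallelism 1 for readable output
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });
        // Full window function: emits (id, window end, count) once per key and window
        DataStream<Tuple3<String, Long, Integer>> resultStream = dataStream.keyBy("id")
                .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
                .apply(new WindowFunction<SensorReading, Tuple3<String, Long, Integer>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<SensorReading> input, Collector<Tuple3<String, Long, Integer>> out) throws Exception {
                        String id = tuple.getField(0);
                        Long windowEnd = window.getEnd();
                        int count = 0; // all elements of the window are available here; count them
                        for (SensorReading ignored : input) count++;
                        out.collect(new Tuple3<>(id, windowEnd, count));
                    }
                });
        resultStream.print();
        env.execute();
    }
}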
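A full window function is needed when the result depends on all values at once. A median, for example, cannot be maintained with a small accumulator. The plain-Java sketch below (hypothetical names, not Flink code; Flink keeps window contents in its state backend rather than in a list) buffers every element and computes only when the window fires.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch, not Flink code: a full-window computation buffers every element
// of the window and evaluates only once the window fires.
class FullWindowSketch {
    private final List<Double> buffer = new ArrayList<>();

    // Called per element: nothing is computed yet, the element is just stored.
    void add(double temperature) {
        buffer.add(temperature);
    }

    // Called when the window fires: the whole window content is available, similar to
    // the Iterable<SensorReading> passed to WindowFunction.apply in the example above.
    double fireMedian() {
        List<Double> sorted = new ArrayList<>(buffer);
        Collections.sort(sorted);
        int n = sorted.size();
        double median = (n % 2 == 1)
                ? sorted.get(n / 2)
                : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
        buffer.clear(); // in this sketch the window content is discarded after evaluation
        return median;
    }

    public static void main(String[] args) {
        FullWindowSketch window = new FullWindowSketch();
        for (double t : new double[] {36.5, 35.1, 37.2, 35.9}) {
            window.add(t);
        }
        System.out.println(window.fireMedian());
    }
}
```

The trade-off is memory: the buffer grows with the number of elements in the window, whereas the incremental count above keeps a single integer.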

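Other optional window APIs:
- Trigger (.trigger()): defines when the window closes, i.e. when the computation is triggered and the result is emitted
- Evictor (.evictor()): defines the logic for removing some data from the window
- .allowedLateness(): allows late data to still be processed
- .sideOutputLateData(): puts late data into a side output stream
- .getSideOutput(): gets the side output stream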
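How allowedLateness() and sideOutputLateData() split the data can be sketched as a decision on the current watermark. This is a plain-Java illustration modeled on Flink's documented event-time behavior, not Flink code; the names are hypothetical and the exact boundary comparisons are an assumption of the sketch. The window fires when the watermark reaches its last timestamp (end - 1), keeps accepting elements for allowedLateness afterwards, and only then routes them to the side output.

```java
// Hypothetical sketch, not Flink code: how an event-time window with allowed lateness
// might treat an arriving element, depending on the current watermark.
class LatenessSketch {
    enum Outcome { ON_TIME, LATE_FIRING, SIDE_OUTPUT }

    static Outcome classify(long windowEnd, long watermark, long allowedLateness) {
        long maxTimestamp = windowEnd - 1; // last timestamp inside [start, end)
        if (watermark < maxTimestamp) {
            return Outcome.ON_TIME; // window has not fired yet; element is added normally
        } else if (watermark < maxTimestamp + allowedLateness) {
            return Outcome.LATE_FIRING; // window already fired; element triggers an updated result
        } else {
            return Outcome.SIDE_OUTPUT; // beyond allowed lateness; goes to the late-data side output
        }
    }

    public static void main(String[] args) {
        long end = 15_000L, lateness = 60_000L; // 15 s window, 1 min allowed lateness
        for (long wm : new long[] {10_000L, 20_000L, 80_000L}) {
            System.out.println("watermark " + wm + ": " + classify(end, wm, lateness));
        }
    }
}
```

With allowedLateness of 0 the middle case disappears, so anything arriving after the window fired goes straight to the side output.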
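import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
public class WindowTest2_CountWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] field = line.split(",");
            return new SensorReading(field[0], Long.valueOf(field[1]), Double.valueOf(field[2]));
        });
        // Count window test: 10 elements per window, sliding every 2 elements, average temperature
        DataStream<Double> resultStream = dataStream.keyBy("id")
                .countWindow(10, 2)
                .aggregate(new MyAvgTemp());
        resultStream.print("avg");
        // Late data (event time, Flink 1.12+; seconds-based timestamps and the 2 s bound are assumptions)
        OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {};
        SingleOutputStreamOperator<SensorReading> sumStream = dataStream
                .assignTimestampsAndWatermarks(WatermarkStrategy.<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((reading, ts) -> reading.getTimestamp() * 1000L))
                .keyBy("id")
                .window(TumblingEventTimeWindows.of(Time.seconds(15)))
                .allowedLateness(Time.minutes(1)) // keep window state for late data for 1 more minute
                .sideOutputLateData(outputTag)    // anything later goes to the side output stream
                .sum("temperature");
        sumStream.print("sum");
        sumStream.getSideOutput(outputTag).print("late"); // get the side output stream
        env.execute();
    }
    // Incremental average: the accumulator is (sum of temperatures, count)
    public static class MyAvgTemp implements AggregateFunction<SensorReading, Tuple2<Double, Integer>, Double> {
        @Override public Tuple2<Double, Integer> createAccumulator() { return new Tuple2<>(0.0, 0); }
        @Override public Tuple2<Double, Integer> add(SensorReading r, Tuple2<Double, Integer> acc) {
            return new Tuple2<>(acc.f0 + r.getTemperature(), acc.f1 + 1);
        }
        @Override public Double getResult(Tuple2<Double, Integer> acc) { return acc.f0 / acc.f1; }
        @Override public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
            return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
        }
    }
}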
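Flink builds countWindow(size, slide) from a global window combined with a count trigger (fire every slide elements) and a count evictor (keep the last size elements). The plain-Java sketch below imitates that combination for a single key so the output of countWindow(10, 2) is easier to predict; it is an illustration with hypothetical names, not Flink's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch, not Flink code: a sliding count window of `size` elements that
// fires every `slide` elements for one key and averages what is currently in the window.
class CountWindowSketch {
    static List<Double> slidingAverages(double[] temperatures, int size, int slide) {
        Deque<Double> window = new ArrayDeque<>();
        List<Double> results = new ArrayList<>();
        int sinceLastFire = 0;
        for (double t : temperatures) {
            window.addLast(t);
            if (window.size() > size) {
                window.removeFirst(); // evictor role: keep only the most recent `size` elements
            }
            if (++sinceLastFire == slide) { // trigger role: fire every `slide` elements
                sinceLastFire = 0;
                double sum = 0;
                for (double v : window) sum += v;
                results.add(sum / window.size());
            }
        }
        return results;
    }

    public static void main(String[] args) {
        // Same parameters as countWindow(10, 2): fire every 2 readings, average up to the last 10
        System.out.println(slidingAverages(new double[] {1, 2, 3, 4, 5}, 10, 2)); // prints [1.5, 2.5]
    }
}
```

The fifth reading produces no output because the trigger has only counted one element since the last firing.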

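- In Flink, window() is defined on a keyed stream, i.e. after keyBy(); a non-keyed stream uses windowAll() instead, which runs with parallelism 1.
- A window assigner only groups the data into a WindowedStream; it must be followed by a window function (an aggregation) to get a DataStream back.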