A complete analysis of window functions in the Flink framework
2022-07-02 14:12:00 【InfoQ】
Summary

Window types
Windows fall into two categories: time windows (Time Window) and count windows (Count Window).
- Tumbling window: taking time windows as the example (count windows work the same way), a tumbling window slices the data at fixed intervals.
- Its windows are aligned in time, have a fixed length, and never overlap.
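The fixed, non-overlapping slicing described above can be sketched in plain Java. This is a minimal illustration, assuming windows aligned to timestamp 0 with no offset and non-negative millisecond timestamps; the class and method names are hypothetical and are not part of Flink.

```java
// Sketch only: mirrors the idea of tumbling-window assignment, not Flink's internal code.
final class TumblingWindowSketch {

    /** Start (inclusive) of the window containing timestampMs. Assumes timestampMs >= 0 and sizeMs > 0. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** End (exclusive) of the window containing timestampMs. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long size = 15_000L; // 15-second windows, the size used in this article's examples
        for (long ts : new long[] {0L, 14_999L, 15_000L, 37_500L}) {
            System.out.println(ts + " -> [" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Every timestamp maps to exactly one window, which is why tumbling windows never overlap.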

- Sliding window: taking time windows as the example (count windows work the same way), a sliding window is another form of fixed-length window, defined by a fixed window length and a slide interval.
- The window length is fixed, and windows can overlap.
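Because sliding windows overlap, one element can belong to several windows at once. The sketch below lists the start time of every window that contains a given timestamp. It assumes windows aligned to timestamp 0 and non-negative millisecond timestamps; the names are hypothetical, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: illustrates overlapping window assignment, not Flink's internal code.
final class SlidingWindowSketch {

    /** Start times of all windows [start, start + sizeMs) that contain timestampMs, newest first. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // The most recent window that has already opened at timestampMs.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards one slide at a time while the window still covers timestampMs.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 15-second windows that slide every 5 seconds: each element lands in 3 windows.
        System.out.println(windowStarts(12_000L, 15_000L, 5_000L)); // [10000, 5000, 0]
    }
}
```

With a length of 15 seconds and a slide of 5 seconds, each element is counted in length / slide = 3 windows.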

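- Session window: session windows exist only as time windows; there is no count-based session window.
- Session windows are not aligned in time: a session starts with an element and ends once no new element arrives within the configured gap.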
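The gap-based grouping behind session windows can be sketched as follows. This is an illustration under simple assumptions: all timestamps are known up front, they belong to a single key, and a new session starts when the distance to the previous element is strictly greater than the gap (the exact boundary behavior in Flink is not covered here). The names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: groups timestamps into sessions separated by inactivity gaps.
final class SessionWindowSketch {

    /** Splits the timestamps into sessions; a new session starts after a pause longer than gapMs. */
    static List<List<Long>> sessions(long[] timestampsMs, long gapMs) {
        long[] sorted = timestampsMs.clone();
        Arrays.sort(sorted);
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sorted) {
            boolean pauseTooLong = !current.isEmpty()
                    && ts - current.get(current.size() - 1) > gapMs;
            if (pauseTooLong) {
                result.add(current);          // close the running session
                current = new ArrayList<>();  // and open a new one
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] events = {1_000L, 3_000L, 20_000L, 21_000L, 50_000L};
        // With a 15-second gap this yields three sessions of different lengths.
        System.out.println(sessions(events, 15_000L));
    }
}
```

The sessions have different start times and lengths, which is what "not aligned in time" means in practice.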

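Using the Window API
A window is defined by calling .window(), which must come after keyBy. The window() method takes a WindowAssigner, which is responsible for assigning each incoming element to the correct window.

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// SensorReading is the article's own POJO (id, timestamp, temperature); its definition is not shown.
public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        // Parse each comma-separated line into a SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });
        // Window test: specify the window assigner after keyBy.
        // window() returns a WindowedStream, not a DataStream; a window function must follow.
        WindowedStream<SensorReading, Tuple, TimeWindow> windowedStream = dataStream.keyBy("id")
                // A 15-second tumbling window
                .window(TumblingProcessingTimeWindows.of(Time.seconds(15)));
                // Session window alternative:
                // .window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))
                // Sliding window alternative:
                // .window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10)))
        env.execute();
    }
}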
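Flink also provides the shorthand methods .timeWindow() and .countWindow() for defining time windows and count windows. The window length is specified with one of Time.milliseconds(x), Time.seconds(x), Time.minutes(x), and so on.
- Tumbling time window: .timeWindow(Time.seconds(15))
- Sliding time window: .timeWindow(Time.seconds(15), Time.seconds(5))
- Session window: .window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))
- Tumbling count window: .countWindow(10)
- Sliding count window: .countWindow(10, 2)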
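The two-argument count window is the least intuitive of these forms, so a plain-Java simulation may help. It assumes the window emits a result every `slide` elements, computed over at most the last `size` elements, which means the first results cover fewer than `size` elements. The class, the method, and the choice of a sum as the aggregate are all hypothetical illustrations, not Flink code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Sketch only: simulates a sliding count window for a single key.
final class CountWindowSketch {

    /** Sums emitted over `values`: one result every `slide` elements, over the last `size` elements. */
    static List<Integer> slidingCountSums(int[] values, int size, int slide) {
        Deque<Integer> buffer = new ArrayDeque<>();
        List<Integer> emitted = new ArrayList<>();
        int sinceLastFire = 0;
        for (int v : values) {
            buffer.addLast(v);
            if (buffer.size() > size) {
                buffer.removeFirst(); // keep only the most recent `size` elements
            }
            if (++sinceLastFire == slide) {
                sinceLastFire = 0;
                int sum = 0;
                for (int x : buffer) {
                    sum += x;
                }
                emitted.add(sum);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Window of 4 elements sliding by 2 over the values 1..6.
        System.out.println(slidingCountSums(new int[] {1, 2, 3, 4, 5, 6}, 4, 2)); // [3, 10, 18]
    }
}
```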
- Incremental aggregation functions: each record is processed as soon as it arrives, and only a small piece of state is kept. The incremental aggregation functions are ReduceFunction and AggregateFunction.
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });
        // Incremental aggregation: count the readings per id in each 15-second window
        DataStream<Integer> resultStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {
                    // Create the accumulator: the count starts at 0
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }
                    // Called once per element: increment the count
                    @Override
                    public Integer add(SensorReading sensorReading, Integer accumulator) {
                        return accumulator + 1;
                    }
                    // Called when the window fires: emit the count
                    @Override
                    public Integer getResult(Integer accumulator) {
                        return accumulator;
                    }
                    // Combine two partial counts (used by merging windows such as session windows)
                    @Override
                    public Integer merge(Integer a, Integer b) {
                        return a + b;
                    }
                });
        resultStream.print();
        env.execute();
    }
}
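The create / add / merge / getResult life cycle used above can be exercised without a Flink cluster. The sketch below uses a hypothetical `Agg` interface that only imitates the four-method shape of an aggregate function; it is not Flink's AggregateFunction, and the `fold` helper is an assumption made for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Sketch only: a stand-alone imitation of incremental aggregation.
final class IncrementalAggSketch {

    /** Hypothetical stand-in for the four-method shape of an aggregate function. */
    interface Agg<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC acc);
        OUT getResult(ACC acc);
        ACC merge(ACC a, ACC b);
    }

    /** Counts elements; the accumulator is a single Integer, so no element is buffered. */
    static final class CountAgg implements Agg<String, Integer, Integer> {
        public Integer createAccumulator() { return 0; }
        public Integer add(String value, Integer acc) { return acc + 1; }
        public Integer getResult(Integer acc) { return acc; }
        public Integer merge(Integer a, Integer b) { return a + b; }
    }

    /** Folds the elements into a fresh accumulator one at a time. */
    static <IN, ACC, OUT> ACC fold(Agg<IN, ACC, OUT> agg, List<IN> elements) {
        ACC acc = agg.createAccumulator();
        for (IN element : elements) {
            acc = agg.add(element, acc);
        }
        return acc;
    }

    public static void main(String[] args) {
        CountAgg agg = new CountAgg();
        Integer left = fold(agg, Arrays.asList("a", "b", "c"));
        Integer right = fold(agg, Arrays.asList("d", "e"));
        // Merging two partial accumulators gives the same count as one pass over all five elements.
        System.out.println(agg.getResult(agg.merge(left, right))); // prints 5
    }
}
```

A merge that returns null would lose both partial counts, so it is safer to return the combined accumulator even when the window type in use never merges.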

- Full window functions: all the data in the window is collected first, and when the window fires, the function iterates over all of it. The corresponding functions are ProcessWindowFunction and WindowFunction.

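import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });
        // Full window function: emit (id, window end, element count) for each window
        DataStream<Tuple3<String, Long, Integer>> resultStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .apply(new WindowFunction<SensorReading, Tuple3<String, Long, Integer>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<SensorReading> input,
                                      Collector<Tuple3<String, Long, Integer>> out) throws Exception {
                        // keyBy("id") produces a one-field Tuple key; field 0 is the id
                        String id = tuple.getField(0);
                        Long windowEnd = window.getEnd();
                        // Count the buffered elements (replaces the IteratorUtils.toList(...).size() call)
                        int count = 0;
                        for (SensorReading ignored : input) {
                            count++;
                        }
                        out.collect(new Tuple3<>(id, windowEnd, count));
                    }
                });
        resultStream.print();
        env.execute();
    }
}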
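Counting, as in the example above, could also be done incrementally. A full window function earns its extra memory cost when the result needs every element at once. A median is one such case, sketched here in plain Java over the buffered contents of a single window; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch only: a computation that needs the whole window before it can produce a result.
final class FullWindowSketch {

    /** Median of all values collected for one window; the values must be non-empty. */
    static double median(Iterable<Double> windowContents) {
        List<Double> sorted = new ArrayList<>();
        for (Double value : windowContents) {
            sorted.add(value);
        }
        if (sorted.isEmpty()) {
            throw new IllegalArgumentException("window is empty");
        }
        Collections.sort(sorted);
        int n = sorted.size();
        if (n % 2 == 1) {
            return sorted.get(n / 2);
        }
        return (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
    }

    public static void main(String[] args) {
        List<Double> temperatures = new ArrayList<>();
        Collections.addAll(temperatures, 35.8, 15.4, 6.7);
        System.out.println(median(temperatures)); // prints 15.4
    }
}
```

The trade-off is that all elements stay in state until the window fires, whereas an incremental function keeps only its accumulator.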

- Trigger: .trigger() defines when the window fires, that is, when the computation runs and the result is emitted.
- Evictor: .evictor() defines the logic for removing some of the data from the window.
- .allowedLateness(): allows late data to be processed.
- .sideOutputLateData(): puts late data into a side output stream.
- .getSideOutput(): gets the side output stream.
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class WindowTest2_CountWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] field = line.split(",");
            return new SensorReading(field[0], Long.valueOf(field[1]), Double.valueOf(field[2]));
        });
        // Count window test: average temperature over the last 10 readings, emitted every 2 readings
        DataStream<Double> resultStream = dataStream.keyBy("id")
                .countWindow(10, 2)
                .aggregate(new MyAvgTemp());
        // Other optional APIs: handling late data
        OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {};
        SingleOutputStreamOperator<SensorReading> sumStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                //.trigger()
                //.evictor()
                // Keep the window open for late data for one more minute
                .allowedLateness(Time.minutes(1))
                // Send data that arrives even later to the side output
                .sideOutputLateData(outputTag)
                .sum("temperature");
        sumStream.getSideOutput(outputTag).print("late");
        resultStream.print();
        env.execute();
    }

    // Average temperature; the accumulator holds (sum of temperatures, number of readings)
    public static class MyAvgTemp implements AggregateFunction<SensorReading, Tuple2<Double, Integer>, Double> {
        @Override
        public Tuple2<Double, Integer> createAccumulator() {
            return new Tuple2<>(0.0, 0);
        }
        @Override
        public Tuple2<Double, Integer> add(SensorReading sensorReading, Tuple2<Double, Integer> accumulator) {
            return new Tuple2<>(accumulator.f0 + sensorReading.getTemperature(), accumulator.f1 + 1);
        }
        @Override
        public Double getResult(Tuple2<Double, Integer> accumulator) {
            return accumulator.f0 / accumulator.f1;
        }
        @Override
        public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
            return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
        }
    }
}
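How allowedLateness() and sideOutputLateData() divide up incoming records can be sketched as a small decision function. This is an illustration that assumes event-time windows driven by a watermark; the exact boundary conditions below are an assumption, and the class, enum, and method names are hypothetical rather than Flink API.

```java
// Sketch only: classifies a record for a given window under assumed event-time rules.
final class LatenessSketch {

    enum Outcome { ON_TIME, LATE_BUT_ACCEPTED, SIDE_OUTPUT }

    /**
     * @param windowEndMs       exclusive end of the window the record belongs to
     * @param watermarkMs       current watermark when the record arrives
     * @param allowedLatenessMs extra time the window state is kept after it first fires
     */
    static Outcome classify(long windowEndMs, long watermarkMs, long allowedLatenessMs) {
        long maxTimestamp = windowEndMs - 1; // last timestamp that still belongs to the window
        if (watermarkMs < maxTimestamp) {
            return Outcome.ON_TIME;            // the window has not fired yet
        }
        if (watermarkMs < maxTimestamp + allowedLatenessMs) {
            return Outcome.LATE_BUT_ACCEPTED;  // the window fires again with an updated result
        }
        return Outcome.SIDE_OUTPUT;            // window state is gone; route to the side output
    }

    public static void main(String[] args) {
        long windowEnd = 15_000L;  // a 15-second window starting at 0
        long lateness = 60_000L;   // one minute, as in the example above
        System.out.println(classify(windowEnd, 10_000L, lateness)); // ON_TIME
        System.out.println(classify(windowEnd, 20_000L, lateness)); // LATE_BUT_ACCEPTED
        System.out.println(classify(windowEnd, 80_000L, lateness)); // SIDE_OUTPUT
    }
}
```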

- In Flink, .window() must be called after the keyBy operation (non-keyed streams use windowAll() instead).
- A window definition must be followed by a window function, such as an aggregation; on its own it produces no output.
