The most complete analysis of window functions in the Flink framework
2022-07-02 14:12:00 [InfoQ]
Summary

Window types

Windows fall into two categories: time windows (Time Window) and count windows (Count Window).

Tumbling window
- Taking time windows as an example (count windows are similar), a tumbling window slices the data at a fixed time interval.
- Tumbling windows are aligned in time, have a fixed length, and do not overlap.
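To make the alignment concrete, here is a minimal plain-Java sketch (no Flink dependency) of how a timestamp maps to its tumbling window. It assumes millisecond timestamps and windows aligned to epoch 0 with no offset; the class and method names are hypothetical. It follows the same idea as Flink's `TimeWindow.getWindowStartWithOffset`, but it is not Flink code.

```java
public class TumblingWindowSketch {
    // Start of the tumbling window [start, start + size) that contains `timestamp`.
    // Assumes windows aligned to epoch 0 (no offset); floorMod keeps the result
    // correct for negative timestamps as well.
    public static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    public static void main(String[] args) {
        long size = 15_000L; // 15-second windows, in milliseconds
        long[] timestamps = {1_000L, 14_999L, 15_000L, 29_000L, 31_000L};
        for (long ts : timestamps) {
            long start = windowStart(ts, size);
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

With 15-second windows, timestamps 1000 and 14999 both land in [0, 15000), while 15000 opens the next window, so every element belongs to exactly one window.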

Sliding window
- Taking time windows as an example (count windows are similar), a sliding window is a more general form of the fixed window: it is defined by a fixed window length and a slide interval.
- The window length is fixed, and windows can overlap, so one element may belong to several windows.
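The overlap can be sketched the same way. The plain-Java helper below (hypothetical names, no offset, and a window length assumed to be at least the slide) lists every sliding window that contains a timestamp by stepping back from the latest window start one slide at a time; to our understanding this mirrors the loop in Flink's sliding window assigners, but it is a standalone illustration, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {
    // Start times of every sliding window [start, start + size) that contains `timestamp`.
    // Assumes no offset and size >= slide; windows start at multiples of `slide`.
    public static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 20-second windows sliding every 10 seconds
        for (long start : windowStarts(25_000L, 20_000L, 10_000L)) {
            System.out.println("[" + start + ", " + (start + 20_000L) + ")");
        }
    }
}
```

With a 20-second length and a 10-second slide, timestamp 25000 falls into both [20000, 40000) and [10000, 30000): each element belongs to size / slide = 2 windows.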

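Session window
- Session windows exist only for time windows; count windows have no session variant.
- A session window closes once no data has arrived for a specified gap (timeout), so session windows are not aligned in time and their lengths vary.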
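A simplified plain-Java model of session grouping, assuming millisecond timestamps and a fixed gap; the names are hypothetical. Flink itself gives each element a window [t, t + gap) and merges overlapping windows; the sketch below reaches a similar grouping by sorting the timestamps and starting a new session whenever an event arrives after the current session has already ended.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SessionWindowSketch {
    // Groups event timestamps (ms) into session windows, returned as {start, end} pairs.
    // Simplified model: each event keeps the session open until timestamp + gap;
    // an event arriving after that point starts a new session.
    public static List<long[]> sessions(long[] timestamps, long gap) {
        List<long[]> result = new ArrayList<>();
        if (timestamps.length == 0) {
            return result;
        }
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        long start = sorted[0];
        long end = sorted[0] + gap;
        for (int i = 1; i < sorted.length; i++) {
            long ts = sorted[i];
            if (ts > end) {
                // Silence longer than the gap: close the current session
                result.add(new long[]{start, end});
                start = ts;
            }
            // Input is sorted, so each event pushes the session end forward
            end = ts + gap;
        }
        result.add(new long[]{start, end});
        return result;
    }

    public static void main(String[] args) {
        long[] events = {1_000L, 5_000L, 30_000L, 40_000L, 70_000L};
        for (long[] s : sessions(events, 15_000L)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

With a 15-second gap, the five events above form three sessions of different lengths, which is exactly why session windows are not aligned in time.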

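Using the Window API
Window assigner: the window() method
Call .window() to define a window, then run aggregations or other processing on it. window() can only be used after keyBy. Its argument is a WindowAssigner, which is responsible for distributing each incoming element to the correct window(s).
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// SensorReading is the author's POJO (id, a Long timestamp, a Double temperature), defined elsewhere in the project
public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        // Build a stream processing environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set parallelism to 1
        env.setParallelism(1);
        // Read lines of the form "id,timestamp,temperature" from a local socket
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });
        // Key by sensor id, then specify the window assigner
        WindowedStream<SensorReading, Tuple, TimeWindow> windowedStream = dataStream.keyBy("id")
                // 15-second tumbling processing-time window
                .window(TumblingProcessingTimeWindows.of(Time.seconds(15)));
                // Session window with a 15-second gap
                //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))
                // Sliding window: 20-second length, 10-second slide
                //.window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10)))
        // A WindowedStream needs a window function before it can be output; sum is a placeholder here
        windowedStream.sum("temperature").print();
        env.execute();
    }
}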
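Flink also offers the shorthand methods .timeWindow() and .countWindow() for defining time windows and count windows. Time intervals are given with Time.milliseconds(x), Time.seconds(x), Time.minutes(x), and so on.
- Tumbling time window: .timeWindow(Time.seconds(15))
- Sliding time window (15-second length, 5-second slide): .timeWindow(Time.seconds(15), Time.seconds(5))
- Session window (15-second gap): .window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))
- Tumbling count window (10 elements): .countWindow(10)
- Sliding count window (10 elements, slide of 2): .countWindow(10, 2)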
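Count windows are easiest to picture as "fire every slide elements, look back at most size elements". Flink's KeyedStream.countWindow(size, slide) is built from a global window with a CountTrigger and a CountEvictor; the plain-Java sketch below only simulates the observable behavior for a single key, averaging the values in each evaluated window. Names are hypothetical and no Flink classes are used.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class SlidingCountWindowSketch {
    // Simulates countWindow(size, slide) for ONE key: every `slide` elements the window
    // fires and averages at most the last `size` elements. With slide == size this
    // behaves like a tumbling count window such as countWindow(10).
    public static List<Double> slidingAverages(double[] values, int size, int slide) {
        Deque<Double> buffer = new ArrayDeque<>();
        List<Double> results = new ArrayList<>();
        int seen = 0;
        for (double v : values) {
            buffer.addLast(v);
            if (buffer.size() > size) {
                buffer.removeFirst(); // evict the oldest element beyond `size`
            }
            seen++;
            if (seen % slide == 0) {
                double sum = 0;
                for (double x : buffer) {
                    sum += x;
                }
                results.add(sum / buffer.size());
            }
        }
        return results;
    }

    public static void main(String[] args) {
        double[] temps = {1, 2, 3, 4, 5, 6};
        System.out.println(slidingAverages(temps, 4, 2));
    }
}
```

Note that the first evaluations see fewer than size elements: with size 4 and slide 2, the first result averages only the first two values.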
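Window functions define the computation applied to the data collected in a window. They come in two kinds:
- Incremental aggregation functions: each element is processed as soon as it arrives, and only a small piece of state (the accumulator) is kept. Functions of this kind: ReduceFunction, AggregateFunction.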
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        // Build a stream processing environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set parallelism to 1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });
        // Incremental aggregation: count the readings per sensor in each 15-second window
        DataStream<Integer> resultStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {
                    // Create the accumulator: the count starts at 0
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }

                    // Called once per incoming element: increment the count
                    @Override
                    public Integer add(SensorReading sensorReading, Integer accumulator) {
                        return accumulator + 1;
                    }

                    // Emit the final count when the window fires
                    @Override
                    public Integer getResult(Integer accumulator) {
                        return accumulator;
                    }

                    // Combine two partial counts (used when windows are merged, e.g. session windows)
                    @Override
                    public Integer merge(Integer a, Integer b) {
                        return a + b;
                    }
                });
        resultStream.print();
        env.execute();
    }
}
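The contract of the four AggregateFunction methods can be exercised without a cluster. The plain-Java class below is a hypothetical mirror of the counting aggregate above (static methods with the same roles, not Flink's interface), with String standing in for SensorReading. It shows that only the accumulator is stored per window, and that merge must combine two partial accumulators rather than return null.

```java
import java.util.List;

public class IncrementalCountSketch {
    // Plain static mirrors of the AggregateFunction methods used above (not Flink's interface)
    public static Integer createAccumulator() { return 0; }
    public static Integer add(String element, Integer accumulator) { return accumulator + 1; }
    public static Integer getResult(Integer accumulator) { return accumulator; }
    public static Integer merge(Integer a, Integer b) { return a + b; }

    // Processes elements one by one; only the accumulator is kept, never the elements
    public static int count(List<String> elements) {
        Integer acc = createAccumulator();
        for (String e : elements) {
            acc = add(e, acc);
        }
        return getResult(acc);
    }

    // Builds two partial accumulators and merges them, as happens when windows are merged
    public static int countViaMerge(List<String> elements, int splitAt) {
        Integer left = createAccumulator();
        Integer right = createAccumulator();
        for (int i = 0; i < elements.size(); i++) {
            if (i < splitAt) {
                left = add(elements.get(i), left);
            } else {
                right = add(elements.get(i), right);
            }
        }
        return getResult(merge(left, right));
    }
}
```

Both paths give the same count, which is the property a correct merge implementation has to preserve.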

- Full window functions: all the data of the window is collected first, and when the window is evaluated the function iterates over all of it. Functions of this kind: ProcessWindowFunction, WindowFunction.

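import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        // Build a stream processing environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set parallelism to 1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });
        // Full window function: emit (sensor id, window end, count) for each 15-second window
        DataStream<Tuple3<String, Long, Integer>> resultStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .apply(new WindowFunction<SensorReading, Tuple3<String, Long, Integer>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<SensorReading> input,
                                      Collector<Tuple3<String, Long, Integer>> out) throws Exception {
                        // keyBy("id") yields a Tuple key whose field 0 is the sensor id
                        String id = tuple.getField(0);
                        Long windowEnd = window.getEnd();
                        // Traverse every buffered element of the window to count it
                        int count = 0;
                        for (SensorReading ignored : input) {
                            count++;
                        }
                        out.collect(new Tuple3<>(id, windowEnd, count));
                    }
                });
        resultStream.print();
        env.execute();
    }
}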
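A plain-Java sketch of what a full window function does with its buffered elements. Assumptions: event-time timestamps in ms, tumbling windows aligned to 0, and a hypothetical Reading class standing in for SensorReading; no Flink classes are involved. Unlike an incremental aggregate, every element is kept in a per-window buffer and only traversed when the window is evaluated, which costs more state but gives access to the whole window and its metadata (here the window end).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FullWindowSketch {
    // Hypothetical stand-in for SensorReading: sensor id plus an event timestamp in ms
    public static final class Reading {
        final String id;
        final long timestamp;

        public Reading(String id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }
    }

    // Buffers every reading per (id, window end) first, like a full window function,
    // then traverses each buffer when the window "fires" and emits "id,windowEnd,count".
    public static List<String> countPerWindow(List<Reading> readings, long size) {
        // TreeMap only to make the output order deterministic (lexicographic by key)
        Map<String, List<Reading>> buffers = new TreeMap<>();
        for (Reading r : readings) {
            long end = r.timestamp - Math.floorMod(r.timestamp, size) + size;
            buffers.computeIfAbsent(r.id + "," + end, k -> new ArrayList<>()).add(r);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Reading>> e : buffers.entrySet()) {
            int count = 0;
            for (Reading ignored : e.getValue()) {
                count++; // traverse all buffered elements of this window
            }
            out.add(e.getKey() + "," + count);
        }
        return out;
    }
}
```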

Other optional window APIs:
- Trigger, .trigger(): defines when a window fires, i.e. when the computation is triggered and the result emitted.
- Evictor, .evictor(): defines logic for removing some elements from the window.
- .allowedLateness(): allows late data to still be processed.
- .sideOutputLateData(): puts late data into a side output stream.
- .getSideOutput(): retrieves the side output stream.
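The interplay of watermark, allowedLateness, and the side output can be summarized with a small decision function. This is a simplified plain-Java model with hypothetical names, not Flink code: it assumes an event-time window [start, windowEnd), that the window fires once the watermark reaches windowEnd - 1 (the window's max timestamp), and that its state is dropped once the watermark reaches windowEnd - 1 + allowedLateness. For processing-time windows no element is ever late, so these APIs only matter with event time.

```java
public class LatenessSketch {
    public enum Outcome { ON_TIME, LATE_BUT_ALLOWED, SIDE_OUTPUT }

    // Classifies an element of the event-time window [start, windowEnd) given the current
    // watermark. Simplified model: fire at windowEnd - 1, drop state at windowEnd - 1 + lateness.
    public static Outcome classify(long windowEnd, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1;
        if (watermark < maxTimestamp) {
            return Outcome.ON_TIME;          // window has not fired yet
        }
        if (watermark < maxTimestamp + allowedLateness) {
            return Outcome.LATE_BUT_ALLOWED; // window already fired; the element updates the result
        }
        return Outcome.SIDE_OUTPUT;          // window state is gone; element goes to the late tag
    }

    public static void main(String[] args) {
        long windowEnd = 15_000L; // window [0, 15000)
        long lateness = 60_000L;  // like allowedLateness(Time.minutes(1))
        for (long watermark : new long[]{10_000L, 20_000L, 80_000L}) {
            System.out.println("watermark " + watermark + " -> " + classify(windowEnd, lateness, watermark));
        }
    }
}
```

With one minute of allowed lateness, an element for [0, 15000) still updates the result while the watermark is below 74999; after that it can only be observed through the side output.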
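import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class WindowTest2_CountWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] field = line.split(",");
            return new SensorReading(field[0], Long.valueOf(field[1]), Double.valueOf(field[2]));
        });
        // Sliding count window: every 2 readings, average the last 10 temperatures per sensor
        DataStream<Double> resultStream = dataStream.keyBy("id")
                .countWindow(10, 2)
                .aggregate(new MyAvgTemp());
        // Other optional APIs: handling late data
        // (lateness only applies to event-time windows, which need timestamps and watermarks)
        OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {};
        SingleOutputStreamOperator<SensorReading> sumStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                //.trigger()
                //.evictor()
                // Keep window state for one more minute so late elements still update the result
                .allowedLateness(Time.minutes(1))
                // Send data that arrives even later to the side output
                .sideOutputLateData(outputTag)
                .sum("temperature");
        sumStream.print("sum");
        sumStream.getSideOutput(outputTag).print("late");
        resultStream.print();
        env.execute();
    }

    // Average temperature: the accumulator holds (sum of temperatures, number of readings)
    public static class MyAvgTemp implements AggregateFunction<SensorReading, Tuple2<Double, Integer>, Double> {
        @Override
        public Tuple2<Double, Integer> createAccumulator() {
            return new Tuple2<>(0.0, 0);
        }

        @Override
        public Tuple2<Double, Integer> add(SensorReading sensorReading, Tuple2<Double, Integer> accumulator) {
            return new Tuple2<>(accumulator.f0 + sensorReading.getTemperature(), accumulator.f1 + 1);
        }

        @Override
        public Double getResult(Tuple2<Double, Integer> accumulator) {
            return accumulator.f0 / accumulator.f1;
        }

        @Override
        public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
            return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
        }
    }
}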
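Why does MyAvgTemp keep (sum, count) instead of a running average? Because partial accumulators must be mergeable: merge is only invoked when windows are merged (for example session windows, not the count window above), and averaging two averages gives the wrong answer when the parts have different sizes. The plain-Java mirror below (hypothetical names, no Flink classes) makes that concrete.

```java
public class AvgAccumulatorSketch {
    // Plain-Java mirror of MyAvgTemp's accumulator: (sum of temperatures, number of readings)
    public static final class Acc {
        public final double sum;
        public final int count;

        public Acc(double sum, int count) {
            this.sum = sum;
            this.count = count;
        }
    }

    public static Acc createAccumulator() { return new Acc(0.0, 0); }

    public static Acc add(double temperature, Acc acc) { return new Acc(acc.sum + temperature, acc.count + 1); }

    public static double getResult(Acc acc) { return acc.sum / acc.count; }

    // Merging sums and counts stays exact; merging two averages would not
    public static Acc merge(Acc a, Acc b) { return new Acc(a.sum + b.sum, a.count + b.count); }

    public static void main(String[] args) {
        Acc left = add(20.0, add(10.0, createAccumulator())); // partial window: 10, 20
        Acc right = add(60.0, createAccumulator());             // partial window: 60
        System.out.println(getResult(merge(left, right)));      // prints 30.0
        System.out.println((getResult(left) + getResult(right)) / 2); // prints 37.5 (wrong)
    }
}
```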

- In Flink, .window() must be called after keyBy (a non-keyed stream uses .windowAll() instead).
- A windowed stream must be followed by a window function (an aggregation or other processing) before it becomes a regular DataStream again.