The most complete analysis of Flink window functions
2022-07-02 14:12:00 【InfoQ】
Summary

Window types
Flink windows fall into two kinds:
- Time window (Time Window): windows are delimited by time.
- Count window (Count Window): windows are delimited by the number of elements.

Tumbling window
- Taking time windows as an example (count windows work the same way), a tumbling window slices the data into segments at a fixed time interval.
- Its characteristics: windows are aligned in time, every window has the same fixed length, and windows do not overlap.
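
To make "aligned, fixed length, no overlap" concrete, here is a minimal plain-Java sketch. It has no Flink dependency, and the class name `TumblingWindowSketch` is made up for illustration. It computes the start of the single window a timestamp falls into. For a zero offset this should agree with the arithmetic in Flink's `TimeWindow.getWindowStartWithOffset`, but treat it as a model rather than Flink's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative model of tumbling time-window assignment (not Flink code). */
final class TumblingWindowSketch {

    /** Start of the one window [start, start + sizeMs) that contains ts. */
    static long windowStart(long ts, long sizeMs) {
        // floorMod keeps the result aligned to multiples of sizeMs, even for negative timestamps
        return ts - Math.floorMod(ts, sizeMs);
    }

    /** Groups timestamps by window start; every timestamp lands in exactly one window. */
    static Map<Long, List<Long>> assign(long[] timestamps, long sizeMs) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            windows.computeIfAbsent(windowStart(ts, sizeMs), k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }
}
```

With a 15 s window, timestamps 1000 and 14999 share the window that starts at 0, and 15000 opens the next window.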

Sliding window
- Taking time windows as an example (count windows work the same way), a sliding window generalizes the fixed (tumbling) window. It is defined by a fixed window length plus a slide interval.
- The window length is fixed, and windows can overlap: when the slide is shorter than the length, one element belongs to several windows.
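
The sketch below lists every sliding window that contains a given timestamp. It is plain Java and the class `SlidingWindowSketch` is hypothetical. It uses the same length and slide as the commented-out `SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10))` line in the example code. The approach aligns the latest window start to the slide and then steps back one slide at a time. It is an assumption-based model, not Flink's source.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model of sliding time-window assignment (not Flink code). */
final class SlidingWindowSketch {

    /** Starts of all windows [start, start + sizeMs) containing ts, latest first. */
    static List<Long> windowStarts(long ts, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // the latest window that can contain ts starts at ts rounded down to the slide
        long lastStart = ts - Math.floorMod(ts, slideMs);
        // step back by one slide while the window [start, start + sizeMs) still covers ts
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

When the slide divides the length, each element lands in length / slide windows. For a 20 s window sliding every 10 s, that is two windows per element, which is the overlap described above.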

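Session window
- Session windows exist only for time; there is no count-based session window.
- A session consists of a run of elements followed by an inactivity gap of a specified length. When no data arrives for the whole gap, the session closes, and the next element starts a new session.
- Its characteristic is that windows are not aligned in time: each key's sessions have their own start and end.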
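
A session window can be modelled like this: each element gets its own window [timestamp, timestamp + gap), and windows that touch or overlap are merged. The plain-Java sketch below handles a single key, uses millisecond timestamps, and introduces a hypothetical class `SessionWindowSketch`. It merges after sorting, which simplifies how a streaming runtime merges windows incrementally as elements arrive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative model of session windows for one key (not Flink code). */
final class SessionWindowSketch {

    /** Returns sessions as {start, endExclusive, elementCount}; end = last timestamp + gap. */
    static List<long[]> sessions(long[] timestamps, long gapMs) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        long[] current = null;
        for (long ts : sorted) {
            if (current != null && ts <= current[1]) {
                // element arrives before the session's gap expires: extend the session
                current[1] = ts + gapMs;
                current[2]++;
            } else {
                // first element, or the gap was exceeded: open a new session [ts, ts + gap)
                current = new long[] {ts, ts + gapMs, 1};
                result.add(current);
            }
        }
        return result;
    }
}
```

With a 5 s gap, timestamps 1000, 3000, 10000 and 12000 form two sessions, [1000, 8000) and [10000, 17000). The boundaries depend on the data rather than on a fixed grid.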

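Window API usage
Window assigner: the window() method
- We can use .window() to define a window and then run aggregations or other processing on the data in that window.
- Note that window() can only be used after keyBy; a non-keyed stream uses windowAll() instead.
- window() takes a WindowAssigner as its argument. The WindowAssigner is responsible for distributing each incoming element to the correct window.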
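
A toy model shows why the window is defined after keyBy. The stream is first partitioned by key, and then the assigner places each element into one or more windows belonging to that key. In the plain-Java sketch below, `SimpleAssigner`, `Event` and `route` are hypothetical stand-ins. Flink's real `WindowAssigner` has a different, richer signature.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of keyBy + window(assigner); names are hypothetical, not Flink API. */
final class KeyedWindowRouting {

    /** Hypothetical stand-in for a window assigner: timestamp -> window start(s). */
    interface SimpleAssigner {
        List<Long> assignWindows(long timestamp);
    }

    /** Minimal event: sensor id, timestamp in ms, temperature. */
    static final class Event {
        final String id;
        final long timestamp;
        final double temperature;

        Event(String id, long timestamp, double temperature) {
            this.id = id;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }
    }

    /** Tumbling assigner: exactly one window per timestamp. */
    static SimpleAssigner tumbling(long sizeMs) {
        return ts -> Collections.singletonList(ts - Math.floorMod(ts, sizeMs));
    }

    /** key -> (window start -> temperatures); windows exist separately for every key. */
    static Map<String, Map<Long, List<Double>>> route(List<Event> events, SimpleAssigner assigner) {
        Map<String, Map<Long, List<Double>>> buckets = new TreeMap<>();
        for (Event e : events) {
            // step 1 (keyBy): pick the key's own set of windows
            Map<Long, List<Double>> windowsOfKey = buckets.computeIfAbsent(e.id, k -> new TreeMap<>());
            // step 2 (window assigner): put the element into each window it belongs to
            for (long start : assigner.assignWindows(e.timestamp)) {
                windowsOfKey.computeIfAbsent(start, s -> new ArrayList<>()).add(e.temperature);
            }
        }
        return buckets;
    }
}
```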
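import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// SensorReading is the article's POJO (id, timestamp, temperature), assumed to be defined elsewhere.
public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        // Build a stream processing environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        // Parse "id,timestamp,temperature" lines into SensorReading objects
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });
        // Windowing test: specify the window assigner
        DataStream<SensorReading> resultStream = dataStream.keyBy("id")
                // 15-second tumbling window
                .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
                // Session window
                //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))
                // Sliding window: 20 s long, sliding every 10 s
                //.window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10)))
                // window() must be followed by a window function; max() gives the job an output
                .max("temperature");
        resultStream.print();
        env.execute();
    }
}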
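Besides .window(), Flink offers the shorter .timeWindow and .countWindow methods for defining time windows and count windows. In newer Flink releases, .timeWindow is deprecated in favor of .window() with an explicit assigner.
The size of a time window is given with helpers such as Time.milliseconds(x), Time.seconds(x) and Time.minutes(x).
- Tumbling time window: .timeWindow(Time.seconds(15))
- Sliding time window (15 s long, sliding every 5 s): .timeWindow(Time.seconds(15),Time.seconds(5))
- Session window (15 s gap): .window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))
- Tumbling count window (10 elements): .countWindow(10)
- Sliding count window (10 elements, sliding every 2 elements): .countWindow(10,2)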
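
The two count-window shortcuts can be simulated for a single key. As far as I know, Flink builds `countWindow(size)` from a global window with a purging `CountTrigger`. It builds `countWindow(size, slide)` from a global window with `CountTrigger.of(slide)` plus `CountEvictor.of(size)`. The plain-Java sketch below uses a hypothetical class `CountWindowSim` to imitate that behavior for averages. It is not Flink code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Single-key simulation of count windows (illustration only, not Flink code). */
final class CountWindowSim {

    /** Like countWindow(size): fire after every `size` elements, then purge. */
    static List<Double> tumblingAverages(double[] values, int size) {
        List<Double> results = new ArrayList<>();
        double sum = 0;
        int count = 0;
        for (double v : values) {
            sum += v;
            count++;
            if (count == size) {          // trigger fires ...
                results.add(sum / count);
                sum = 0;                  // ... and the window content is purged
                count = 0;
            }
        }
        return results;                   // a trailing incomplete window is not emitted here
    }

    /** Like countWindow(size, slide): fire every `slide` elements over the newest `size` elements. */
    static List<Double> slidingAverages(double[] values, int size, int slide) {
        Deque<Double> buffer = new ArrayDeque<>();
        List<Double> results = new ArrayList<>();
        int sinceLastFire = 0;
        for (double v : values) {
            buffer.addLast(v);
            sinceLastFire++;
            if (sinceLastFire == slide) { // count trigger fires every `slide` elements
                sinceLastFire = 0;
                while (buffer.size() > size) {
                    buffer.removeFirst(); // count evictor keeps only the newest `size` elements
                }
                double sum = 0;
                for (double x : buffer) {
                    sum += x;
                }
                results.add(sum / buffer.size());
            }
        }
        return results;
    }
}
```

Consider `countWindow(10, 2)` as used in `WindowTest2_CountWindow`. Under this model, the first results average 2, 4, 6 and 8 readings before the window reaches its full length of 10.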
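Window functions
A window function defines the computation performed on the data collected in a window. Window functions fall into two classes:
- Incremental aggregation functions: each element is processed as soon as it arrives, and only a small state (the running aggregate) is kept. Typical incremental aggregation functions are ReduceFunction and AggregateFunction.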
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        // Build a stream processing environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });
        // Incremental window aggregation: count the readings of each sensor per 15 s window
        DataStream<Integer> resultStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {
                    // Create the accumulator (initial count 0)
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }
                    // Called once per element: increment the count
                    @Override
                    public Integer add(SensorReading sensorReading, Integer accumulator) {
                        return accumulator + 1;
                    }
                    // Called when the window fires: the count is the result
                    @Override
                    public Integer getResult(Integer accumulator) {
                        return accumulator;
                    }
                    // Combine two partial counts (used when windows merge, e.g. session windows)
                    @Override
                    public Integer merge(Integer a, Integer b) {
                        return a + b;
                    }
                });
        resultStream.print();
        env.execute();
    }
}
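
The following plain-Java sketch shows what "keeping a state" means for an incremental aggregate. The runtime keeps only the accumulator, calls `add` for each element, and calls `getResult` when the window fires. `merge` combines two partial accumulators, for example when two session windows merge. `IncrementalAgg` is a hypothetical interface that mirrors the four callbacks of Flink's `AggregateFunction`; it is not Flink's API.

```java
import java.util.List;

/** Illustrative model of incremental aggregation (not Flink code). */
final class IncrementalAggSketch {

    /** Hypothetical interface with the same four callbacks as Flink's AggregateFunction. */
    interface IncrementalAgg<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    /** Average temperature; the accumulator is {sum, count}. */
    static final class AvgTemp implements IncrementalAgg<Double, double[], Double> {
        public double[] createAccumulator() { return new double[] {0.0, 0.0}; }
        public double[] add(Double value, double[] acc) { return new double[] {acc[0] + value, acc[1] + 1}; }
        public Double getResult(double[] acc) { return acc[1] == 0 ? Double.NaN : acc[0] / acc[1]; }
        public double[] merge(double[] a, double[] b) { return new double[] {a[0] + b[0], a[1] + b[1]}; }
    }

    /** Feeds elements one by one; only the accumulator survives between elements. */
    static <IN, ACC, OUT> ACC accumulate(List<IN> elements, IncrementalAgg<IN, ACC, OUT> agg) {
        ACC acc = agg.createAccumulator();
        for (IN e : elements) {
            acc = agg.add(e, acc);   // the element itself need not be stored after this call
        }
        return acc;
    }
}
```

Accumulating 30.0 and 32.0 gives an average of 31.0. Merging that accumulator with one holding 34.0 gives 32.0 without revisiting any element.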

- Full window functions: all the data belonging to the window is collected first and iterated over when the window fires. The corresponding functions are ProcessWindowFunction and WindowFunction.

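import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        // Build a stream processing environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });
        // Full window function: emit (sensor id, window end, number of readings)
        DataStream<Tuple3<String, Long, Integer>> resultStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .apply(new WindowFunction<SensorReading, Tuple3<String, Long, Integer>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<SensorReading> input,
                                      Collector<Tuple3<String, Long, Integer>> out) throws Exception {
                        String id = tuple.getField(0);   // keyBy("id") produces a Tuple key
                        Long windowEnd = window.getEnd();
                        // All elements of the window are available here; count them
                        int count = 0;
                        for (SensorReading ignored : input) {
                            count++;
                        }
                        out.collect(new Tuple3<>(id, windowEnd, count));
                    }
                });
        resultStream.print();
        env.execute();
    }
}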
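
A full window function, by contrast, only sees the data when the window fires. This is what makes statistics such as a median possible, because they cannot be computed from a small running state. The plain-Java sketch below uses a hypothetical class `FullWindowSketch`. It buffers every element and computes the median on firing. The cost is that the whole window content is held in state until then.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Illustrative model of a full-window function (not Flink code). */
final class FullWindowSketch {

    private final List<Double> buffer = new ArrayList<>();

    /** Per element: nothing is computed yet, the element is only stored. */
    void onElement(double temperature) {
        buffer.add(temperature);
    }

    /** On firing: iterate over all buffered elements, here to compute the median. */
    double fireMedian() {
        if (buffer.isEmpty()) {
            throw new IllegalStateException("empty window");
        }
        List<Double> sorted = new ArrayList<>(buffer);
        Collections.sort(sorted);
        int n = sorted.size();
        double median = (n % 2 == 1)
                ? sorted.get(n / 2)
                : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
        buffer.clear();  // this sketch discards the window's contents after firing
        return median;
    }

    int bufferedCount() {
        return buffer.size();
    }
}
```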

Other optional APIs
- Trigger: .trigger() defines when a window fires, that is, when the computation runs and the result is emitted.
- Evictor: .evictor() defines logic for removing some elements from the window.
- .allowedLateness() lets late data still be processed after the window has fired (late data only arises with event time).
- .sideOutputLateData() puts data that arrives too late into a side output stream.
- .getSideOutput() retrieves the side output stream.
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class WindowTest2_CountWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] field = line.split(",");
            return new SensorReading(field[0], Long.parseLong(field[1]), Double.parseDouble(field[2]));
        });
        // Count window test: average of the last 10 readings, emitted every 2 readings
        DataStream<Double> resultStream = dataStream.keyBy("id")
                .countWindow(10, 2)
                .aggregate(new MyAvgTemp());
        // Other optional APIs: handling late data
        // (late data only arises with event-time windows and watermarks, which this snippet does not configure)
        OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {};
        SingleOutputStreamOperator<SensorReading> sumStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                //.trigger()
                //.evictor()
                .allowedLateness(Time.minutes(1))
                // Send data that arrives after the allowed lateness to the side output
                .sideOutputLateData(outputTag)
                .sum("temperature");
        sumStream.getSideOutput(outputTag).print("late");
        resultStream.print();
        env.execute();
    }

    // Incremental average: the accumulator is (sum of temperatures, number of readings)
    public static class MyAvgTemp implements AggregateFunction<SensorReading, Tuple2<Double, Integer>, Double> {
        @Override
        public Tuple2<Double, Integer> createAccumulator() {
            return new Tuple2<>(0.0, 0);
        }
        @Override
        public Tuple2<Double, Integer> add(SensorReading sensorReading, Tuple2<Double, Integer> accumulator) {
            return new Tuple2<>(accumulator.f0 + sensorReading.getTemperature(), accumulator.f1 + 1);
        }
        @Override
        public Double getResult(Tuple2<Double, Integer> accumulator) {
            return accumulator.f0 / accumulator.f1;
        }
        @Override
        public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
            return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
        }
    }
}
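
Late data and allowed lateness only matter for event-time windows driven by watermarks. The plain-Java sketch below handles a single key and uses a hypothetical class `LatenessSketch`. It models a tumbling event-time window with allowed lateness:
- The window fires when the watermark passes its last timestamp.
- A late element within the allowed lateness updates the result and fires the window again.
- Anything later goes to a side output.

The cleanup condition (`maxTimestamp + allowedLateness <= watermark`) is my reading of Flink's behavior, not a copy of its implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Single-key model of an event-time tumbling window with allowed lateness (not Flink code). */
final class LatenessSketch {
    private final long size;
    private final long allowedLateness;
    private long watermark = Long.MIN_VALUE;
    private final Map<Long, Double> sums = new TreeMap<>();  // window start -> running sum
    final List<String> emitted = new ArrayList<>();           // "windowStart=sum" per firing
    final List<Long> sideOutput = new ArrayList<>();          // timestamps of too-late elements

    LatenessSketch(long size, long allowedLateness) {
        this.size = size;
        this.allowedLateness = allowedLateness;
    }

    void onElement(long ts, double value) {
        long start = ts - Math.floorMod(ts, size);
        long maxTimestamp = start + size - 1;                  // last timestamp inside the window
        if (maxTimestamp + allowedLateness <= watermark) {
            sideOutput.add(ts);                                // window state already cleaned up
            return;
        }
        double sum = sums.merge(start, value, Double::sum);
        if (maxTimestamp <= watermark) {
            emitted.add(start + "=" + sum);                    // late but allowed: fire again
        }
    }

    void onWatermark(long newWatermark) {
        long previous = watermark;
        watermark = newWatermark;
        Iterator<Map.Entry<Long, Double>> it = sums.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Double> e = it.next();
            long maxTimestamp = e.getKey() + size - 1;
            if (maxTimestamp > previous && maxTimestamp <= newWatermark) {
                emitted.add(e.getKey() + "=" + e.getValue());  // regular on-time firing
            }
            if (maxTimestamp + allowedLateness <= newWatermark) {
                it.remove();                                   // lateness expired: drop window state
            }
        }
    }
}
```

Take a 15 s window with 1 minute of allowed lateness:
1. An element at 1000 ms fires with the watermark at 15000.
2. A second element at 2000 ms arrives while the watermark is still 15000, so the window fires again with the updated sum.
3. Once the watermark reaches 80000, the window state is gone, and a third element at 3000 ms ends up in the side output.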

- In Flink, a window must be defined after the keyBy operation.
- A window must be followed by a window function (for example reduce, aggregate, sum or apply), which turns the windowed stream back into a DataStream.