A Complete Guide to Window Functions in the Flink Framework
2022-07-02 10:34:00 [InfoQ]
Overview

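Window types
Windows come in two kinds: time windows (Time Window) and count windows (Count Window).
- Taking time windows as an example (count windows are analogous), a tumbling window slices the data at fixed time intervals.
- Its characteristics: windows are aligned in time, have a fixed length, and do not overlap.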
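The alignment property of tumbling windows can be made concrete with a little arithmetic. The following is a minimal plain-Java sketch, not Flink code; the class and method names are hypothetical, and it assumes windows start at multiples of the window length with no offset.

```java
// Plain-Java illustration of tumbling-window alignment.
// TumblingWindowSketch and its methods are hypothetical names, not Flink API.
class TumblingWindowSketch {

    /** Start (inclusive) of the tumbling window of length sizeMs that contains timestampMs. */
    static long windowStart(long timestampMs, long sizeMs) {
        // floorMod keeps the alignment correct for negative timestamps as well
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** End (exclusive) of that window. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long size = 15_000L; // 15-second windows
        for (long ts : new long[] {0L, 14_999L, 15_000L, 31_000L}) {
            System.out.println(ts + " -> [" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Every timestamp maps to exactly one window, which is why tumbling windows never overlap.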

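- Taking time windows as an example (count windows are analogous), a sliding window is a generalization of the fixed window: it is defined by a fixed window length and a slide interval.
- The window length is fixed, and windows may overlap.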
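Because sliding windows overlap, one element can belong to several windows. The sketch below lists the windows that contain a given timestamp. It is plain Java with hypothetical names, and it assumes window starts are aligned to multiples of the slide interval.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration; SlidingWindowSketch is a hypothetical name, not Flink API.
class SlidingWindowSketch {

    /** Start times of every window [start, start + sizeMs) that contains timestampMs, newest first. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

With a 20-second length and a 10-second slide, each element falls into two windows; in general the count is the window length divided by the slide interval when the slide divides the length evenly.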

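- Session windows exist only for time windows; there is no session variant of count windows.
- Their characteristic is that they are not aligned in time: a session starts and ends wherever the data does.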
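To illustrate why session windows are not aligned, here is a minimal plain-Java sketch that groups timestamps into sessions by an inactivity gap. The names are hypothetical, the input is assumed to be sorted in ascending order, and the choice that a new session starts only when the pause is strictly longer than the gap is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration; SessionWindowSketch is a hypothetical name, not Flink API.
class SessionWindowSketch {

    /** Groups ascending timestamps into sessions separated by pauses longer than gapMs. */
    static List<List<Long>> sessions(List<Long> sortedTimestamps, long gapMs) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            // A pause longer than the gap closes the current session
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMs) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }
}
```

The session boundaries depend entirely on when elements arrive, so two keys generally have different window boundaries.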

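Using the Window API
A window is defined with .window(). Note that .window() can only be called after keyBy. The argument of .window() is a WindowAssigner; the WindowAssigner is responsible for assigning each incoming element to the correct window.

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        // Build the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        // SensorReading is the tutorial's POJO with the fields id, timestamp and temperature
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });
        // Windowing test: specify the window assigner.
        // window() returns a WindowedStream, not a DataStream; a window function must follow it.
        WindowedStream<SensorReading, Tuple, TimeWindow> windowedStream = dataStream.keyBy("id")
                // A 15-second tumbling window
                .window(TumblingProcessingTimeWindows.of(Time.seconds(15)));
                // Session window
                //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))
                // Sliding window
                //.window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10)))
        env.execute();
    }
}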
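Flink also provides the simpler shorthand methods .timeWindow and .countWindow for defining time windows and count windows (.timeWindow has been deprecated since Flink 1.12 in favor of .window() with an explicit assigner). Window lengths are specified with Time.milliseconds(x), Time.seconds(x), Time.minutes(x), and so on.
// Tumbling time window of 15 seconds
.timeWindow(Time.seconds(15))
// Sliding time window: 15-second length, 5-second slide
.timeWindow(Time.seconds(15), Time.seconds(5))
// Session window with a 15-second gap
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(15)))
// Tumbling count window of 10 elements
.countWindow(10)
// Sliding count window: 10 elements, sliding every 2 elements
.countWindow(10, 2)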
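The sliding count window is the least intuitive of these. As I understand it, .countWindow(10, 2) evaluates the most recent (up to) 10 elements of a key every time 2 new elements arrive. The plain-Java sketch below simulates that behavior for a single key with a sum; the class name and the choice of sum as the aggregation are assumptions for illustration, not Flink API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java illustration; SlidingCountWindowSketch is a hypothetical name, not Flink API.
class SlidingCountWindowSketch {

    /** Emits the sum of the most recent `size` values each time `slide` new values have arrived. */
    static List<Integer> slidingSums(int[] values, int size, int slide) {
        Deque<Integer> buffer = new ArrayDeque<>();
        List<Integer> results = new ArrayList<>();
        int sinceLastFire = 0;
        for (int v : values) {
            buffer.addLast(v);
            // Keep at most `size` elements, dropping the oldest
            if (buffer.size() > size) {
                buffer.removeFirst();
            }
            sinceLastFire++;
            // Fire once every `slide` elements
            if (sinceLastFire == slide) {
                int sum = 0;
                for (int x : buffer) {
                    sum += x;
                }
                results.add(sum);
                sinceLastFire = 0;
            }
        }
        return results;
    }
}
```

Note that the first results are computed over fewer than `size` elements, because the window fires as soon as `slide` elements have arrived.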
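- Incremental aggregation functions: each record is folded into a running state as soon as it arrives, so only the state is kept rather than the records. The corresponding functions are ReduceFunction and AggregateFunction.

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        // Build the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });
        // Windowing test: count the readings per sensor in each window
        DataStream<Integer> resultStream = dataStream.keyBy("id")
                // Incremental aggregation over a 15-second window
                .timeWindow(Time.seconds(15))
                .aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {
                    // Create the accumulator
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }
                    // Fold one reading into the accumulator
                    @Override
                    public Integer add(SensorReading sensorReading, Integer accumulator) {
                        return accumulator + 1;
                    }
                    // Produce the window result from the accumulator
                    @Override
                    public Integer getResult(Integer accumulator) {
                        return accumulator;
                    }
                    // Combine two partial counts (used when windows are merged, e.g. session windows)
                    @Override
                    public Integer merge(Integer a, Integer b) {
                        return a + b;
                    }
                });
        resultStream.print();
        env.execute();
    }
}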
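The four-method contract of an incremental aggregator can be exercised without a Flink cluster. The following is a minimal plain-Java sketch: the Aggregator interface is a hypothetical stand-in that only mirrors the shape of AggregateFunction (createAccumulator, add, getResult, merge), and the accumulate helper is likewise an assumption for illustration.

```java
// Plain-Java illustration; all names here are hypothetical stand-ins, not Flink API.
class IncrementalCountSketch {

    /** Mirrors the four-method shape of an incremental aggregator. */
    interface Aggregator<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    /** Counts elements; the accumulator is the running count. */
    static final Aggregator<String, Integer, Integer> COUNT = new Aggregator<String, Integer, Integer>() {
        @Override
        public Integer createAccumulator() { return 0; }
        @Override
        public Integer add(String value, Integer accumulator) { return accumulator + 1; }
        @Override
        public Integer getResult(Integer accumulator) { return accumulator; }
        @Override
        public Integer merge(Integer a, Integer b) { return a + b; }
    };

    /** Folds the input into an accumulator one element at a time, never storing the elements. */
    static <IN, ACC, OUT> ACC accumulate(Aggregator<IN, ACC, OUT> agg, Iterable<IN> input) {
        ACC acc = agg.createAccumulator();
        for (IN value : input) {
            acc = agg.add(value, acc);
        }
        return acc;
    }
}
```

Merging two partial accumulators has to give the same result as aggregating all elements in one pass; a merge that returns a placeholder would break any window type that merges, such as session windows.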

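- Full window functions: all records of the window are collected first, and the function iterates over them when the window is evaluated. The corresponding functions are ProcessWindowFunction and WindowFunction.

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        // Build the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism to 1
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });
        // Full window function: emit (sensor id, window end, number of readings)
        DataStream<Tuple3<String, Long, Integer>> resultStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .apply(new WindowFunction<SensorReading, Tuple3<String, Long, Integer>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<SensorReading> input, Collector<Tuple3<String, Long, Integer>> out) throws Exception {
                        // keyBy("id") wraps the key in a Tuple; field 0 is the sensor id
                        String id = tuple.getField(0);
                        Long windowEnd = window.getEnd();
                        // Iterate over every buffered reading to count them
                        int count = 0;
                        for (SensorReading ignored : input) {
                            count++;
                        }
                        out.collect(new Tuple3<>(id, windowEnd, count));
                    }
                });
        resultStream.print();
        env.execute();
    }
}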
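Counting does not really need the whole window, but some computations do. A median is a typical case where every element must be available before the result can be produced. The sketch below is plain Java with hypothetical names; it stands in for the body of a full window function and is not Flink code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java illustration; FullWindowSketch is a hypothetical name, not Flink API.
class FullWindowSketch {

    /** Median of all values buffered for one window; requires the complete window contents. */
    static double median(Iterable<Double> windowContents) {
        List<Double> sorted = new ArrayList<>();
        for (double v : windowContents) {
            sorted.add(v);
        }
        if (sorted.isEmpty()) {
            throw new IllegalArgumentException("window is empty");
        }
        Collections.sort(sorted);
        int n = sorted.size();
        // Middle element for odd sizes, mean of the two middle elements for even sizes
        return n % 2 == 1
                ? sorted.get(n / 2)
                : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
    }
}
```

The cost of this flexibility is memory: the window has to keep every element until it is evaluated, whereas an incremental aggregation keeps only its accumulator.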

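- Trigger: .trigger() defines when a window fires, that is, when it computes and emits its result.
- Evictor: .evictor() defines the logic for removing certain elements from the window.
- .allowedLateness() allows late data to still be processed.
- .sideOutputLateData() routes late data to a side output stream.
- .getSideOutput() retrieves the side output stream.

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class WindowTest2_CountWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] field = line.split(",");
            return new SensorReading(field[0], Long.parseLong(field[1]), Double.parseDouble(field[2]));
        });
        // Count window test: average temperature over the last 10 readings, every 2 readings
        DataStream<Double> resultStream = dataStream.keyBy("id")
                .countWindow(10, 2)
                .aggregate(new MyAvgTemp());
        // Other optional APIs: handling late data
        // (allowed lateness only takes effect with event-time windows)
        OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {};
        SingleOutputStreamOperator<SensorReading> sumStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                //.trigger()
                //.evictor()
                .allowedLateness(Time.minutes(1))
                // Send data that is too late to the side output
                .sideOutputLateData(outputTag)
                .sum("temperature");
        sumStream.getSideOutput(outputTag).print("late");
        resultStream.print();
        env.execute();
    }

    // Accumulator is (sum of temperatures, number of readings)
    public static class MyAvgTemp implements AggregateFunction<SensorReading, Tuple2<Double, Integer>, Double> {
        @Override
        public Tuple2<Double, Integer> createAccumulator() {
            return new Tuple2<>(0.0, 0);
        }
        @Override
        public Tuple2<Double, Integer> add(SensorReading sensorReading, Tuple2<Double, Integer> accumulator) {
            return new Tuple2<>(accumulator.f0 + sensorReading.getTemperature(), accumulator.f1 + 1);
        }
        @Override
        public Double getResult(Tuple2<Double, Integer> accumulator) {
            return accumulator.f0 / accumulator.f1;
        }
        @Override
        public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
            return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
        }
    }
}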
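How .allowedLateness() and .sideOutputLateData() divide up late elements can be sketched as a small decision function. This is plain Java with hypothetical names, and it reflects my reading of the event-time semantics rather than Flink's actual code: a window fires once the watermark reaches its last timestamp, stays open for updates during the allowed lateness, and after that any further elements for it go to the side output (or are dropped if none is configured).

```java
// Plain-Java illustration of the lateness decision; all names are hypothetical, not Flink API.
class LatenessSketch {

    enum Outcome { ON_TIME, LATE_BUT_ALLOWED, TOO_LATE }

    /**
     * Classifies an element that belongs to the window ending at windowEndMs (exclusive),
     * given the current watermark and the configured allowed lateness.
     */
    static Outcome classify(long windowEndMs, long watermarkMs, long allowedLatenessMs) {
        long maxTimestamp = windowEndMs - 1; // last timestamp that still belongs to the window
        if (watermarkMs < maxTimestamp) {
            return Outcome.ON_TIME;          // window has not fired yet
        }
        if (watermarkMs < maxTimestamp + allowedLatenessMs) {
            return Outcome.LATE_BUT_ALLOWED; // window fired, but its state is still kept
        }
        return Outcome.TOO_LATE;             // window state is gone: side output or drop
    }
}
```

With an allowed lateness of zero, every element that arrives after the window has fired is classified as too late.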

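- In Flink, .window() is only available on a keyed stream, so a window must be defined after the keyBy operation (non-keyed streams use windowAll() instead).
- A window definition must always be followed by an aggregation, that is, a window function such as reduce, aggregate, or apply.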
