Flink window mechanism (two waits, then a final fallback)
2022-06-28 05:28:00 【静湖孤子】
1. Source code
package demo.flink.test;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
public class TestWindow {
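    /**
     * Window mechanism (two waits, then a final fallback).
     *
     * 1. Watermark. The window waits a short time for out-of-order data before it fires.
     * 2. allowedLateness. Keeps the window state for an extra period after the window has fired.
     *    Each element that arrives within this period triggers the window aggregation again.
     * 3. Side output stream. The final fallback. Once the allowed lateness has expired, the window
     *    accepts no more data. Elements that arrive even later can only be collected in a side
     *    output stream, and the user decides how to handle them.
     *
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Note: do not use fromElements here; it does not show the window behavior
        // (presumably because a bounded source emits every element before the periodic watermark advances).
        // DataStreamSource<Integer> source = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 1);
        /*
         * 1. Scenario
         *    1) watermark delay: 1 second
         *    2) allowedLateness: 3 seconds
         *    3) side output stream as the fallback
         *
         * 2. Try the following input (one value per line; each value is the event time in seconds)
         *    0, 1, 2, 3, 4, 5, 6, 7, 8, 9   // window 0 data [0, 10)
         *    10                             // window 1 data [10, 20)
         *
         *    11          // window 1 data; watermark = 11 s - 1 s - 1 ms = 9.999 s, window 0 fires
         *    1, 2, 3     // late data for window 0, accepted: each element re-fires the window
         *
         *    12          // window 1 data; watermark = 10.999 s
         *    1, 2, 3     // late data for window 0, still accepted
         *
         *    13          // window 1 data; watermark = 11.999 s
         *    1, 2, 3     // late data for window 0, still accepted
         *
         *    14          // window 1 data; watermark = 12.999 s, the 3 s lateness of window 0 has expired
         *    1, 2, 3     // too late for window 0: routed to the side output
         *
         *    20          // window 2 data [20, 30)
         *    21          // window 2 data; watermark = 19.999 s, window 1 fires
         *    22, 23, 24, 25, 26, 27, 28, 29   // window 2 data [20, 30)
         *    30          // window 3 data [30, 40)
         *    31          // window 3 data; watermark = 29.999 s, window 2 fires
         */
        SingleOutputStreamOperator<String> source = env.socketTextStream("localhost", 7777);
        OutputTag<Tuple1<Integer>> sideTag = new OutputTag<Tuple1<Integer>>("side") {
        };
        SingleOutputStreamOperator<Tuple1<Integer>> stream = source
                .map(Integer::parseInt)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Integer>forBoundedOutOfOrderness(Duration.ofSeconds(1)) // watermark delay: 1 second
                                .withTimestampAssigner((SerializableTimestampAssigner<Integer>) (element, recordTimestamp) -> element * 1000L) // event time in ms
                )
                // every element becomes Tuple1(1), so sum(0) below counts the elements per window
                .map((MapFunction<Integer, Tuple1<Integer>>) value -> new Tuple1<Integer>(1))
                .returns(new TypeHint<Tuple1<Integer>>() {
                })
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(10))) // tumbling window, 10 seconds
                .allowedLateness(Time.seconds(3)) // allowed lateness: 3 seconds
                .sideOutputLateData(sideTag) // side output as the fallback
                .sum(0);
        source.print("source-----------------");
        stream.print("stream-----------------");
        stream.getSideOutput(sideTag).print("side-----------------");
        env.execute("demo");
    }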
}
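The thresholds in this demo (the first result appears on input 11, the side output starts after input 14) follow from simple boundary arithmetic. The sketch below works it out without Flink. The class WindowBounds and its method names are hypothetical helpers introduced only for illustration. It assumes epoch-aligned tumbling windows and the bounded-out-of-orderness rule that the watermark equals the largest timestamp seen, minus the delay, minus 1 ms.

```java
import java.time.Duration;

/** Hypothetical helper, not part of Flink: boundary arithmetic for the demo's settings. */
final class WindowBounds {

    /** Start of the epoch-aligned tumbling window containing ts (all values in milliseconds). */
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    /** Watermark after the largest timestamp seen so far is maxTs. */
    static long watermark(long maxTs, long delay) {
        return maxTs - delay - 1;
    }

    /**
     * Smallest event time that makes the window ending at windowEnd fire.
     * The window fires once watermark >= windowEnd - 1, i.e. ts - delay - 1 >= windowEnd - 1.
     */
    static long firstFiringTimestamp(long windowEnd, long delay) {
        return windowEnd + delay;
    }

    /**
     * Smallest event time after which the window no longer accepts late data.
     * The window expires once watermark >= windowEnd - 1 + lateness.
     */
    static long expiryTimestamp(long windowEnd, long delay, long lateness) {
        return windowEnd + delay + lateness;
    }

    public static void main(String[] args) {
        long size = Duration.ofSeconds(10).toMillis();
        long delay = Duration.ofSeconds(1).toMillis();
        long lateness = Duration.ofSeconds(3).toMillis();
        long end = windowStart(3_000L, size) + size; // end of the window holding element 3

        System.out.println("window 0 ends at          " + end);
        System.out.println("window 0 first fires on   " + firstFiringTimestamp(end, delay));
        System.out.println("window 0 expires on       " + expiryTimestamp(end, delay, lateness));
        System.out.println("watermark after input 14  " + watermark(14_000L, delay));
    }
}
```

With the demo's settings the two thresholds come out at 11 000 ms and 14 000 ms, which is why typing 11 produces the first window result and why 1, 2, 3 typed after 14 land in the side output.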
2. Console output
nc -lk 7777 ## listen on port 7777
## console output
source-----------------> 0
source-----------------> 1
source-----------------> 2
source-----------------> 3
source-----------------> 4
source-----------------> 5
source-----------------> 6
source-----------------> 7
source-----------------> 8
source-----------------> 9
source-----------------> 10
source-----------------> 11
stream-----------------> (10)
source-----------------> 1
stream-----------------> (11)
source-----------------> 2
stream-----------------> (12)
source-----------------> 3
stream-----------------> (13)
source-----------------> 12
source-----------------> 1
stream-----------------> (14)
source-----------------> 2
stream-----------------> (15)
source-----------------> 3
stream-----------------> (16)
source-----------------> 13
source-----------------> 1
stream-----------------> (17)
source-----------------> 2
stream-----------------> (18)
source-----------------> 3
stream-----------------> (19)
source-----------------> 14
source-----------------> 1
side-----------------> (1)
source-----------------> 2
side-----------------> (1)
source-----------------> 3
side-----------------> (1)
source-----------------> 20
source-----------------> 21
stream-----------------> (5)
source-----------------> 22
source-----------------> 23
source-----------------> 24
source-----------------> 25
source-----------------> 26
source-----------------> 27
source-----------------> 28
source-----------------> 29
source-----------------> 30
source-----------------> 31
stream-----------------> (10)
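The stream and side lines above can be reproduced with a small model that does not use Flink at all. This is only a sketch: the class LatenessReplay is hypothetical, and it assumes that the watermark advances after every element and before the next one arrives, which roughly matches typing values by hand into nc while Flink emits watermarks periodically. It keeps one count per window, re-emits the count when a late element is still accepted, and routes an element to the side output once the window's allowed lateness has expired.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical single-threaded model of the demo job; it does not use Flink. */
final class LatenessReplay {
    static final long SIZE = 10_000L;    // tumbling window size in ms
    static final long DELAY = 1_000L;    // watermark delay in ms
    static final long LATENESS = 3_000L; // allowed lateness in ms

    /** The input sequence from the comment in the source code, in seconds. */
    static final int[] DEMO_INPUT = {
        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 1, 2, 3, 12, 1, 2, 3, 13, 1, 2, 3,
        14, 1, 2, 3, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31
    };

    static List<String> replay(int... seconds) {
        List<String> out = new ArrayList<>();
        Map<Long, Integer> counts = new TreeMap<>(); // window start -> element count
        long maxTs = Long.MIN_VALUE + DELAY + 1;     // offset avoids overflow in the subtraction below
        long watermark = Long.MIN_VALUE;

        for (int s : seconds) {
            long ts = s * 1000L;
            long start = ts - Math.floorMod(ts, SIZE);
            long windowMax = start + SIZE - 1;

            if (windowMax + LATENESS <= watermark) {
                // the window has expired: the element goes to the side output
                out.add("side (1)");
            } else {
                int count = counts.merge(start, 1, Integer::sum);
                if (windowMax <= watermark) {
                    // the window already fired: a late element re-fires it immediately
                    out.add("stream (" + count + ")");
                }
            }

            // advance the watermark and fire every window it has just passed
            maxTs = Math.max(maxTs, ts);
            long next = maxTs - DELAY - 1;
            for (Map.Entry<Long, Integer> e : counts.entrySet()) {
                long wMax = e.getKey() + SIZE - 1;
                if (wMax > watermark && wMax <= next) {
                    out.add("stream (" + e.getValue() + ")");
                }
            }
            // drop the state of windows whose allowed lateness has expired
            counts.keySet().removeIf(k -> k + SIZE - 1 + LATENESS <= next);
            watermark = next;
        }
        return out;
    }

    public static void main(String[] args) {
        replay(DEMO_INPUT).forEach(System.out::println);
    }
}
```

Fed with the demo input, the model emits the counts 10 through 19, then three side-output entries, then 5 and 10, in the same order as the stream and side lines of the log.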