Flink Window Mechanism (Two Waits, One Final Fallback)
2022-06-28 05:28:00 【静湖孤子】
1. Source Code
package demo.flink.test;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
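public class TestWindow {
    /**
     * Window mechanism (two waits, one final fallback)
     * 1. Watermark: the window waits a short time for out-of-order records
     *    before it fires and emits its result.
     * 2. allowedLateness: delays the final closing of the window. Records that
     *    arrive within this extra time after the window has fired are added to
     *    the window, which then re-aggregates and emits an updated result.
     * 3. Side output (sideOutputLateData): the final fallback. Once the lateness
     *    period has passed, the window is purged and accepts no more records.
     *    These very late records can only be collected into a side output,
     *    where the user decides how to handle them.
     *
     * @throws Exception
     */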
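    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Note: don't use fromElements; it cannot show the window behavior. A bounded source is consumed
        // almost instantly, and the final watermark at end of input fires every window at once.
        // DataStreamSource<Integer> source = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 1);
        /*
         * 1. Scenario
         *    1) watermark delay: 1 second
         *    2) allowedLateness: 3 seconds
         *    3) side output as the final fallback
         * 2. Input to try (one value per line in nc). Value v has event time v seconds, and the
         *    10-second tumbling windows are aligned to the epoch: [0, 10), [10, 20), [20, 30), ...
         *    0, 1, 2, 3, 4, 5, 6, 7, 8, 9   // window 0 data [0, 10)
         *    10                             // window 1 data [10, 20)
         *    11       // window 1 data; watermark 9.999 s, window 0 fires
         *    1, 2, 3  // late data for window 0; each record re-fires it
         *    12       // window 1 data; watermark 10.999 s
         *    1, 2, 3  // late data for window 0, still within allowedLateness
         *    13       // window 1 data; watermark 11.999 s
         *    1, 2, 3  // late data for window 0, still within allowedLateness
         *    14       // window 1 data; watermark 12.999 s = 9.999 s + 3 s lateness, window 0 purged
         *    1, 2, 3  // too late for window 0; sent to the side output
         *    20       // window 2 data [20, 30)
         *    21       // window 2 data; watermark 19.999 s, window 1 fires
         *    22, 23, 24, 25, 26, 27, 28, 29  // window 2 data [20, 30)
         *    30       // window 3 data [30, 40)
         *    31       // window 3 data; watermark 29.999 s, window 2 fires
         */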
        SingleOutputStreamOperator<String> source = env.socketTextStream("localhost", 7777);
        // anonymous subclass ({}) so that Flink can capture the side output's generic type
        OutputTag<Tuple1<Integer>> sideTag = new OutputTag<Tuple1<Integer>>("side") {
        };
        SingleOutputStreamOperator<Tuple1<Integer>> stream = source
                .map(Integer::parseInt)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Integer>forBoundedOutOfOrderness(Duration.ofSeconds(1)) // watermark lags the max event time by 1 s
                                .withTimestampAssigner((SerializableTimestampAssigner<Integer>) (element, recordTimestamp) -> element * 1000L) // event time: value in seconds -> ms
                )
                .map((MapFunction<Integer, Tuple1<Integer>>) value -> new Tuple1<Integer>(1)) // count each record as 1
                .returns(new TypeHint<Tuple1<Integer>>() {
                })
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(10))) // 10-second tumbling window
                .allowedLateness(Time.seconds(3)) // allowedLateness: 3 seconds
                .sideOutputLateData(sideTag) // side output as the final fallback
                .sum(0);
        source.print("source-----------------");
        stream.print("stream-----------------");
        stream.getSideOutput(sideTag).print("side-----------------");
        env.execute("demo");
    }
}
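Why the first window is [0, 10) and not [1, 11): tumbling event-time windows are aligned to the epoch (offset 0 by default), so value 10 (timestamp 10000 ms) already belongs to the second window. The sketch below is plain Java with no Flink dependency and reproduces the arithmetic for the settings used in this job. The class `WindowAssignmentSketch` and its methods are hypothetical; the window-start formula is assumed to match Flink's `TimeWindow.getWindowStartWithOffset`, and the watermark formula follows the bounded-out-of-orderness rule "largest timestamp seen minus the delay minus 1 ms".

```java
/**
 * Hypothetical helper class (not part of the original job): reproduces the
 * window-start and watermark arithmetic for the settings used above.
 */
public class WindowAssignmentSketch {
    static final long WINDOW_SIZE_MS = 10_000;     // TumblingEventTimeWindows.of(Time.seconds(10))
    static final long OUT_OF_ORDERNESS_MS = 1_000; // forBoundedOutOfOrderness(Duration.ofSeconds(1))

    // Start of the tumbling window containing `timestamp`
    // (assumed to match TimeWindow.getWindowStartWithOffset; offset 0 = epoch-aligned)
    static long windowStart(long timestamp, long offset, long size) {
        long remainder = (timestamp - offset) % size;
        return remainder < 0 ? timestamp - (remainder + size) : timestamp - remainder;
    }

    // Bounded-out-of-orderness watermark: largest timestamp seen - delay - 1 ms
    static long watermark(long maxTimestamp) {
        return maxTimestamp - OUT_OF_ORDERNESS_MS - 1;
    }

    public static void main(String[] args) {
        for (int value : new int[] {0, 9, 10, 11, 20, 30}) {
            long ts = value * 1000L; // same timestamp assigner as the job
            long start = windowStart(ts, 0, WINDOW_SIZE_MS);
            System.out.println(value + " -> [" + start / 1000 + ", " + (start + WINDOW_SIZE_MS) / 1000 + ")");
        }
        // A window [start, end) fires once the watermark reaches end - 1 ms (its max timestamp)
        long window0MaxTimestamp = WINDOW_SIZE_MS - 1;
        for (int value = 10; value <= 12; value++) {
            long wm = watermark(value * 1000L);
            System.out.println("max value " + value + ": watermark " + wm
                    + " ms, window [0, 10) fires: " + (wm >= window0MaxTimestamp));
        }
    }
}
```

Running it maps value 10 to `[10, 20)` and reports that window [0, 10) first fires when the largest value seen is 11 (watermark 9999 ms), which is where `stream-----------------> (10)` appears in the console output below.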
2. Console Output
nc -lk 7777 ## listen on port 7777 (start this before the job)
## console output
source-----------------> 0
source-----------------> 1
source-----------------> 2
source-----------------> 3
source-----------------> 4
source-----------------> 5
source-----------------> 6
source-----------------> 7
source-----------------> 8
source-----------------> 9
source-----------------> 10
source-----------------> 11
stream-----------------> (10)
source-----------------> 1
stream-----------------> (11)
source-----------------> 2
stream-----------------> (12)
source-----------------> 3
stream-----------------> (13)
source-----------------> 12
source-----------------> 1
stream-----------------> (14)
source-----------------> 2
stream-----------------> (15)
source-----------------> 3
stream-----------------> (16)
source-----------------> 13
source-----------------> 1
stream-----------------> (17)
source-----------------> 2
stream-----------------> (18)
source-----------------> 3
stream-----------------> (19)
source-----------------> 14
source-----------------> 1
side-----------------> (1)
source-----------------> 2
side-----------------> (1)
source-----------------> 3
side-----------------> (1)
source-----------------> 20
source-----------------> 21
stream-----------------> (5)
source-----------------> 22
source-----------------> 23
source-----------------> 24
source-----------------> 25
source-----------------> 26
source-----------------> 27
source-----------------> 28
source-----------------> 29
source-----------------> 30
source-----------------> 31
stream-----------------> (10)
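To check the output above line by line, the following plain-Java sketch (no Flink dependency) replays the same input through a simplified model of the non-keyed window operator: on-time firing when the watermark reaches a window's max timestamp, an immediate re-fire for each late record within `allowedLateness`, and a side-output record once the watermark has reached max timestamp + lateness. It assumes a new watermark after every record; in the real job watermarks are emitted periodically (every 200 ms by default), which for hand-typed input amounts to the same thing. The class `LatenessSimulation` and its members are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical simulation of the job: 10 s windows, 1 s out-of-orderness, 3 s lateness. */
public class LatenessSimulation {
    static final long SIZE = 10_000;    // window size in ms
    static final long DELAY = 1_000;    // out-of-orderness in ms
    static final long LATENESS = 3_000; // allowed lateness in ms
    static final int[] INPUT = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 1, 2, 3, 12, 1, 2, 3,
            13, 1, 2, 3, 14, 1, 2, 3, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31};

    static List<String> run(int[] values) {
        List<String> out = new ArrayList<>();
        Map<Long, Integer> counts = new TreeMap<>(); // live windows: start -> running count
        long maxTs = Long.MIN_VALUE + DELAY + 1;     // so the first watermark is Long.MIN_VALUE
        long watermark = Long.MIN_VALUE;

        for (int v : values) {
            long ts = v * 1000L;                       // same timestamp assigner as the job
            long start = ts - Math.floorMod(ts, SIZE); // epoch-aligned tumbling window
            long windowMax = start + SIZE - 1;         // window's max timestamp
            if (windowMax + LATENESS <= watermark) {
                out.add("side> (1)");                  // window already purged
            } else {
                int count = counts.merge(start, 1, Integer::sum);
                if (windowMax <= watermark) {
                    out.add("stream> (" + count + ")"); // late record re-fires the window
                }
            }

            // Bounded-out-of-orderness watermark: largest timestamp - delay - 1 ms
            maxTs = Math.max(maxTs, ts);
            long newWatermark = maxTs - DELAY - 1;
            if (newWatermark <= watermark) {
                continue; // watermark did not advance
            }
            long oldWatermark = watermark;
            watermark = newWatermark;
            Iterator<Map.Entry<Long, Integer>> it = counts.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Integer> e = it.next();
                long max = e.getKey() + SIZE - 1;
                if (max > oldWatermark && max <= watermark) {
                    out.add("stream> (" + e.getValue() + ")"); // on-time firing
                }
                if (max + LATENESS <= watermark) {
                    it.remove(); // lateness period over: drop the window state
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        run(INPUT).forEach(System.out::println);
    }
}
```

With the input typed in the console session above, it produces 15 lines: `stream> (10)` through `stream> (19)`, three `side> (1)`, then `stream> (5)` and `stream> (10)`, the same sequence as the `stream` and `side` lines in the log.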
3. References