Flink window mechanism (two waits and a final fallback)
2022-06-28 05:35:00 【Jinghu soliton】
1. Source code
package demo.flink.test;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
public class TestWindow {
    /**
     * Window mechanism (two waits, then a final fallback):
     * 1. Watermark. The window waits a short time before firing, so data that arrives slightly out of order is still included.
     * 2. allowedLateness. Keeps the window state for a further delay after the window has fired.
     *    Data arriving within this delay is added to the window, and the window aggregation is computed and emitted again.
     * 3. Side output stream. This is the fallback. Once the allowed lateness has expired, the window accepts no more data.
     *    Data that arrives this late can only be collected into a side output stream; the user decides what to do with it.
     *
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Note: do not use fromElements here; the window behavior cannot be tested with it.
        // DataStreamSource<Integer> source = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 1);
        /**
         * 1. Scenario
         *    1) watermark delay: 1 second
         *    2) allowedLateness: 3 seconds
         *    3) side output stream as the fallback
         *
         * 2. Enter the following values one per line (each value is an event time in seconds).
         *    Tumbling windows without an offset are aligned to the epoch:
         *    window 0 = [0, 10), window 1 = [10, 20), window 2 = [20, 30), window 3 = [30, 40).
         *    Because of the 1-second watermark delay, a window fires only when an element
         *    at least 1 second past the window end arrives.
         *
         *    0, 1, 2, 3, 4, 5, 6, 7, 8, 9          // window 0 data
         *    10                                    // window 1 data
         *
         *    11                                    // window 1 data; the watermark reaches the end of window 0, window 0 fires
         *    1, 2, 3                               // late data for window 0 (within allowedLateness); window 0 fires again for each
         *
         *    12                                    // window 1 data
         *    1, 2, 3                               // late data for window 0 (within allowedLateness)
         *
         *    13                                    // window 1 data
         *    1, 2, 3                               // late data for window 0 (within allowedLateness)
         *
         *    14                                    // window 1 data; allowedLateness of window 0 expires
         *    1, 2, 3                               // late data for window 0 (beyond allowedLateness), goes to the side output stream
         *
         *    20                                    // window 2 data
         *    21                                    // window 2 data; window 1 fires
         *    22, 23, 24, 25, 26, 27, 28, 29        // window 2 data
         *    30                                    // window 3 data
         *    31                                    // window 3 data; window 2 fires
         */
        SingleOutputStreamOperator<String> source = env.socketTextStream("localhost", 7777);
        OutputTag<Tuple1<Integer>> sideTag = new OutputTag<Tuple1<Integer>>("side") {
        };
        SingleOutputStreamOperator<Tuple1<Integer>> stream = source
                .map(Integer::parseInt)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Integer>forBoundedOutOfOrderness(Duration.ofSeconds(1)) // watermark delay: 1 second
                                .withTimestampAssigner((SerializableTimestampAssigner<Integer>) (element, recordTimestamp) -> element * 1000L) // event time in milliseconds
                )
                .map((MapFunction<Integer, Tuple1<Integer>>) value -> new Tuple1<Integer>(1)) // each element counts as 1
                .returns(new TypeHint<Tuple1<Integer>>() {
                })
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(10))) // tumbling window of 10 seconds
                .allowedLateness(Time.seconds(3)) // allowedLateness: 3 seconds
                .sideOutputLateData(sideTag) // side output stream as the fallback
                .sum(0);
        source.print("source-----------------");
        stream.print("stream-----------------");
        stream.getSideOutput(sideTag).print("side-----------------");
        env.execute("demo");
    }
}
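The interplay of the three settings in the job above comes down to a few timestamps. Below is a minimal plain-Java sketch of that arithmetic, with no Flink dependency. It assumes windows aligned to the epoch with no offset, and a watermark equal to the largest timestamp seen minus the out-of-orderness bound minus 1 ms, which is how Flink's bounded-out-of-orderness strategy is documented to behave. The class `WindowMath` and its helper methods are hypothetical names used only for illustration.

```java
final class WindowMath {
    // Values taken from the job above.
    static final long WINDOW_SIZE_MS = 10_000L;      // TumblingEventTimeWindows.of(Time.seconds(10))
    static final long OUT_OF_ORDERNESS_MS = 1_000L;  // forBoundedOutOfOrderness(Duration.ofSeconds(1))
    static final long ALLOWED_LATENESS_MS = 3_000L;  // allowedLateness(Time.seconds(3))

    // Start of the tumbling window that contains the timestamp (assumes ts >= 0, no offset).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_SIZE_MS);
    }

    // Largest timestamp that still belongs to the window (the end is exclusive).
    static long windowMaxTimestamp(long ts) {
        return windowStart(ts) + WINDOW_SIZE_MS - 1;
    }

    // Watermark after the largest timestamp seen so far (assumed formula, see lead-in).
    static long watermarkAfter(long maxSeenTs) {
        return maxSeenTs - OUT_OF_ORDERNESS_MS - 1;
    }

    // Watermark at which the window state is dropped and later elements go to the side output.
    static long cleanupTime(long ts) {
        return windowMaxTimestamp(ts) + ALLOWED_LATENESS_MS;
    }

    public static void main(String[] args) {
        for (int seconds : new int[] {0, 9, 10, 11, 14, 20, 21, 31}) {
            long ts = seconds * 1000L;
            System.out.println("element " + seconds
                    + " -> window start " + windowStart(ts)
                    + ", fires at watermark " + windowMaxTimestamp(ts)
                    + ", dropped at watermark " + cleanupTime(ts)
                    + ", watermark after this element " + watermarkAfter(ts));
        }
    }
}
```

Under these assumptions, window 0 has a max timestamp of 9999 ms and a cleanup time of 12999 ms. The watermark reaches 9999 ms once element 11 has been seen and 12999 ms once element 14 has been seen, which matches the points at which the scenario expects window 0 to fire and to stop accepting late data.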
2. Console output
nc -lk 7777 ## listen on port 7777
## Console output
source-----------------> 0
source-----------------> 1
source-----------------> 2
source-----------------> 3
source-----------------> 4
source-----------------> 5
source-----------------> 6
source-----------------> 7
source-----------------> 8
source-----------------> 9
source-----------------> 10
source-----------------> 11
stream-----------------> (10)
source-----------------> 1
stream-----------------> (11)
source-----------------> 2
stream-----------------> (12)
source-----------------> 3
stream-----------------> (13)
source-----------------> 12
source-----------------> 1
stream-----------------> (14)
source-----------------> 2
stream-----------------> (15)
source-----------------> 3
stream-----------------> (16)
source-----------------> 13
source-----------------> 1
stream-----------------> (17)
source-----------------> 2
stream-----------------> (18)
source-----------------> 3
stream-----------------> (19)
source-----------------> 14
source-----------------> 1
side-----------------> (1)
source-----------------> 2
side-----------------> (1)
source-----------------> 3
side-----------------> (1)
source-----------------> 20
source-----------------> 21
stream-----------------> (5)
source-----------------> 22
source-----------------> 23
source-----------------> 24
source-----------------> 25
source-----------------> 26
source-----------------> 27
source-----------------> 28
source-----------------> 29
source-----------------> 30
source-----------------> 31
stream-----------------> (10)
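The `stream` and `side` lines above can be reproduced without running Flink. The following is a rough plain-Java simulation under simplifying assumptions: the watermark advances immediately after every element (Flink emits watermarks periodically, which amounts to the same thing when values are typed in by hand), a window fires when the watermark reaches its max timestamp, every late element within the allowed lateness re-fires its window, and the window state is dropped once the watermark reaches max timestamp plus allowed lateness. The class `LateDataSimulation` is hypothetical; it only mimics these rules and is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class LateDataSimulation {
    static final long SIZE = 10_000L;     // tumbling window size
    static final long DELAY = 1_000L;     // watermark out-of-orderness bound
    static final long LATENESS = 3_000L;  // allowed lateness

    // The values typed into nc, in order.
    static final int[] INPUT = {
        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
        11, 1, 2, 3,
        12, 1, 2, 3,
        13, 1, 2, 3,
        14, 1, 2, 3,
        20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31
    };

    static List<String> simulate(int[] input) {
        List<String> out = new ArrayList<>();
        TreeMap<Long, Integer> counts = new TreeMap<>(); // window start -> running count
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        for (int value : input) {
            long ts = value * 1000L;
            long start = ts - (ts % SIZE);
            long maxTs = start + SIZE - 1;
            if (maxTs + LATENESS <= watermark) {
                // Window state already dropped: the element goes to the side output.
                out.add("side> (1)");
            } else {
                int count = counts.merge(start, 1, Integer::sum);
                if (maxTs <= watermark) {
                    // Late but within allowed lateness: the window fires again.
                    out.add("stream> (" + count + ")");
                }
            }
            if (ts > maxSeen) {
                maxSeen = ts;
                long newWatermark = maxSeen - DELAY - 1;
                Iterator<Map.Entry<Long, Integer>> it = counts.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<Long, Integer> e = it.next();
                    long windowMax = e.getKey() + SIZE - 1;
                    if (windowMax > watermark && windowMax <= newWatermark) {
                        out.add("stream> (" + e.getValue() + ")"); // first firing of the window
                    }
                    if (windowMax + LATENESS <= newWatermark) {
                        it.remove(); // allowed lateness expired
                    }
                }
                watermark = newWatermark;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        simulate(INPUT).forEach(System.out::println);
    }
}
```

With the input sequence from the scenario, this sketch emits the counts 10 through 19 for window 0, three side-output records, then 5 and 10 for the next two windows, in the same order as the console output above.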