Talk about Flink watermarks
2022-06-22 03:33:00 【Dipu Technology】
1、 Summary
Time and state are two of the most important concepts in Flink. While studying Flink my understanding of watermarks stayed vague for a long time; after digesting the topic for a while, here is a summary. This article covers what a watermark is, how it is produced, and what it is used for.
2、 A hard-to-grasp concept: the watermark
Some people translate the term as "watermark", others as "water level line". Either way the name has little to do with everyday life; it is abstract and hard to understand. In everyday terms a watermark is much more like the clock hanging on the wall at home, or the watch on your wrist. With that analogy in mind, let's look at the following questions:
1. How is it produced?
2. If it is a clock, what are the characteristics of a clock? Every second the second hand moves one small step forward, so the time keeps increasing. Do watermarks have these characteristics too?
3. What is a wall clock for? If you look at your watch at night and see it is 12 o'clock, you tell yourself "time to go to bed". The time tells us what we should be doing.
3、 What is a watermark
3.1、 Definition of the watermark
A watermark is a logical clock. Why "logical"? Ordinary clock time is produced by the system clock and advances periodically at a fixed rate, whereas the time on this clock is computed by the program, dynamically, from "event time" (what event time is and where it is used is out of scope here). For example, at some moment the computed value is x, say 2022-10-10 10:10:10, whose timestamp is 1665367810000 (UTC+8). As event time advances, x grows with it, giving results such as x, x+1, x+2, x+3, x+4 … Isn't this sequence of increasing timestamps just like a clock stepping forward every second?
3.2、 Composition of the watermark (logical clock)
A watermark consists of a sequence of increasing timestamps, each computed dynamically from event time. A clock likewise consists of a sequence of increasing times, such as 2022-10-10 10:10:10, 2022-10-10 10:10:11, 2022-10-10 10:10:12, 2022-10-10 10:10:13 and so on. Because the watermark resembles an everyday clock, I call it a logical clock; in this article "logical clock", "watermark" and "watermark mechanism" all refer to the same thing.
3.3、 Current time of the logical clock
Like the current time on a clock (what time is it right now?), the current time of the logical clock is very important: whether a window closes and whether a timer fires are both decided by it.
Characteristics of the current value: it only ever grows. When the stream starts, the watermark is negative infinity (Long.MIN_VALUE), and when a bounded stream ends, Flink inserts positive infinity (Long.MAX_VALUE).
Personally, I think of the current value as something like a pointer whose target keeps moving forward.
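To make the "only moves forward" property concrete, here is a minimal plain-Java sketch of a logical clock under the assumptions stated in this section: it starts at Long.MIN_VALUE, ignores any reading earlier than its current one, and jumps to Long.MAX_VALUE when the stream ends. The class LogicalClock is hypothetical and not part of Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical "logical clock" following this section's description of a watermark:
// it starts at negative infinity, only moves forward, and jumps to positive
// infinity when the stream ends. This is an illustration, not Flink code.
final class LogicalClock {
    private long current = Long.MIN_VALUE; // "negative infinity" when the stream starts

    /** Moves the clock to candidate if that is later; earlier readings are ignored. */
    long advance(long candidate) {
        if (candidate > current) {
            current = candidate;
        }
        return current;
    }

    /** End of a bounded stream: the clock jumps to "positive infinity". */
    long close() {
        current = Long.MAX_VALUE;
        return current;
    }

    long current() {
        return current;
    }

    public static void main(String[] args) {
        LogicalClock clock = new LogicalClock();
        List<Long> readings = new ArrayList<>();
        readings.add(clock.current());
        for (long candidate : new long[] {-4001L, -3001L, 4999L, -3001L}) {
            readings.add(clock.advance(candidate)); // the final -3001 is ignored
        }
        readings.add(clock.close());
        // [-9223372036854775808, -4001, -3001, 4999, 4999, 9223372036854775807]
        System.out.println(readings);
    }
}
```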
3.4、 Formula for the current time
The clock's "current time" corresponds to a concrete timestamp: current watermark = largest event time seen so far - maximum delay - 1 millisecond.
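The formula can be checked with a few lines of Java. This is a hypothetical helper, not a Flink method; the 5000 ms delay and the inputs 1000, 2000, 3000 and 10000 are taken from the console session in this article, and each value passed in is assumed to already be the largest event time seen so far.

```java
// Hypothetical helper illustrating: watermark = max event time seen - max delay - 1 ms
final class WatermarkFormula {
    static long watermark(long maxEventTimeMillis, long maxDelayMillis) {
        return maxEventTimeMillis - maxDelayMillis - 1L;
    }

    public static void main(String[] args) {
        long maxDelay = 5_000L; // 5 s, the delay used in this article's examples
        for (long maxEventTime : new long[] {1_000L, 2_000L, 3_000L, 10_000L}) {
            System.out.println(maxEventTime + " -> " + watermark(maxEventTime, maxDelay));
        }
        // 1000 -> -4001, 2000 -> -3001, 3000 -> -2001, 10000 -> 4999
    }
}
```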
3.5、 An example
Case description: read data from a socket and print the current watermark value.
package com.deepexi.sql;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class ExampleTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
// Read lines such as "a 1000" from a socket
.socketTextStream("192.168.117.211", 9999)
.map(r -> Tuple2.of(r.split(" ")[0], Long.parseLong(r.split(" ")[1])))
.returns(Types.TUPLE(Types.STRING, Types.LONG))
.assignTimestampsAndWatermarks(
// Tolerate up to 5 s of out-of-orderness (the maximum delay)
WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
// Use the second field as the event time (milliseconds)
return element.f1;
}
})
)
// Partition the stream by key
.keyBy(r -> r.f0)
.process(new KeyedProcessFunction<String, Tuple2<String, Long>, String>() {
@Override
public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
out.collect(" The current waterline is :" + ctx.timerService().currentWatermark());
}
})
.print();
env.execute();
}
}
Run nc -lk 9999 to start a socket server listening on port 9999.
Command-line input: a 1000
[root@master ~]# nc -lk 9999
a 1000
IDEA console output:
The current waterline is :-9223372036854775808 // -9223372036854775808 is Long.MIN_VALUE, which Flink uses as negative infinity
Command-line input: a 2000
IDEA console output:
The current waterline is :-4001 // current watermark = largest event time - maximum delay - 1 = 1000 - 5000 - 1 = -4001
Why 1000 - 5000 - 1 rather than 2000 - 5000 - 1? Flink periodically inserts watermarks into the stream, and a watermark is itself an element of the stream. When "a 2000" is processed, the latest watermark in the stream is still the one computed after "a 1000"; the watermark derived from 2000 is only inserted after this record. Let's look at the figure below.
Command-line input: a 3000
IDEA console output: The current waterline is :-3001 // 2000 - 5000 - 1 = -3001
Command-line input: a 10000
IDEA console output: The current waterline is :-2001 // 3000 - 5000 - 1 = -2001
Command-line input: a 1000
IDEA console output: The current waterline is :4999 // 10000 - 5000 - 1 = 4999
Command-line input: a 1000
IDEA console output: The current waterline is :4999 // 10000 - 5000 - 1 = 4999
Command-line input: a 2000
IDEA console output: The current waterline is :4999 // 10000 - 5000 - 1 = 4999
The console output shows that the watermark behaves like a clock: its value keeps growing as event time advances and never gets smaller, although it can stand still for a while. For example, after a 10000, the inputs a 1000, a 1000 and a 2000 all leave the watermark at 4999.
Complete session
Command-line window:
[root@master ~]# nc -lk 9999
a 1000
a 2000
a 3000
a 10000
a 1000
a 1000
a 2000
IDEA output:
The current waterline is :-9223372036854775808
The current waterline is :-4001
The current waterline is :-3001
The current waterline is :-2001
The current waterline is :4999
The current waterline is :4999
The current waterline is :4999
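The one-record lag explained above can be reproduced with a small simulation. It assumes (a simplification, not Flink's scheduling code) that at least one periodic watermark tick happens between two typed lines, which is reasonable when typing by hand against a 200 ms interval. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the socket session above (not Flink's scheduler): every
// record is processed first, and only afterwards does the next periodic tick
// emit a watermark derived from the largest event time seen so far.
final class LaggingWatermarkDemo {
    static List<Long> observedWatermarks(long[] eventTimes, long maxDelayMillis) {
        List<Long> observed = new ArrayList<>();
        long currentWatermark = Long.MIN_VALUE;              // nothing emitted yet
        long maxSeen = Long.MIN_VALUE + maxDelayMillis + 1L; // keeps the subtraction from overflowing
        for (long ts : eventTimes) {
            // processElement reads the watermark emitted *before* this record
            observed.add(currentWatermark);
            maxSeen = Math.max(maxSeen, ts);
            // the next tick emits maxSeen - delay - 1; watermarks never move backwards
            currentWatermark = Math.max(currentWatermark, maxSeen - maxDelayMillis - 1L);
        }
        return observed;
    }

    public static void main(String[] args) {
        long[] typed = {1000L, 2000L, 3000L, 10000L, 1000L, 1000L, 2000L};
        // [-9223372036854775808, -4001, -3001, -2001, 4999, 4999, 4999]
        System.out.println(observedWatermarks(typed, 5000L));
    }
}
```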

4、 How watermarks are produced
A watermark is essentially a timestamp that the program computes dynamically from event time. Let's go straight to an example.
Case study 1
Custom watermark generation logic: implement the WatermarkStrategy interface. Flink calls onPeriodicEmit periodically, every 200 ms by default.
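import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ExampleTest2 {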
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Optional: emit a watermark every 6 seconds instead of the default 200 ms
//env.getConfig().setAutoWatermarkInterval(6 * 1000L);
env
.socketTextStream("192.168.117.211", 9999)
.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] arr = value.split(" ");
return Tuple2.of(arr[0], Long.parseLong(arr[1]));
}
})
.assignTimestampsAndWatermarks(new CustomWatermarkGenerator())
.print();
env.execute();
}
public static class CustomWatermarkGenerator implements WatermarkStrategy<Tuple2<String, Long>> {
@Override
public TimestampAssigner<Tuple2<String, Long>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
return element.f1;
}
};
}
@Override
public WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator<Tuple2<String, Long>>() {
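// Maximum delay (tolerated out-of-orderness) in milliseconds
private Long bound = 5000L;
// Largest event time seen so far; this start value makes maxTs - bound - 1 equal -Long.MAX_VALUE without overflowing
private Long maxTs = -Long.MAX_VALUE + bound + 1L;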
@Override
public void onEvent(Tuple2<String, Long> event, long eventTimestamp, WatermarkOutput output) {
// Update the maximum observed event time
maxTs = Math.max(maxTs, event.f1);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
System.out.println(" The value of the water mark :" + (maxTs - bound - 1L));
// Emit the watermark: largest event time seen - delay - 1 ms
output.emitWatermark(new Watermark(maxTs - bound - 1L));
}
};
}
}
}
Run nc -lk 9999 to start a socket server listening on port 9999.
Start the job in IDEA. Every 200 ms the console prints "The value of the water mark :xxxxx", as follows:
The value of the water mark :-9223372036854775807
The value of the water mark :-9223372036854775807
The value of the water mark :-9223372036854775807
The value of the water mark :-9223372036854775807
Command-line input: a 1000
Every 200 ms the console prints:
The value of the water mark :-4001
The value of the water mark :-4001
The value of the water mark :-4001
The value of the water mark :-4001
The value of the water mark :-4001
Command-line input: a 2000
Every 200 ms the console prints:
The value of the water mark :-3001
The value of the water mark :-3001
The value of the water mark :-3001
The value of the water mark :-3001
The value of the water mark :-3001
// By default a watermark is inserted into the stream every 200 ms; the interval can be changed, e.g. to 6 s:
env.getConfig().setAutoWatermarkInterval(6 * 1000L);
Complete session
Command-line window:
[root@master ~]# nc -lk 9999
a 1000
a 2000
IDEA output:
The value of the water mark :-9223372036854775807
The value of the water mark :-9223372036854775807
The value of the water mark :-9223372036854775807
The value of the water mark :-9223372036854775807
The value of the water mark :-9223372036854775807
(a,1000)
The value of the water mark :-4001
The value of the water mark :-4001
The value of the water mark :-4001
The value of the water mark :-4001
The value of the water mark :-4001
(a,2000)
The value of the water mark :-3001
The value of the water mark :-3001
The value of the water mark :-3001
The value of the water mark :-3001
The value of the water mark :-3001
The value of the water mark :-3001
The value of the water mark :-3001
The value of the water mark :-3001
The value of the water mark :-3001
The value of the water mark :-3001
Disconnected from the target VM, address: '127.0.0.1:58591', transport: 'socket'
The value of the water mark :-3001
Process finished with exit code 130
The results show that the watermark changes as the event times 1000 and 2000 arrive. What would the console print if a 1000 were entered after a 2000? It would still print "The value of the water mark :-3001": like time, the watermark only ever increases, because the generator keeps the largest event time seen so far.
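Both the repeated lines in the output and the "never goes back" behaviour can be reproduced by a small simulation that mirrors the arithmetic of CustomWatermarkGenerator in plain Java. The arrival times (500 ms, 1500 ms, 2500 ms), the run length and the class name are hypothetical; only the start value, the max() update and the maxTs - bound - 1 formula come from the code above.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java mirror of CustomWatermarkGenerator's arithmetic plus a hypothetical
// tick schedule: onPeriodicEmit runs every intervalMillis of processing time.
final class PeriodicEmitSimulation {
    /**
     * arrivals[i]   = processing time (ms since job start) at which record i arrives
     * eventTimes[i] = event time carried by record i
     * Returns the watermark value emitted at each tick within runMillis.
     */
    static List<Long> emissions(long[] arrivals, long[] eventTimes,
                                long intervalMillis, long runMillis, long bound) {
        List<Long> emitted = new ArrayList<>();
        long maxTs = -Long.MAX_VALUE + bound + 1L; // same start value as the generator above
        int next = 0;
        for (long tick = intervalMillis; tick <= runMillis; tick += intervalMillis) {
            // onEvent: fold in every record that arrived before this tick
            while (next < arrivals.length && arrivals[next] < tick) {
                maxTs = Math.max(maxTs, eventTimes[next]);
                next++;
            }
            emitted.add(maxTs - bound - 1L); // onPeriodicEmit
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] arrivals = {500L, 1500L, 2500L};   // "a 1000", "a 2000", then a late "a 1000"
        long[] eventTimes = {1000L, 2000L, 1000L};
        // 2 x -9223372036854775807, 5 x -4001, then 8 x -3001 (the late 1000 changes nothing)
        System.out.println(emissions(arrivals, eventTimes, 200L, 3000L, 5000L));
        // 0: with a 6 s interval nothing is emitted during the first 3 s
        System.out.println(emissions(arrivals, eventTimes, 6000L, 3000L, 5000L).size());
    }
}
```

With a 200 ms interval the same value is emitted several times between two inputs, which is why each value appears on several consecutive lines; with the 6 s interval from setAutoWatermarkInterval(6 * 1000L), nothing at all would be emitted in the first 3 s of this simulated run.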
Case study 2
Modify the program: add a keyBy and, after it, a step that prints each element entered on the command line (the lines starting with "The input business data is :" in the output below).
Run nc -lk 9999 to start a socket server listening on port 9999.
Start the job in IDEA.
Command-line input:
[root@localhost ~]# nc -lk 9999
a 1000
a 2000
a 5000
a 6000
IDEA console output:
The value of the water mark :-9223372036854775807
The value of the water mark :-9223372036854775807
The input business data is :(a,1000)
The value of the water mark :-4001
The value of the water mark :-4001
The value of the water mark :-4001
The value of the water mark :-4001
The value of the water mark :-4001
The value of the water mark :-4001
The value of the water mark :-4001
The input business data is :(a,2000)
The value of the water mark :-3001
The value of the water mark :-3001
The value of the water mark :-3001
The value of the water mark :-3001
The value of the water mark :-3001
The value of the water mark :-3001
The input business data is :(a,5000)
The value of the water mark :-1
The value of the water mark :-1
The value of the water mark :-1
The value of the water mark :-1
The value of the water mark :-1
The value of the water mark :-1
The value of the water mark :-1
The value of the water mark :-1
The input business data is :(a,6000)
The value of the water mark :999
The value of the water mark :999
The value of the water mark :999
The value of the water mark :999
The value of the water mark :999
Condensed, the output sequence is:
-9223372036854775807,-9223372036854775807,(a,1000),-4001,-4001,-4001,-4001,-4001,-4001,-4001,-4001,(a,2000),-3001,-3001,-3001,-3001,-3001,(a,5000),-1,-1,-1,(a,6000),999,999,999,999
Do you notice something about the relationship between watermarks and business data? It resembles fallen petals on flowing water: the business data is the river, and the watermarks are petals floating on it, both flowing toward the sea. Like business data, a watermark is an element of the stream.
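The "watermarks flow with the data" picture can be written down as an explicit sequence of stream elements. This sketch assumes exactly one watermark follows each record (real Flink may emit the same watermark several times, or not at all, between two records); the types DataRecord and WatermarkElement are hypothetical stand-ins, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a stream in which watermarks travel as elements
// between the data records (no Flink dependency).
final class InterleavedStream {
    interface StreamElement {}

    static final class DataRecord implements StreamElement {
        final String key;
        final long eventTime;

        DataRecord(String key, long eventTime) {
            this.key = key;
            this.eventTime = eventTime;
        }

        @Override
        public String toString() {
            return "(" + key + "," + eventTime + ")";
        }
    }

    static final class WatermarkElement implements StreamElement {
        final long timestamp;

        WatermarkElement(long timestamp) {
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "WM " + timestamp;
        }
    }

    /** Assumes one watermark before the first record and one after each record. */
    static List<StreamElement> build(String key, long[] eventTimes, long bound) {
        List<StreamElement> stream = new ArrayList<>();
        long maxTs = -Long.MAX_VALUE + bound + 1L; // start value used in ExampleTest2
        stream.add(new WatermarkElement(maxTs - bound - 1L));
        for (long ts : eventTimes) {
            stream.add(new DataRecord(key, ts));
            maxTs = Math.max(maxTs, ts);
            stream.add(new WatermarkElement(maxTs - bound - 1L));
        }
        return stream;
    }

    public static void main(String[] args) {
        // [WM -9223372036854775807, (a,1000), WM -4001, (a,2000), WM -3001, (a,5000), WM -1, (a,6000), WM 999]
        System.out.println(build("a", new long[] {1000L, 2000L, 5000L, 6000L}, 5000L));
    }
}
```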
5、 What watermarks are used for
In the world of streams, the logical clock serves as a reference. Take the wall clock again: when it shows 12 o'clock, we know it is time to put down the phone and go to bed. An endless data stream is split into segments (windows) and statistics are computed per segment, but when should the statistics be triggered? This is where the logical clock comes in: the window checks the current logical time, and once the watermark has passed the window's end time (strictly, once it reaches end - 1 ms, the window's last millisecond), the window is closed and its statistics are computed.
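The window rule can be sketched in plain Java. The window start is computed as ts - floorMod(ts, size), i.e. tumbling windows with offset 0, and the firing condition assumes, as with Flink's event-time trigger, that a window [start, end) is evaluated once the watermark reaches end - 1. The class and method names are hypothetical; the timestamps are the ones used in the window case of this section.

```java
// Hypothetical sketch of event-time tumbling windows (no Flink dependency).
final class TumblingWindowSketch {
    /** Start of the size-ms tumbling window (offset 0) that contains ts. */
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size); // floorMod keeps this correct for negative ts
    }

    /** A window [start, end) is evaluated once the watermark reaches its last millisecond. */
    static boolean shouldFire(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1L;
    }

    public static void main(String[] args) {
        long size = 5_000L;                     // 5-second windows
        long ts = 1_665_367_810_000L;           // 2022-10-10 10:10:10 (UTC+8)
        long start = windowStart(ts, size);
        long end = start + size;
        System.out.println("window = [" + start + ", " + end + ")"); // [1665367810000, 1665367815000)
        System.out.println(shouldFire(end, 1_665_367_804_999L));     // false
        System.out.println(shouldFire(end, 1_665_367_815_999L));     // true
    }
}
```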
Case study 1: the watermark triggers a timer
Description: when the watermark's current timestamp reaches (is greater than or equal to) a timer's trigger time, the timer fires.
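import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.time.Duration;
public class ExampleTest3 {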
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
.socketTextStream("192.168.117.211", 9999)
.map(r -> Tuple2.of(r.split(" ")[0], Long.parseLong(r.split(" ")[1])))
.returns(Types.TUPLE(Types.STRING, Types.LONG))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
return element.f1;
}
})
)
.keyBy(r -> r.f0)
.process(new KeyedProcessFunction<String, Tuple2<String, Long>, String>() {
@Override
public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
// out.collect(" The current waterline is :" + ctx.timerService().currentWatermark());
// Register an event-time timer 5 s after this element's event time
ctx.timerService().registerEventTimeTimer(value.f1 + 5000L);
out.collect(" A timestamp is registered :" + new Timestamp(value.f1 + 5000L) + " Timer for ");
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
super.onTimer(timestamp, ctx, out);
out.collect(" Timer triggered !");
}
})
.print();
env.execute();
}
}
Run nc -lk 9999 to start a socket server listening on port 9999.
Command-line input: a 1665367810000 // 1665367810000 corresponds to 2022-10-10 10:10:10 (UTC+8)
Console output: A timestamp is registered :2022-10-10 10:10:15.0 Timer for // 2022-10-10 10:10:15 corresponds to the timestamp 1665367815000
Explanation of the console output
The current watermark is 2022-10-10 10:10:10 - 5 s - 1 ms = 1665367810000 - 5000 - 1 = 1665367804999. The timer fires once the watermark is greater than or equal to 1665367815000.
Command-line input: a 1665367821000 // 2022-10-10 10:10:21; the watermark becomes 1665367821000 - 5000 - 1 = 1665367815999, which passes 1665367815000, so the timer fires
Console output: Timer triggered !
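The timer arithmetic of this case can be checked in a few lines. This is a plain-Java sketch, assuming (as in Flink) that an event-time timer fires when the watermark is greater than or equal to its timestamp; the class and method names are hypothetical.

```java
// Hypothetical sketch of the timer arithmetic in ExampleTest3 (no Flink dependency).
final class EventTimeTimerSketch {
    /** An event-time timer fires once the watermark is >= the timer's timestamp. */
    static boolean fires(long timerTimestamp, long watermark) {
        return watermark >= timerTimestamp;
    }

    /** Smallest "largest event time seen" whose watermark (ts - bound - 1) fires the timer. */
    static long minEventTimeToFire(long timerTimestamp, long bound) {
        return timerTimestamp + bound + 1L;
    }

    public static void main(String[] args) {
        long bound = 5_000L;
        long timer = 1_665_367_810_000L + 5_000L;             // registered for "a 1665367810000"
        long wmAfterFirst = 1_665_367_810_000L - bound - 1L;  // 1665367804999
        long wmAfterSecond = 1_665_367_821_000L - bound - 1L; // 1665367815999
        System.out.println(fires(timer, wmAfterFirst));       // false
        System.out.println(fires(timer, wmAfterSecond));      // true
        System.out.println(minEventTimeToFire(timer, bound)); // 1665367820001
    }
}
```

So any input with an event time of 1665367820001 (10:10:20.001) or later would have fired the timer; 1665367821000 in the session is one such input.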
Command-line input:
[root@master ~]# nc -lk 9999
a 1665367810000
a 1665367821000
IDEA output:
A timestamp is registered :2022-10-10 10:10:15.0 Timer for
A timestamp is registered :2022-10-10 10:10:26.0 Timer for
Timer triggered !
Case study 2: the window closes once the watermark's current timestamp passes the window end time
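import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.time.Duration;
public class ExampleTest4 {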
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
.socketTextStream("192.168.117.211", 9999)
.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {
String[] arr = value.split(" ");
return Tuple2.of(arr[0], Long.parseLong(arr[1]));
}
})
.assignTimestampsAndWatermarks(
// The maximum delay time is set to 5 second
WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
return element.f1; // tell Flink which field holds the event time
}
})
)
.keyBy(r -> r.f0)
// 5-second event-time tumbling window
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
long windowStart = context.window().getStart();
long windowEnd = context.window().getEnd();
// System.out.println(" The end value of the current window :" + windowEnd);
// System.out.println(" The value of the current watermark :" + context.currentWatermark());
// Count the elements by iterating; this works for any Iterable
long count = 0;
for (Tuple2<String, Long> ignored : elements) {
count++;
}
out.collect(" user " + key + " At the window " +
"" + new Timestamp(windowStart) + "~" + new Timestamp(windowEnd) + "" +
" Medium pv Times is :" + count);
}
})
.print();
env.execute();
}
}
Command-line input: a 1665367810000 // Flink opens the window 2022-10-10 10:10:10.0 ~ 2022-10-10 10:10:15. The window fires and closes once the current watermark (the "current time" described above) reaches the window's last millisecond, i.e. end time - 1 ms.
Command-line input: a 1665367821000 // The current watermark is now 1665367821000 - 5000 - 1 = 1665367815999, i.e. 2022-10-10 10:10:15.999, which is past the window's last millisecond (10:10:14.999), so the window fires and closes.
Console output: user a At the window 2022-10-10 10:10:10.0~2022-10-10 10:10:15.0 Medium pv Times is :1
Command-line input:
[root@master ~]# nc -lk 9999
a 1665367810000
a 1665367821000
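IDEA output (with the two commented-out println lines enabled; as originally written, both printed context.currentWatermark(), so the first line shows the watermark rather than the window end, which is 1665367815000):
The end value of the current window :1665367815999
The value of the current watermark :1665367815999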
user a At the window 2022-10-10 10:10:10.0~2022-10-10 10:10:15.0 Medium pv Times is :1
If statistics were computed by "processing time", closing a window would also need a reference time, but that time comes from the machine's system clock, and the window closes according to it. The value of the logical clock at any moment, by contrast, is computed by the program, which is why the watermark is called a logical clock.
6、 Late data processing
6.1、 What is late data
An element is late if its event time is smaller than the watermark's current timestamp. For example, if an element xxx in the stream carries the event time 20:50 while the logical clock already reads 20:51, Flink treats xxx as late data.
Case description: emit watermarks manually, and manually emit elements that carry event time.
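import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;
public class ExampleTest5 {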
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<String> result = env
.addSource(new SourceFunction<String>() {
@Override
public void run(SourceContext<String> ctx) throws Exception {
// Emit "hello world" with event time 1000 ms
ctx.collectWithTimestamp("hello world", 1000L);
// Emit a watermark of 999 ms
ctx.emitWatermark(new Watermark(999L));
// Emit "hello flink" with event time 2000 ms
ctx.collectWithTimestamp("hello flink", 2000L);
// Emit a watermark of 1999 ms
ctx.emitWatermark(new Watermark(1999L));
// Emit "hello late" with event time 1000 ms, which is now behind the watermark
ctx.collectWithTimestamp("hello late", 1000L);
}
@Override
public void cancel() {
}
})
.process(new ProcessFunction<String, String>() {
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
//System.out.println(" The current water level :" + ctx.timerService().currentWatermark());
// An element is late if its event time is below the current watermark
if (ctx.timestamp() < ctx.timerService().currentWatermark()) {
System.out.println(" Late elements :" + value);
} else {
System.out.println(" Normal element :" + value);
}
}
});
env.execute();
}
}
Console output :
Normal element :hello world
Normal element :hello flink
Late elements :hello late
6.2、 Handling late elements
Now that we know what a late element is, how should it be handled? Flink provides several options; the case below shows one of them.
Case study: send late data to a "side output stream"
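import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class ExampleTest6 {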
// Tag for the side output of late elements; the anonymous subclass ({}) preserves the generic type information
private static OutputTag<String> lateElement = new OutputTag<String>("late-element") {
};
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<String> result = env
.addSource(new SourceFunction<String>() {
@Override
public void run(SourceContext<String> ctx) throws Exception {
// Emit "hello world" with event time 1000 ms
ctx.collectWithTimestamp("hello world", 1000L);
// Emit a watermark of 999 ms
ctx.emitWatermark(new Watermark(999L));
// Emit "hello flink" with event time 2000 ms
ctx.collectWithTimestamp("hello flink", 2000L);
// Emit a watermark of 1999 ms
ctx.emitWatermark(new Watermark(1999L));
// Emit "hello late" with event time 1000 ms, which is now behind the watermark
ctx.collectWithTimestamp("hello late", 1000L);
}
@Override
public void cancel() {
}
})
.process(new ProcessFunction<String, String>() {
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
// An element is late if its event time is below the current watermark
if (ctx.timestamp() < ctx.timerService().currentWatermark()) {
ctx.output(lateElement, " Late elements are sent to the side output stream :" + value);
} else {
out.collect(" Elements that arrive normally :" + value);
}
}
});
result.print(" Main stream :");
result.getSideOutput(lateElement).print(" Side output stream :");
env.execute();
}
}
IDEA console output:
Main stream :> Elements that arrive normally :hello world
Main stream :> Elements that arrive normally :hello flink
Side output stream :> Late elements are sent to the side output stream :hello late
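The side-output pattern amounts to routing one input into two output channels. The following plain-Java analogue replays the same element/watermark sequence as the source above; the class SideOutputSketch and its two lists are hypothetical stand-ins for Flink's main stream and the side output identified by an OutputTag.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java analogue of the side-output example (no Flink dependency):
// the "main stream" and the "side output" are just two lists.
final class SideOutputSketch {
    final List<String> mainStream = new ArrayList<>();
    final List<String> sideOutput = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE; // no watermark seen yet

    /** A watermark element: the clock only moves forward. */
    void onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    /** A data element: late if its event time is below the current watermark. */
    void onElement(String value, long eventTime) {
        if (eventTime < currentWatermark) {
            sideOutput.add(value);
        } else {
            mainStream.add(value);
        }
    }

    public static void main(String[] args) {
        SideOutputSketch s = new SideOutputSketch();
        s.onElement("hello world", 1000L);
        s.onWatermark(999L);
        s.onElement("hello flink", 2000L);
        s.onWatermark(1999L);
        s.onElement("hello late", 1000L);
        System.out.println("Main stream: " + s.mainStream); // [hello world, hello flink]
        System.out.println("Side output: " + s.sideOutput); // [hello late]
    }
}
```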
Food for thought: how are windows, late elements and watermarks related?
7、 Summary
A watermark is like a clock in everyday life. A clock tells us the current hour, minute and second; in Flink this "current time" corresponds to a timestamp, and that timestamp is what triggers window closing and timer execution. In that sense the watermark acts as a reference point.