Flink watermark mechanism
2022-07-24 05:40:00 【sf_ www】
1. What is a watermark, and what problem does it solve?
Flink works with two important notions of time: processing time and event time. A stream processor that supports event time needs a way to measure the progress of event time. For example, a window operator that builds hourly windows must be notified when event time has passed the end of the hour so that it can close the window in progress. Deciding whether a window has ended is not easy in a stream processing system. If the window is based on processing time, the problem is simple, because processing time depends only on the local clock. If the window is based on event time, messages in a distributed system may be delayed or arrive out of order, so even after the system has received data beyond the window boundary it cannot be sure that all earlier data has arrived. The watermark mechanism exists to solve this problem.
A watermark is the mechanism Flink uses internally to measure progress in event time. It is essentially a timestamp (see org.apache.flink.streaming.api.watermark.Watermark, which extends StreamElement). Watermarks flow as part of the data stream and carry a timestamp t. A Watermark(t) declares that event time has reached t in that stream: all elements with timestamp t' <= t are assumed to have arrived, and elements that follow Watermark(t) should have timestamps greater than t. In other words, it defines when to stop waiting for earlier data.
With watermarks, the system can decide whether an event-time window is complete. A watermark is only a measure, however: the system uses it to estimate current progress, and it cannot guarantee that no element with a timestamp at or below the current watermark will arrive later. Such "late" elements need special handling, described in section 2.4.
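The rule "after Watermark(t), elements should have a timestamp greater than t" can be sketched in plain Java. This is a stdlib-only illustration, not Flink code; the class name LatenessCheck and its methods are hypothetical.

```java
// Plain-Java illustration (not Flink code): remember the latest watermark
// seen on a stream and flag elements whose timestamp does not exceed it.
class LatenessCheck {
    private long currentWatermark = Long.MIN_VALUE;

    // A watermark only ever moves event time forward.
    void onWatermark(long t) {
        currentWatermark = Math.max(currentWatermark, t);
    }

    // After Watermark(t), elements are expected to carry a timestamp > t.
    boolean isLate(long elementTimestamp) {
        return elementTimestamp <= currentWatermark;
    }

    public static void main(String[] args) {
        LatenessCheck check = new LatenessCheck();
        check.onWatermark(10_000L);
        System.out.println(check.isLate(12_000L)); // false
        System.out.println(check.isLate(9_000L));  // true
    }
}
```

An element flagged here is only late relative to the watermark; whether it is still counted depends on the window settings covered in section 2.4.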
What does the watermark solve? It addresses out-of-order and delayed arrival of data. In an event-time computation, an event is generated, passes through the source and then reaches the operators, and each step takes time. In most cases the data reaching an operator is in event-time order, but device failures, network problems, backpressure and similar causes can still introduce disorder and delay. Disorder and delay make it impossible to judge current progress from the data alone. The watermark represents event-time progress and so gives processing a reference point. It is also the basis of window operations.
2. Generating and using watermarks
(Timestamps and watermarks are both expressed as milliseconds since 1970-01-01T00:00:00Z.)
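As a small illustration of that unit, the sketch below converts ISO-8601 instants to epoch milliseconds with the JDK's java.time API. The class name EpochMillisExample is hypothetical and the snippet does not depend on Flink.

```java
import java.time.Instant;

// Stdlib-only illustration of the millisecond-since-epoch representation.
class EpochMillisExample {
    // Converts an ISO-8601 UTC instant to milliseconds since 1970-01-01T00:00:00Z.
    static long toEpochMillis(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("1970-01-01T00:00:00Z"));     // 0
        System.out.println(toEpochMillis("1970-01-01T00:00:10.500Z")); // 10500
    }
}
```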
2.1. Introduction to watermark strategies
To work with event time, Flink needs to know the event timestamps, which means every element in the stream needs an event timestamp assigned to it. This is usually done with a TimestampAssigner that accesses or extracts the timestamp from a field of the element.
Timestamp assignment goes hand in hand with watermark generation, which tells the system about progress in event time. You configure this by specifying a WatermarkGenerator.
The Flink API expects a WatermarkStrategy that contains both a TimestampAssigner and a WatermarkGenerator. A number of common strategies are available as static methods on WatermarkStrategy, and users can build their own strategies when needed.
There are two places in a Flink application where a WatermarkStrategy can be used:
-》 Directly on the data source
-》 After a non-source operation (use this only when you cannot set a strategy directly on the source)
The first option is preferable, because it lets the source use knowledge about shards/partitions/splits in its watermarking logic. Sources can then usually track watermarks at a finer level, and the overall watermark produced by the source is more accurate. Specifying a WatermarkStrategy directly on the source usually means using a source-specific interface. See the Flink documentation on watermark strategies and the Kafka connector for how this works with the Kafka connector and for more details on per-partition watermarks.
An example of using a WatermarkStrategy:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(<watermark strategy>);

withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);
Used this way, the WatermarkStrategy takes a stream and produces a new stream with timestamped elements and watermarks. If the original stream already had timestamps and/or watermarks, the timestamp assigner overwrites them.
2.2. Writing WatermarkGenerators
A TimestampAssigner is simple: the interface has a single method, extractTimestamp. This section therefore focuses on the WatermarkGenerator interface.
/**
 * The {@code WatermarkGenerator} generates watermarks either based on events or periodically (in a
 * fixed interval).
 *
 * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the {@code
 * AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
 */
@Public
public interface WatermarkGenerator<T> {

    /**
     * Called for every event, allows the watermark generator to examine and remember the event
     * timestamps, or to emit a watermark based on the event itself.
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * Called periodically, and might emit a new watermark, or not.
     *
     * <p>The interval in which this method is called and Watermarks are generated depends on {@link
     * ExecutionConfig#getAutoWatermarkInterval()}.
     */
    void onPeriodicEmit(WatermarkOutput output);
}
There are two styles of watermark generation: periodic and punctuated.
A periodic generator usually observes incoming events through onEvent() and emits a watermark when the framework calls onPeriodicEmit().
A punctuated generator looks at the events in onEvent() and waits for special marker events or punctuations that carry watermark information in the stream. When it sees one, it emits a watermark immediately. Punctuated generators usually do not emit a watermark from onPeriodicEmit().
The two styles therefore differ in where and how the watermark is produced.
-》 Implementing a periodic WatermarkGenerator
A periodic generator observes stream events and generates watermarks periodically (possibly depending on the stream elements, or purely based on processing time).
The interval (every n milliseconds) at which watermarks are generated is set with ExecutionConfig.setAutoWatermarkInterval(…). The generator's onPeriodicEmit() method is called at each interval, and a new watermark is emitted if the returned watermark is non-null and larger than the previous one. Here are two simple examples:
/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // emit the watermark as current highest timestamp minus the out-of-orderness bound
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }
}

/**
 * This generator generates watermarks that are lagging behind processing time
 * by a fixed amount. It assumes that elements arrive in Flink after a bounded delay.
 */
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxTimeLag = 5000; // 5 seconds

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // don't need to do anything because we work on processing time
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
    }
}
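To make the periodic behaviour concrete, here is a stdlib-only simulation of the bounded-out-of-orderness logic. It is a sketch, not Flink code: the class PeriodicEmitSimulation is hypothetical, and modelling each periodic tick as a batch of event timestamps is an assumption made for illustration. A candidate watermark is forwarded only when it is larger than the last one emitted.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation (not Flink code) of periodic watermark emission.
class PeriodicEmitSimulation {
    static final long MAX_OUT_OF_ORDERNESS = 3_500L;

    // Each inner list holds the event timestamps that arrived between two periodic ticks.
    static List<Long> emittedWatermarks(List<List<Long>> batches) {
        // Start high enough that the subtraction below cannot underflow.
        long maxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS + 1;
        long lastEmitted = Long.MIN_VALUE;
        List<Long> emitted = new ArrayList<>();
        for (List<Long> batch : batches) {
            for (long ts : batch) {
                maxTimestamp = Math.max(maxTimestamp, ts); // the onEvent step
            }
            long candidate = maxTimestamp - MAX_OUT_OF_ORDERNESS - 1; // the onPeriodicEmit step
            if (candidate > lastEmitted) { // forward only a watermark that advances
                emitted.add(candidate);
                lastEmitted = candidate;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<List<Long>> batches = List.of(
                List.of(8_000L, 12_500L), List.of(9_000L), List.of(13_500L));
        System.out.println(emittedWatermarks(batches)); // [8999, 9999]
    }
}
```

The second tick produces no output because the maximum timestamp did not grow, so the candidate watermark is unchanged.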
-》 Implementing a punctuated WatermarkGenerator
A punctuated generator observes the stream of events and emits a watermark whenever it sees a special element that carries watermark information.
Note: it is possible to generate a watermark on every event. However, because each watermark triggers some computation downstream, too many watermarks degrade performance.
An example:
public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        if (event.hasWatermarkMarker()) {
            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // don't need to do anything because we emit in reaction to events above
    }
}
2.3. How operators process watermarks
As a general rule, an operator must fully process a given watermark before forwarding it downstream. For example, the WindowOperator first evaluates all windows that should fire, and only after producing all output triggered by the watermark is the watermark itself sent downstream. In other words, all elements produced because of a watermark are emitted before that watermark.
The same rule applies to TwoInputStreamOperator. In that case, however, the operator's current watermark is defined as the minimum of the watermarks of its two inputs.
For details, see:
OneInputStreamOperator#processWatermark, TwoInputStreamOperator#processWatermark1, TwoInputStreamOperator#processWatermark2
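The ordering rule can be illustrated with a toy operator written in plain Java. This is a sketch under stated assumptions and not Flink's WindowOperator: the class WatermarkOrderingSketch is hypothetical, it counts elements per 10-second tumbling window, it assumes non-negative timestamps, and it fires a window once the watermark covers the window's last timestamp (end - 1).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (not Flink code): a toy count-per-window operator that
// emits everything a watermark triggers before forwarding the watermark.
class WatermarkOrderingSketch {
    static final long WINDOW_SIZE = 10_000L;
    private final TreeMap<Long, Integer> countsByWindowEnd = new TreeMap<>();
    final List<String> output = new ArrayList<>();

    // Assumes timestamp >= 0 so that the modulo yields the window offset.
    void processElement(long timestamp) {
        long windowEnd = timestamp - (timestamp % WINDOW_SIZE) + WINDOW_SIZE;
        countsByWindowEnd.merge(windowEnd, 1, Integer::sum);
    }

    void processWatermark(long watermark) {
        // 1. Fire every window whose last timestamp (end - 1) is covered by the watermark.
        while (!countsByWindowEnd.isEmpty() && countsByWindowEnd.firstKey() - 1 <= watermark) {
            Map.Entry<Long, Integer> fired = countsByWindowEnd.pollFirstEntry();
            output.add("result(windowEnd=" + fired.getKey() + ", count=" + fired.getValue() + ")");
        }
        // 2. Only then forward the watermark downstream.
        output.add("watermark(" + watermark + ")");
    }

    public static void main(String[] args) {
        WatermarkOrderingSketch op = new WatermarkOrderingSketch();
        op.processElement(8_000L);
        op.processElement(9_000L);
        op.processElement(12_500L);
        op.processWatermark(9_999L);
        System.out.println(op.output); // window result first, then the watermark
    }
}
```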

2.4. Handling late data
1. Discard (the default)
2. allowedLateness: specify how long late data is tolerated
In some cases we want to give late data some tolerance. Flink's allowedLateness method sets an allowed delay for late data, and data that arrives within that delay can still trigger the window computation. Call .allowedLateness(Time lateness).
3. sideOutputLateData: collect late data
sideOutputLateData collects late data in one place so it can be stored and inspected later for troubleshooting. The method sends late data to the side output identified by a given OutputTag, and the data can then be retrieved with SingleOutputStreamOperator.getSideOutput(OutputTag).
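The three options above can be pictured as a classification of each arriving element. The following plain-Java sketch is an illustration, not Flink code: the class LateDataSketch and its Outcome names are hypothetical, and the exact boundary conditions (a window counts as fired once the watermark reaches window end minus one millisecond) are assumptions modelled on event-time tumbling windows.

```java
// Plain-Java sketch (not Flink code) of how an element is treated relative
// to its window, the current watermark and the allowed lateness.
class LateDataSketch {
    enum Outcome { ON_TIME, LATE_BUT_ACCEPTED, DROPPED_OR_SIDE_OUTPUT }

    static Outcome classify(long windowEnd, long watermark, long allowedLateness) {
        long lastTimestampInWindow = windowEnd - 1;
        if (watermark < lastTimestampInWindow) {
            return Outcome.ON_TIME;            // the window has not fired yet
        }
        if (watermark < lastTimestampInWindow + allowedLateness) {
            return Outcome.LATE_BUT_ACCEPTED;  // the window can fire again with this element
        }
        return Outcome.DROPPED_OR_SIDE_OUTPUT; // the window is gone: drop or send to side output
    }

    public static void main(String[] args) {
        // Window [0 s, 10 s) with 5 s of allowed lateness.
        System.out.println(classify(10_000L, 9_000L, 5_000L));  // ON_TIME
        System.out.println(classify(10_000L, 12_000L, 5_000L)); // LATE_BUT_ACCEPTED
        System.out.println(classify(10_000L, 15_000L, 5_000L)); // DROPPED_OR_SIDE_OUTPUT
    }
}
```

With an allowed lateness of zero, the middle case never occurs, which corresponds to the default "discard" behaviour.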
2.5. Using watermarks in the DataStream API
Example 1: using a periodic watermark
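import java.time.Duration;
import java.util.Iterator;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class TestPeriodicWatermark {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Generate a watermark once per second.
        env.getConfig().setAutoWatermarkInterval(1000);
        // Each input line has the form "key,timestampMillis".
        DataStream<String> dataSource = env.socketTextStream("manager-1", 10009);
        DataStream<Tuple2<String, Long>> mapData = dataSource.map(s ->
                new Tuple2<>(s.split(",")[0], Long.parseLong(s.split(",")[1]))).returns(Types.TUPLE(Types.STRING, Types.LONG));
        // Tolerate up to 10 seconds of out-of-orderness; the second tuple field is the event time.
        mapData.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((SerializableTimestampAssigner<Tuple2<String, Long>>) (o, l) -> o.f1))
                .keyBy((KeySelector<Tuple2<String, Long>, String>) tuple2 -> tuple2.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(30)))
                .process(new ProcessWindowFunction<Tuple2<String, Long>, Object, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<Tuple2<String, Long>> iterable, Collector<Object> collector) throws Exception {
                        long sum = 0;
                        int index = 0;
                        Iterator<Tuple2<String, Long>> it = iterable.iterator();
                        Tuple2<String, Long> first = null, end = null;
                        while (it.hasNext()) {
                            Tuple2<String, Long> item = it.next();
                            if (index == 0) {
                                first = item;
                            }
                            end = item;
                            sum += 1;
                            index++;
                        }
                        System.out.println("Window start time: " + context.window().getStart());
                        System.out.println("Window end time: " + context.window().getEnd());
                        System.out.println("First element in the window: " + first);
                        System.out.println("Last element in the window: " + end);
                        System.out.println("Current watermark: " + context.currentWatermark());
                        System.out.println();
                        // Emit the number of elements per key in this window.
                        collector.collect(new Tuple2<>(key, sum));
                    }
                }).print();

        env.setRestartStrategy(RestartStrategies.noRestart());
        env.execute("TestPeriodicWatermark");
    }
}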
2.6. Using watermarks in SQL
Defining the watermark in the CREATE TABLE DDL
An event-time attribute is defined with a WATERMARK statement in the CREATE TABLE DDL. The WATERMARK statement defines a watermark generation expression on an existing field and marks that field as the event-time attribute.
CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- declare user_action_time as the event-time attribute and generate watermarks with a 5-second delay
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
3. Some notes
3.1. Watermarks in parallel data streams
With a parallelism greater than one (multiple source streams, or a single stream with multiple partitions), watermarks are aligned across inputs: the operator takes the minimum of the watermarks from all of its input channels.
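A minimal plain-Java sketch of this alignment follows. It is an illustration rather than Flink's implementation: the class ChannelAlignmentSketch is hypothetical, and it assumes each channel starts at Long.MIN_VALUE until its first watermark arrives.

```java
import java.util.Arrays;

// Plain-Java sketch (not Flink code): an operator with several input channels
// tracks one watermark per channel and uses the minimum as its own event time.
class ChannelAlignmentSketch {
    private final long[] channelWatermarks;
    private long operatorWatermark = Long.MIN_VALUE;

    ChannelAlignmentSketch(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark for one channel and returns the operator's watermark afterwards.
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        operatorWatermark = Math.max(operatorWatermark, min);
        return operatorWatermark;
    }

    public static void main(String[] args) {
        ChannelAlignmentSketch op = new ChannelAlignmentSketch(3);
        op.onWatermark(0, 10L);
        op.onWatermark(1, 7L);
        System.out.println(op.onWatermark(2, 12L)); // 7, the slowest channel
        System.out.println(op.onWatermark(1, 20L)); // 10, channel 0 is now the slowest
    }
}
```

Until every channel has delivered at least one watermark, the operator's watermark stays at its initial value, which is why a single silent channel can hold back all downstream windows.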
3.2. Window triggering and removal
A window fires when both conditions hold:
1. watermark time >= window_end_time
2. There is data in [window_start_time, window_end_time)
(Note: when out-of-order data is tolerated up to a delay t, the watermark is generally generated as watermark = maxEventTime - t. This out-of-orderness bound is separate from the window's allowedLateness setting.) For example, the official BoundedOutOfOrdernessWatermarks implementation is:
public void onPeriodicEmit(WatermarkOutput output) {
    output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
To see exactly how a window is fired, look at the window's Trigger.
Suppose we use 10 s time windows, so 0~10s and 10~20s are each one window. Take 0~10s as an example: 0 is the start time and 10 is the end time. Four records arrive with event times 8 (A), 12.5 (B), 9 (C) and 13.5 (D), in that order, and the watermark is the maximum event time seen so far minus a delay of 3.5 seconds.
When A arrives, the watermark is max(8) - 3.5 = 4.5 < 10, so no computation is triggered.
When B arrives, the watermark is max(12.5, 8) - 3.5 = 9 < 10, so no computation is triggered.
When C arrives, the watermark is max(12.5, 8, 9) - 3.5 = 9 < 10, so no computation is triggered.
When D arrives, the watermark is max(13.5, 12.5, 8, 9) - 3.5 = 10 >= 10, so the computation is triggered.
When the computation fires, A and C are included (both have event times below 10). C arrived out of order, after B.
After that point (13.5 s), the 0~10s window is removed.
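The walkthrough above can be reproduced with a few lines of plain Java. This is a stdlib-only sketch, not Flink code: the class WindowTriggerWalkthrough is hypothetical, times are in milliseconds, and it follows the simplified rule used in the text (watermark = max event time - delay) rather than the extra 1 ms subtracted by BoundedOutOfOrdernessWatermarks.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java walkthrough (not Flink code) of the A, B, C, D example, in milliseconds.
class WindowTriggerWalkthrough {
    static final long WINDOW_END = 10_000L; // window [0 s, 10 s)
    static final long DELAY = 3_500L;       // tolerated out-of-orderness

    // Watermark after each arrival: maximum event time seen so far minus DELAY.
    static List<Long> watermarks(long[] eventTimes) {
        List<Long> result = new ArrayList<>();
        long maxEventTime = Long.MIN_VALUE;
        for (long t : eventTimes) {
            maxEventTime = Math.max(maxEventTime, t);
            result.add(maxEventTime - DELAY);
        }
        return result;
    }

    // Index of the first arrival whose watermark reaches WINDOW_END, or -1 if none does.
    static int firstTriggeringArrival(long[] eventTimes) {
        List<Long> wms = watermarks(eventTimes);
        for (int i = 0; i < wms.size(); i++) {
            if (wms.get(i) >= WINDOW_END) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long[] arrivals = {8_000L, 12_500L, 9_000L, 13_500L}; // A, B, C, D
        System.out.println(watermarks(arrivals));             // [4500, 9000, 9000, 10000]
        System.out.println(firstTriggeringArrival(arrivals)); // 3, i.e. D
    }
}
```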
One more point: if an operator runs with a parallelism of 10 but only 6 of its subtasks ever produce watermarks, the other 4 never receive a watermark and downstream window computation cannot be triggered. Keep parallelism in mind for this reason.
A window is removed when:
Time (event time or processing time) exceeds window_end_time plus the allowed lateness.
3.3. Handling idle data sources
A partitioned source such as Kafka may have partitions that carry no data. Those partitions produce no watermarks, so downstream computation cannot be triggered. In that case you can configure an idleness timeout so that such a partition is treated as idle and ignored until data arrives again.
WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withIdleness(Duration.ofMinutes(1));
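The effect of marking a partition idle can be sketched in plain Java. This is a simplified model and not Flink's implementation: the class IdleChannelSketch is hypothetical, idleness is detected here by comparing a caller-supplied clock with the last event time per channel, and returning Long.MIN_VALUE when every channel is idle is an assumption of this sketch.

```java
import java.util.Arrays;

// Plain-Java sketch (not Flink code): channels that have been silent for longer
// than the idle timeout are skipped when the minimum watermark is computed.
class IdleChannelSketch {
    private final long[] watermarks;
    private final long[] lastEventAt; // processing time of the last event per channel
    private final long idleTimeoutMillis;

    IdleChannelSketch(int channels, long idleTimeoutMillis, long now) {
        this.watermarks = new long[channels];
        this.lastEventAt = new long[channels];
        this.idleTimeoutMillis = idleTimeoutMillis;
        Arrays.fill(watermarks, Long.MIN_VALUE);
        Arrays.fill(lastEventAt, now);
    }

    void onEvent(int channel, long watermark, long now) {
        watermarks[channel] = Math.max(watermarks[channel], watermark);
        lastEventAt[channel] = now;
    }

    // Minimum watermark over the channels that saw an event within the idle timeout.
    long combinedWatermark(long now) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (now - lastEventAt[i] < idleTimeoutMillis) {
                min = Math.min(min, watermarks[i]);
                anyActive = true;
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        IdleChannelSketch op = new IdleChannelSketch(2, 60_000L, 0L);
        op.onEvent(0, 5_000L, 1_000L);
        System.out.println(op.combinedWatermark(2_000L) == Long.MIN_VALUE); // true: channel 1 holds it back
        op.onEvent(0, 9_000L, 61_000L);
        System.out.println(op.combinedWatermark(61_000L)); // 9000: channel 1 is now idle
    }
}
```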
For more information, see the official Flink documentation.