Write Flink code like this and it would be a miracle if the window fired (a bad programming habit)
2022-07-03 13:11:00 【Big data sheep said】
1. Preface - The conclusion first
This article records a pitfall encountered when implementing an event-time window with the DataStream API: the window never fired. It also walks through how the problem was tracked down.
The blogger hopes that after reading this article you adopt one programming habit: when implementing a Flink job with the DataStream API, place the Watermark Assigner as close to the Source node as you can.
If you want to know why, read on!
The following chapters explain the problem and the reasoning behind this advice. I hope they bring you some inspiration.
The pitfall scenario - what kind of pitfall is it
Troubleshooting - how the problem was investigated
Principle analysis - the mechanism that causes the problem
Avoiding the pitfall - how to prevent this problem
Conclusion
2. The pitfall scenario - what kind of pitfall is it
2.1. Requirement scenario
First, the requirement behind this pitfall and the first version of the implementation.
Requirement: on an e-commerce platform, use the heartbeat logs of online users (each user reports a heartbeat every 30s) to compute the number of users staying on the shopping cart page (Shopping-Cart) in the current minute.
Data source: user heartbeat logs reported every 30s (the three fields user_id, page, time correspond to the user id, the page the user is on, and the log report time)
Data processing: first keep only the shopping cart page heartbeat logs, then aggregate them in tumbling (Tumble) windows by timestamp
Data sink: aggregated results per minute (the two fields uv, time correspond to the number of users online on the shopping cart page in the current minute and the timestamp of that minute)
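To make the expected result concrete, here is a minimal plain-Java sketch (no Flink involved) of the aggregation described above: keep only Shopping-Cart heartbeats, bucket each one by the start of its minute, and count distinct users per bucket. The class name `MinuteUv`, the `Heartbeat` type and the sample values are hypothetical and only for illustration; timestamps are assumed to be non-negative epoch milliseconds.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class MinuteUv {
    static final long MINUTE_MS = 60_000L;

    // Hypothetical heartbeat record: user id, page name, report time in epoch millis.
    static final class Heartbeat {
        final long userId;
        final String page;
        final long time;

        Heartbeat(long userId, String page, long time) {
            this.userId = userId;
            this.page = page;
            this.time = time;
        }
    }

    // Start of the one-minute tumbling window that contains the timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % MINUTE_MS);
    }

    // Distinct Shopping-Cart users per minute, keyed by window start.
    static Map<Long, Integer> uvPerMinute(List<Heartbeat> logs) {
        Map<Long, Set<Long>> users = new TreeMap<>();
        for (Heartbeat h : logs) {
            if (!"Shopping-Cart".equals(h.page)) {
                continue; // other pages do not count
            }
            users.computeIfAbsent(windowStart(h.time), k -> new HashSet<>()).add(h.userId);
        }
        Map<Long, Integer> uv = new TreeMap<>();
        for (Map.Entry<Long, Set<Long>> e : users.entrySet()) {
            uv.put(e.getKey(), e.getValue().size());
        }
        return uv;
    }

    public static void main(String[] args) {
        List<Heartbeat> logs = Arrays.asList(
            new Heartbeat(1L, "Shopping-Cart", 5_000L),
            new Heartbeat(1L, "Shopping-Cart", 35_000L),  // same user, same minute
            new Heartbeat(2L, "Shopping-Cart", 40_000L),
            new Heartbeat(3L, "Home", 45_000L),           // filtered out
            new Heartbeat(2L, "Shopping-Cart", 70_000L)); // next minute
        System.out.println(uvPerMinute(logs)); // prints {0=2, 60000=1}
    }
}
```

A user who reports two heartbeats in the same minute is counted once, which is why the windowed job collects user ids into a set.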
The Flink DataStream API implementation is as follows:
import java.util.HashSet;
import java.util.Set;

import lombok.Builder;
import lombok.Data;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// FlinkEnv and FlinkEnvUtils are the blogger's own wrapper classes (import omitted).
public class WatermarkTest {

    public static void main(String[] args) throws Exception {
        // Get the Flink environment through FlinkEnv, an interface wrapped by the blogger
        FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);

        // Set the parallelism
        flinkEnv.env().setParallelism(100);

        flinkEnv.env()
            // Data source: the reported heartbeat logs (xxx is a placeholder)
            .addSource(xxx)
            // Keep only the data of the shopping cart page (Shopping-Cart)
            .filter(new FilterFunction<SourceModel>() {
                @Override
                public boolean filter(SourceModel value) throws Exception {
                    return value.getPage().equals("Shopping-Cart");
                }
            })
            // Assign Watermarks (placed AFTER the filter: this is the pitfall)
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<SourceModel>(Time.minutes(1)) {
                    @Override
                    public long extractTimestamp(SourceModel element) {
                        return element.getTime();
                    }
                })
            // Shuffle everything into a single subtask so the counts can be merged,
            // hence the key is fixed to 0
            .keyBy(new KeySelector<SourceModel, Long>() {
                @Override
                public Long getKey(SourceModel value) throws Exception {
                    return 0L;
                }
            })
            // Open a one-minute tumbling event-time window
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            // uv calculation logic
            .process(new ProcessWindowFunction<SourceModel, SinkModel, Long, TimeWindow>() {
                @Override
                public void process(Long key, Context context, Iterable<SourceModel> elements,
                        Collector<SinkModel> out) throws Exception {
                    long windowStart = context.window().getStart();
                    // Deduplicate user ids within the window
                    Set<Long> userIds = new HashSet<>();
                    for (SourceModel element : elements) {
                        userIds.add(element.getUserId());
                    }
                    out.collect(
                        SinkModel
                            .builder()
                            .uv(userIds.size())
                            .time(windowStart)
                            .build()
                    );
                }
            })
            // Output (xxx is a placeholder)
            .addSink(xxx);

        // Submit the job; a DataStream pipeline only runs once execute() is called
        flinkEnv.env().execute("WatermarkTest");
    }

    // Input data model
    @Data
    @Builder
    private static class SourceModel {
        private long userId;
        private String page;
        private long time;
    }

    // Output data model
    @Data
    @Builder
    private static class SinkModel {
        private long uv;
        private long time;
    }
}
2.2. Problem scenario
When we deployed this job to the cluster, it never produced any output, even though the input (user heartbeat logs) kept arriving in large volume.
3. Troubleshooting - how the problem was investigated
The Flink web UI showed the following for each operator:
Source operator: consuming data continuously; the web UI shows very large input and output volumes
Filter operator: has both input and output; the input is very large but the output is tiny (this is expected for the business: only a very small share of users stay on the shopping cart page)
Tumbling window operator: receives very little input and has never produced output; in the web UI the operator shows no Watermark at all
At this point the problem is fairly clear.
Judging from the Flink web UI at least, the window operator has no Watermark, so the window never triggers its computation.
The first guess: the Watermarks arriving at the single subtask of the window operator are not aligned!
Here is how that guess was verified:
Because our Watermark Assigner is placed after the Filter operator, Watermarks are generated only from the data that passes the Filter. To confirm the guess, we therefore need to estimate how much data the Filter operator outputs.
After checking, the Filter operator as a whole outputs fewer than 60 records per minute to the downstream operator. In other words, of our 100 parallel subtasks, at most 60 Filter subtasks send data to the downstream tumbling window operator in any given minute, and at least 40 send nothing.
As a result, the downstream tumbling window operator cannot align its Watermarks, so the window cannot fire.
The cause of the problem is found.
4. Principle analysis - the mechanism that causes the problem
To understand what Watermark alignment means, first look at how Flink transmits and computes Watermarks:
- Watermark transmission: broadcast. Broadcast here means that one subtask of the upstream operator sends its Watermark to all subtasks of the downstream operator that it is connected to, which depends on the Shuffle strategy between the upstream and downstream operators. It does not refer to the BroadCast programming API that Flink provides!
For example: with a parallelism of 100, if the Shuffle strategy between the upstream and downstream operators is Forward, the Watermark of an upstream subtask is sent only to the one downstream subtask it is connected to; if the strategy is Hash or Rebalance, the Watermark of an upstream subtask is sent to every downstream subtask.
- Watermark computation: how a downstream subtask derives its own Watermark after receiving Watermarks from the upstream subtasks (upstream and downstream here mean connected by a Channel). The formula:
Downstream subtask Watermark = min(Watermark sent by upstream subtask 1, Watermark sent by upstream subtask 2, …)
That is:
Downstream subtask Watermark = the minimum of the Watermarks sent by all upstream subtasks connected to it.
- Watermark alignment: when the upstream Watermarks that a downstream subtask depends on differ greatly, the Watermarks are said to be not aligned. For example: one upstream subtask sends a Watermark of 23:59 and another sends a Watermark of 23:00, a gap of 59 minutes. This situation is generally abnormal, hence "not aligned". Conversely, if the Watermarks differ only slightly, they are aligned.
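The min rule and the 23:59 / 23:00 example can be sketched in plain Java. This is a simplified model, not Flink's internal implementation: the class `WatermarkValve` and its methods are hypothetical names, timestamps are milliseconds since midnight, and a channel that has never sent a Watermark is modeled as `Long.MIN_VALUE`.

```java
import java.util.Arrays;

final class WatermarkValve {
    private final long[] channelWatermarks; // last Watermark seen per upstream channel
    private long current = Long.MIN_VALUE;  // Watermark of this downstream subtask

    WatermarkValve(int upstreamChannels) {
        channelWatermarks = new long[upstreamChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    // Record a Watermark from one channel and return the downstream Watermark.
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        current = Math.max(current, min); // a Watermark never moves backwards
        return current;
    }

    // Gap between the fastest and the slowest channel; a large gap means "not aligned".
    long skew() {
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min == Long.MIN_VALUE) {
            return Long.MAX_VALUE; // at least one channel has never reported
        }
        return Arrays.stream(channelWatermarks).max().getAsLong() - min;
    }

    static long minutesOfDay(int hour, int minute) {
        return (hour * 60L + minute) * 60_000L;
    }

    public static void main(String[] args) {
        WatermarkValve valve = new WatermarkValve(2);
        valve.onWatermark(0, minutesOfDay(23, 59)); // channel 1 is still silent
        long downstream = valve.onWatermark(1, minutesOfDay(23, 0));
        System.out.println(downstream);               // prints 82800000 (23:00)
        System.out.println(valve.skew() / 60_000L);   // prints 59
    }
}
```

The downstream subtask follows the slowest channel, so one lagging or silent upstream subtask is enough to hold back every window on that subtask.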
The following figure shows how a Watermark propagates, to deepen the understanding:
[Figure: Watermark propagation]
Back to the case above: in any given minute only 60 subtasks of the upstream operator have data and send a Watermark to the downstream window operator; the remaining 40 send nothing.
The minimum over all upstream subtasks therefore cannot advance, the downstream window operator has no Watermark, and the window does not fire.
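The case can be reproduced with a small simulation. It is a simplified sketch under stated assumptions: a subtask that has emitted no Watermark is modeled as `Long.MIN_VALUE`, and a window `[start, end)` is taken to fire once the Watermark reaches `end - 1`, which mirrors how event-time windows are triggered. The class `SilentSubtasks` and its methods are hypothetical.

```java
import java.util.Arrays;

final class SilentSubtasks {
    // Downstream Watermark = minimum over all upstream subtasks.
    static long downstreamWatermark(long[] upstream) {
        return Arrays.stream(upstream).min().orElse(Long.MIN_VALUE);
    }

    // An event-time window [start, end) fires once the Watermark reaches end - 1.
    static boolean windowFires(long windowEndMs, long watermark) {
        return watermark >= windowEndMs - 1;
    }

    // `parallelism` upstream subtasks, of which only `active` have emitted `watermark`.
    static long simulate(int parallelism, int active, long watermark) {
        long[] upstream = new long[parallelism];
        Arrays.fill(upstream, Long.MIN_VALUE); // silent subtasks
        for (int i = 0; i < active; i++) {
            upstream[i] = watermark;
        }
        return downstreamWatermark(upstream);
    }

    public static void main(String[] args) {
        long watermark = 10 * 60_000L; // active subtasks are already 10 minutes in
        long firstWindowEnd = 60_000L; // window [0, 60000)

        // 60 of 100 subtasks active: the 40 silent ones pin the minimum.
        System.out.println(windowFires(firstWindowEnd, simulate(100, 60, watermark)));  // prints false
        // All 100 subtasks active: the window fires.
        System.out.println(windowFires(firstWindowEnd, simulate(100, 100, watermark))); // prints true
    }
}
```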
5. Avoiding the pitfall - how to prevent this problem
In the scenario above, the root cause is that after the (shopping cart page) filter, very little data remains.
The Watermark Assigner then generates very few Watermarks from that small amount of data; 40 subtasks generate no Watermark at all, and the downstream operator ends up with unaligned Watermarks.
So the solution is simple: generate more Watermarks, and make sure that:
Even though little data remains after the Filter, every subtask still has enough Watermarks to pass to the downstream window operator after the Filter operator, so the window keeps firing and producing results.
Concrete fix: move the Watermark Assigner to right after the Source operator and before the Filter operator. The code is as follows:
import java.util.HashSet;
import java.util.Set;

import lombok.Builder;
import lombok.Data;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// FlinkEnv and FlinkEnvUtils are the blogger's own wrapper classes (import omitted).
public class WatermarkTest {

    public static void main(String[] args) throws Exception {
        // Get the Flink environment through FlinkEnv, an interface wrapped by the blogger
        FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);

        // Set the parallelism
        flinkEnv.env().setParallelism(100);

        flinkEnv.env()
            // Data source (xxx is a placeholder)
            .addSource(xxx)
            // Assign Watermarks, now moved BEFORE the filter
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<SourceModel>(Time.minutes(1)) {
                    @Override
                    public long extractTimestamp(SourceModel element) {
                        return element.getTime();
                    }
                })
            // Keep only the data of the shopping cart page (Shopping-Cart)
            .filter(new FilterFunction<SourceModel>() {
                @Override
                public boolean filter(SourceModel value) throws Exception {
                    return value.getPage().equals("Shopping-Cart");
                }
            })
            // Shuffle everything into a single subtask so the counts can be merged,
            // hence the key is fixed to 0
            .keyBy(new KeySelector<SourceModel, Long>() {
                @Override
                public Long getKey(SourceModel value) throws Exception {
                    return 0L;
                }
            })
            // Open a one-minute tumbling event-time window
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            // uv calculation logic
            .process(new ProcessWindowFunction<SourceModel, SinkModel, Long, TimeWindow>() {
                @Override
                public void process(Long key, Context context, Iterable<SourceModel> elements,
                        Collector<SinkModel> out) throws Exception {
                    long windowStart = context.window().getStart();
                    // Deduplicate user ids within the window
                    Set<Long> userIds = new HashSet<>();
                    for (SourceModel element : elements) {
                        userIds.add(element.getUserId());
                    }
                    out.collect(
                        SinkModel
                            .builder()
                            .uv(userIds.size())
                            .time(windowStart)
                            .build()
                    );
                }
            })
            // Output (xxx is a placeholder)
            .addSink(xxx);

        // Submit the job; a DataStream pipeline only runs once execute() is called
        flinkEnv.env().execute("WatermarkTest");
    }

    // Input data model
    @Data
    @Builder
    private static class SourceModel {
        private long userId;
        private String page;
        private long time;
    }

    // Output data model
    @Data
    @Builder
    private static class SinkModel {
        private long uv;
        private long time;
    }
}
Why the fix works: in this business scenario the Source has a large volume of data, so the Watermark Assigner can use it to keep producing Watermarks for the downstream.
Even though very little data reaches the window operator after the Filter operator, Watermarks are not filtered out by the Filter operator. Plenty of Watermarks still reach the window operator, the Watermarks are aligned, and the window operator keeps firing and producing output.
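The point that a filter drops records but not Watermarks can be illustrated with a toy model of a stream in plain Java. This only mimics the behavior described above; `FilterKeepsWatermarks` and `Element` are hypothetical names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FilterKeepsWatermarks {
    // A stream element is either a data record (page != null) or a Watermark (page == null).
    static final class Element {
        final String page;
        final long timestamp;

        private Element(String page, long timestamp) {
            this.page = page;
            this.timestamp = timestamp;
        }

        static Element record(String page, long timestamp) {
            return new Element(page, timestamp);
        }

        static Element watermark(long timestamp) {
            return new Element(null, timestamp);
        }

        boolean isWatermark() {
            return page == null;
        }
    }

    // Mimics the filter operator: the predicate applies to records only,
    // Watermarks are always forwarded downstream.
    static List<Element> filterCart(List<Element> input) {
        List<Element> output = new ArrayList<>();
        for (Element e : input) {
            if (e.isWatermark() || "Shopping-Cart".equals(e.page)) {
                output.add(e);
            }
        }
        return output;
    }

    static long countWatermarks(List<Element> elements) {
        return elements.stream().filter(Element::isWatermark).count();
    }

    public static void main(String[] args) {
        List<Element> input = Arrays.asList(
            Element.record("Home", 1_000L),
            Element.watermark(500L),
            Element.record("Home", 2_000L),
            Element.watermark(1_500L),
            Element.record("Shopping-Cart", 2_500L),
            Element.watermark(2_000L));
        List<Element> output = filterCart(input);
        System.out.println(output.size());           // prints 4 (1 record + 3 Watermarks)
        System.out.println(countWatermarks(output)); // prints 3
    }
}
```

Two of the three records are dropped, yet all three Watermarks reach the downstream side, which is what lets a subtask with no shopping cart data still advance event time.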
The fix works well, but there is a small chance of losing out-of-order data. For example, with the Watermark generated right after the Source operator, a shopping cart page log with timestamp 23:50:50 may arrive after a site home page log with timestamp 23:52:00. By then the Watermark has already risen to 23:51:00 and the 23:50 window has fired, so the 23:50:50 shopping cart record is dropped by the window operator.
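The arithmetic of this example can be checked with a short sketch. It assumes the same one-minute out-of-orderness bound as the job above, models the Watermark as the highest timestamp seen minus that bound, and treats a record as late when the Watermark has already reached the last millisecond of its window. Real jobs emit Watermarks periodically, so the exact moment depends on timing. `LateCartEvent` and its methods are hypothetical names.

```java
final class LateCartEvent {
    static final long MINUTE = 60_000L;
    static final long MAX_OUT_OF_ORDERNESS = MINUTE; // same 1-minute bound as the job above

    // Milliseconds since midnight for hh:mm:ss.
    static long ts(int hour, int minute, int second) {
        return ((hour * 60L + minute) * 60L + second) * 1000L;
    }

    // Watermark after the assigner has seen the given timestamps.
    static long watermarkAfter(long... timestamps) {
        if (timestamps.length == 0) {
            return Long.MIN_VALUE; // nothing seen, no Watermark yet
        }
        long max = Long.MIN_VALUE;
        for (long t : timestamps) {
            max = Math.max(max, t);
        }
        return max - MAX_OUT_OF_ORDERNESS;
    }

    // A record is late if the Watermark already covers the end of its one-minute window.
    static boolean isLate(long eventTs, long watermark) {
        long windowEnd = eventTs - (eventTs % MINUTE) + MINUTE;
        return windowEnd - 1 <= watermark;
    }

    public static void main(String[] args) {
        long earlierCart = ts(23, 50, 10);
        long homePage = ts(23, 52, 0);
        long lateCart = ts(23, 50, 50);

        // Assigner after the Source: the home page log also advances the Watermark.
        System.out.println(isLate(lateCart, watermarkAfter(earlierCart, homePage))); // prints true
        // Assigner after the Filter: only shopping cart logs advance the Watermark.
        System.out.println(isLate(lateCart, watermarkAfter(earlierCart)));           // prints false
    }
}
```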
6. Conclusion
This article recorded a problem with the DataStream API: the Watermark Assigner was placed too far downstream, the Watermarks could not align, and the event-time window never fired.
The habit the blogger recommends: when implementing a Flink job with the DataStream API, place the Watermark Assigner as close to the Source node as you can, as far upstream as possible.