Write your Flink code like this and it's a wonder the window ever triggers (a bad programming habit)
2022-07-03 13:11:00 [Big Data Sheep Said]
1. Preface - The conclusion first
This article records a pitfall hit when implementing an event-time window application with the DataStream API: the window never triggered. It also walks through the troubleshooting process.
After reading this article, I hope you develop one programming habit: when implementing a Flink job with the DataStream API, place the Watermark Assigner as close to the Source node as possible.
If you want to know why, read on!!!
The following chapters explain the problem and why I make this recommendation; I hope they bring you some inspiration.
- Stepping on the pit - What the pit looks like
- Finding the problem - Investigating the pit
- Analyzing the principle - What mechanism causes the problem
- Avoiding the pit - How to prevent this problem
- Conclusion
2. Stepping on the pit - What the pit looks like
2.1. The requirement
First, let's introduce the requirement behind this pit and the first version of the implementation code.
Requirement: on an e-commerce platform, use the heartbeat logs of online users (each user reports one heartbeat log every 30s) to compute, per minute, the number of users currently staying on the shopping-cart page (Shopping-Cart).
Data source: a user heartbeat log reported every 30s, with three fields (user_id, page, time) corresponding to the user id, the page the user is on, and the log report timestamp.
Data processing: first filter for the shopping-cart page heartbeat logs, then aggregate them in a tumbling (Tumble) event-time window keyed on the timestamp.
Data sink: one aggregated result per minute, with two fields (uv, time) corresponding to the number of users online on the shopping-cart page during that minute and the timestamp of that minute.
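Setting Flink aside for a moment, the core aggregation logic of this requirement can be sketched in plain Java. This is an illustrative sketch under stated assumptions: the `Heartbeat` record and `uvPerMinute` helper are hypothetical names invented here, and timestamps are epoch millis.

```java
import java.util.*;

public class UvPerMinute {
    // One heartbeat log: user_id, page, report time (epoch millis). Hypothetical model.
    record Heartbeat(long userId, String page, long time) {}

    // For each one-minute bucket, count distinct users seen on the Shopping-Cart page.
    static Map<Long, Integer> uvPerMinute(List<Heartbeat> logs) {
        Map<Long, Set<Long>> users = new TreeMap<>();
        for (Heartbeat h : logs) {
            if (!"Shopping-Cart".equals(h.page())) continue; // the filter step
            long minuteStart = h.time() - h.time() % 60_000;  // the tumbling-window step
            users.computeIfAbsent(minuteStart, k -> new HashSet<>()).add(h.userId());
        }
        Map<Long, Integer> uv = new TreeMap<>();
        users.forEach((minute, set) -> uv.put(minute, set.size()));
        return uv;
    }

    public static void main(String[] args) {
        List<Heartbeat> logs = List.of(
                new Heartbeat(1, "Shopping-Cart", 0),
                new Heartbeat(1, "Shopping-Cart", 30_000),  // same user, same minute
                new Heartbeat(2, "Shopping-Cart", 45_000),
                new Heartbeat(3, "Home", 50_000),           // filtered out
                new Heartbeat(2, "Shopping-Cart", 65_000)); // next minute
        System.out.println(uvPerMinute(logs)); // {0=2, 60000=1}
    }
}
```

The Flink job below implements the same logic, but streaming and event-time driven: the window only emits when the Watermark passes the end of the minute, which is exactly where the pit lies.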
The concrete Flink DataStream API implementation is as follows:
public class WatermarkTest {

    public static void main(String[] args) throws Exception {
        // Get the Flink environment; FlinkEnv is an interface encapsulated by the blogger
        FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);
        // Set the parallelism
        flinkEnv.env().setParallelism(100);

        flinkEnv.env()
                // Data source: the reported heartbeat logs
                .addSource(xxx)
                // Keep only the shopping-cart page (Shopping-Cart) data
                .filter(new FilterFunction<SourceModel>() {
                    @Override
                    public boolean filter(SourceModel value) throws Exception {
                        return value.getPage().equals("Shopping-Cart");
                    }
                })
                // Assign Watermarks
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<SourceModel>(Time.minutes(1)) {
                            @Override
                            public long extractTimestamp(SourceModel element) {
                                return element.getTime();
                            }
                        })
                // To shuffle all data into a single operator instance, the key is fixed to 0
                .keyBy(new KeySelector<SourceModel, Long>() {
                    @Override
                    public Long getKey(SourceModel value) throws Exception {
                        return 0L;
                    }
                })
                // Open a one-minute tumbling event-time window
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                // uv computation logic
                .process(new ProcessWindowFunction<SourceModel, SinkModel, Long, TimeWindow>() {
                    @Override
                    public void process(Long aLong, Context context, Iterable<SourceModel> elements,
                            Collector<SinkModel> out) throws Exception {
                        long windowStart = context.window().getStart();
                        Set<Long> s = new HashSet<>();
                        elements.forEach(new Consumer<SourceModel>() {
                            @Override
                            public void accept(SourceModel sourceModel) {
                                s.add(sourceModel.userId);
                            }
                        });
                        out.collect(
                                SinkModel
                                        .builder()
                                        .uv(s.size())
                                        .time(windowStart)
                                        .build());
                    }
                })
                // Output
                .addSink(xxx);
    }

    // Input data model
    @Data
    @Builder
    private static class SourceModel {
        private long userId;
        private String page;
        private long time;
    }

    // Output data model
    @Data
    @Builder
    private static class SinkModel {
        private long uv;
        private long time;
    }
}
2.2. The problem
After deploying this job to the cluster, we found that it never produced any output, even though the input (user heartbeat logs) kept arriving in large volume.
3. Finding the problem - Investigating the pit
The Flink web UI showed the following behavior for each operator:
- Source operator: keeps consuming data, and judging from the input/output rates on the web UI the data volume is very large
- Filter operator: has both input and output; the input is very large, but the output is tiny (for business reasons: in a shopping business only a very small fraction of users stay on the shopping-cart page)
- Tumbling window operator: very little input data, no output at all, and on the web UI the operator has no Watermark
From here the problem starts to become clear.
At least as seen from the Flink web UI, the window operator has no Watermark, so the windowed data never triggers computation.
The first guess: the Watermarks feeding the window operator's subtasks are not aligned!!!
Next, let's walk through how this conjecture was verified:
Because our Watermark Assigner is placed after the Filter operator, Watermarks are generated from the data that survives the filter. So to confirm the conjecture, we need to estimate how much data the Filter operator emits.
After verification, the Filter operators turn out to emit fewer than 60 records per minute in total to the downstream operator. That is, in our 100-parallelism job, at most 60 Filter subtasks emit data to the downstream tumbling window operator in any given minute, and at least 40 subtasks send nothing downstream at all.
As a result, the downstream tumbling window operator can never align its Watermarks, and therefore the window can never trigger.
The cause of the problem is found.
4. Analyzing the principle - What mechanism causes the problem
To understand what Watermark alignment is, let's first look at how Watermarks are transmitted and computed in Flink:
- Watermark transmission: broadcast. "Broadcast" here means that one subtask of an upstream operator broadcasts its Watermark to all the subtasks of the downstream operator it is connected to; which subtasks are connected depends on the shuffle mechanism between the upstream and downstream operators. This "broadcast" is NOT the BroadCast programming API provided by Flink!!!
  Example: in a job with parallelism 100, if the shuffle strategy between two operators is Forward, one upstream subtask sends its Watermark only to the single downstream subtask it is connected to; if the strategy is Hash/Rebalance, one upstream subtask sends its Watermark to all downstream subtasks.
- Watermark computation: when a subtask of the downstream operator receives Watermarks from the subtasks of the upstream operator (upstream/downstream here meaning connected by a Channel), its own Watermark is computed as:
  downstream subtask Watermark = min(Watermark sent by upstream subtask 1, Watermark sent by upstream subtask 2, ...)
  That is, a downstream subtask's Watermark is the minimum of all Watermarks sent to it by the upstream subtasks.
- Watermark alignment: when the Watermarks from the upstream subtasks that a downstream subtask depends on differ greatly, the Watermarks are "not aligned". Example: one upstream subtask sends a Watermark of 23:59 while another sends 23:00, a gap of 59 minutes. This situation is usually abnormal, hence "not aligned". Conversely, when the differences between the Watermarks are small, they are "aligned".
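The min rule above can be sketched in a few lines of plain Java. This is a simplified simulation, not Flink's actual implementation, and the `WatermarkCombiner` class is a hypothetical name; it also shows the failure mode at the heart of this article: a channel that never reports keeps the subtask's watermark stuck at the initial `Long.MIN_VALUE`.

```java
public class WatermarkMinDemo {
    // Simplified model of how a downstream subtask combines per-channel watermarks.
    // Illustrative sketch only, not Flink's real internals.
    static class WatermarkCombiner {
        final long[] channelWatermarks; // latest watermark seen per input channel

        WatermarkCombiner(int numChannels) {
            channelWatermarks = new long[numChannels];
            java.util.Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
        }

        // A watermark arrives on one channel; returns the subtask's own watermark.
        long onWatermark(int channel, long watermark) {
            // Watermarks are monotonically increasing per channel.
            channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
            long min = Long.MAX_VALUE;
            for (long w : channelWatermarks) min = Math.min(min, w); // min over all channels
            return min;
        }
    }

    public static void main(String[] args) {
        WatermarkCombiner c = new WatermarkCombiner(3);
        // As long as ANY channel has never sent a watermark, the result stays at Long.MIN_VALUE.
        System.out.println(c.onWatermark(0, 100) == Long.MIN_VALUE); // true
        System.out.println(c.onWatermark(1, 200) == Long.MIN_VALUE); // true
        // Only once every channel has reported does the watermark advance: min(100, 200, 150).
        System.out.println(c.onWatermark(2, 150)); // 100
    }
}
```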
The figure below illustrates the Watermark propagation process, to deepen understanding:
[Figure: Watermark propagation]
Back to the case above: in a given minute only 60 upstream subtasks have data and send Watermarks to the downstream window operator; the remaining 40 send nothing.
So the downstream window operator's Watermark (the minimum over its input channels) never advances, and the window never triggers.
5. Avoiding the pit - How to prevent this problem
In the scenario above, the root cause of the problem is that after the (shopping-cart page) filter condition, the data volume becomes tiny.
The Watermark Assigner generates very few Watermarks from this tiny amount of data, and 40 subtasks generate no Watermark at all, so the downstream operator's Watermarks cannot align.
The solution is therefore simple: generate more Watermarks, ensuring that:
even though there is little data after the Filter operator, every subtask still produces enough Watermarks to pass to the downstream window operator, so that window computation and result output are triggered continuously.
Concrete solution: move the Watermark Assigner to right after the Source operator, before the Filter operator. The code is as follows:
public class WatermarkTest {

    public static void main(String[] args) throws Exception {
        // Get the Flink environment; FlinkEnv is an interface encapsulated by the blogger
        FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);
        flinkEnv.env().setParallelism(100);

        flinkEnv.env()
                // Data source
                .addSource(xxx)
                // Assign Watermarks: moved to BEFORE the Filter
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<SourceModel>(Time.minutes(1)) {
                            @Override
                            public long extractTimestamp(SourceModel element) {
                                return element.getTime();
                            }
                        })
                // Keep only the shopping-cart page (Shopping-Cart) data
                .filter(new FilterFunction<SourceModel>() {
                    @Override
                    public boolean filter(SourceModel value) throws Exception {
                        return value.getPage().equals("Shopping-Cart");
                    }
                })
                // To shuffle all data into a single operator instance, the key is fixed to 0
                .keyBy(new KeySelector<SourceModel, Long>() {
                    @Override
                    public Long getKey(SourceModel value) throws Exception {
                        return 0L;
                    }
                })
                // Open a one-minute tumbling event-time window
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                // uv computation logic
                .process(new ProcessWindowFunction<SourceModel, SinkModel, Long, TimeWindow>() {
                    @Override
                    public void process(Long aLong, Context context, Iterable<SourceModel> elements,
                            Collector<SinkModel> out) throws Exception {
                        long windowStart = context.window().getStart();
                        Set<Long> s = new HashSet<>();
                        elements.forEach(new Consumer<SourceModel>() {
                            @Override
                            public void accept(SourceModel sourceModel) {
                                s.add(sourceModel.userId);
                            }
                        });
                        out.collect(
                                SinkModel
                                        .builder()
                                        .uv(s.size())
                                        .time(windowStart)
                                        .build());
                    }
                })
                // Output
                .addSink(xxx);
    }

    // Input data model
    @Data
    @Builder
    private static class SourceModel {
        private long userId;
        private String page;
        private long time;
    }

    // Output data model
    @Data
    @Builder
    private static class SinkModel {
        private long uv;
        private long time;
    }
}
Why the solution works: in this business scenario the Source has a large volume of data, so the Watermark Assigner can use that large volume to continuously produce Watermarks and send them downstream.
Even though very little data reaches the downstream window operator after the Filter, Watermarks are not filtered out by the Filter operator; the large number of Watermarks are still transmitted to the window operator normally, keeping the Watermarks aligned and ensuring the window operator keeps triggering and emitting results.
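The key fact, that the Filter operator drops data records but never watermarks, can be illustrated with a toy stream model. This is a hypothetical sketch (the `Element`, `DataRecord`, and `Watermark` types are invented here for illustration), not Flink's internal representation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class FilterForwardsWatermarks {
    // A stream element is either a data record or a watermark marker (toy model).
    interface Element {}
    record DataRecord(String page, long timestamp) implements Element {}
    record Watermark(long timestamp) implements Element {}

    // A filter drops records that fail the predicate but ALWAYS forwards watermarks.
    static List<Element> filter(List<Element> in, Predicate<DataRecord> keep) {
        List<Element> out = new ArrayList<>();
        for (Element e : in) {
            if (e instanceof Watermark) {
                out.add(e); // watermarks pass straight through
            } else if (e instanceof DataRecord r && keep.test(r)) {
                out.add(e);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Element> in = List.of(
                new DataRecord("Home", 10_000), new Watermark(10_000),
                new DataRecord("Home", 20_000), new Watermark(20_000));
        // Every record is filtered out, yet both watermarks survive, so a downstream
        // window operator can still advance its event time and fire windows.
        System.out.println(filter(in, r -> r.page().equals("Shopping-Cart")));
    }
}
```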
This solution works well, but it introduces a small probability of losing out-of-order data. Example: with the Watermark now assigned right after the Source operator, a shopping-cart page log with timestamp 23:50:50 may arrive after a main-page log with timestamp 23:52:00. By then the Watermark has already risen to 23:51:00, the 23:50 window has already triggered, and the 23:50:50 shopping-cart record is discarded by the window operator.
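The arithmetic of that late-data example can be checked with a small model, assuming a 1-minute out-of-orderness bound, 1-minute tumbling windows, no allowed lateness, and the simplification that a window is gone once the watermark reaches its end timestamp (the `isDropped` helper is a name invented here):

```java
public class LateDataCheck {
    static final long MIN = 60_000; // one minute in millis

    // Simplified model: the watermark is (max timestamp seen - bound); a record is
    // dropped when the end of its tumbling window is already <= the watermark.
    static boolean isDropped(long recordTs, long maxTsSeen, long bound) {
        long watermark = maxTsSeen - bound;
        long windowEnd = (recordTs / MIN + 1) * MIN;
        return watermark >= windowEnd;
    }

    public static void main(String[] args) {
        long bound = MIN;                 // BoundedOutOfOrdernessTimestampExtractor(Time.minutes(1))
        long mainPageTs = 52 * MIN;       // 23:52:00 main-page log arrives first
        long cartTs = 50 * MIN + 50_000;  // 23:50:50 shopping-cart log arrives later
        // Watermark has risen to 23:51:00, so the [23:50, 23:51) window already fired.
        System.out.println(isDropped(cartTs, mainPageTs, bound)); // true
    }
}
```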
6. Conclusion
This article recorded a DataStream API problem: placing the Watermark Assigner too far downstream prevented the Watermarks from aligning, so the event-time window never triggered.
The programming habit I recommend: when implementing a Flink job with the DataStream API, place the Watermark Assigner as close to the Source node as possible.