If your Flink code is written like this, it would be strange if the window ever triggered (a bad programming habit)
2022-07-03 13:11:00 【Big data sheep said】
1. Preface - the conclusion first
This article records a pitfall you can hit when implementing an event-time window application with the DataStream API: the window never triggers. It also walks through how the problem was tracked down.
I hope that after reading it you will adopt this programming habit: when implementing a Flink job with the DataStream API, place the Watermark Assigner as close to the Source node as you can.
If you want to know why, read on!
The following chapters explain the problem and the reasoning behind this advice. I hope we can learn from each other and that it gives you some inspiration.
The pitfall scenario - what kind of pitfall it is
Troubleshooting - how the problem was investigated
Principle analysis - the mechanism that causes the problem
Avoiding the pitfall - how to avoid this problem
Conclusion
2. The pitfall scenario - what kind of pitfall it is
2.1. Requirement scenario
First, the requirement behind this pitfall and the first version of the implementation.
Requirement: on an e-commerce platform, use the heartbeat logs of online users (one heartbeat log is reported per user every 30s) to compute, for each minute, the number of users online on the shopping cart page (Shopping-Cart).
Data source: user heartbeat logs reported every 30s (the three fields user_id, page and time correspond to the user id, the page the user is on, and the log reporting time)
Data processing: first filter the heartbeat logs down to the shopping cart page, then aggregate them in tumbling (Tumble) windows by timestamp
Data sink: one aggregated result per minute (the two fields uv and time correspond to the number of users online on the shopping cart page in the current minute, and the timestamp of the current minute)
The Flink DataStream API implementation is as follows:
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

import lombok.Builder;
import lombok.Data;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WatermarkTest {

    public static void main(String[] args) throws Exception {
        // Get the Flink environment; FlinkEnv is an interface wrapped by the author
        FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);
        // Set the parallelism
        flinkEnv.env().setParallelism(100);

        flinkEnv.env()
                // Data source: the reported heartbeat logs
                .addSource(xxx)
                // Keep only the records of the shopping cart page (Shopping-Cart)
                .filter(new FilterFunction<SourceModel>() {
                    @Override
                    public boolean filter(SourceModel value) throws Exception {
                        return value.getPage().equals("Shopping-Cart");
                    }
                })
                // Assign timestamps and Watermarks (here: AFTER the filter, which is the pitfall)
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SourceModel>(Time.minutes(1)) {
                    @Override
                    public long extractTimestamp(SourceModel element) {
                        return element.getTime();
                    }
                })
                // Shuffle everything into a single subtask for aggregation, so the key is fixed to 0
                .keyBy(new KeySelector<SourceModel, Long>() {
                    @Override
                    public Long getKey(SourceModel value) throws Exception {
                        return 0L;
                    }
                })
                // Open a one-minute tumbling event-time window
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                // UV calculation logic: count the distinct user ids in the window
                .process(new ProcessWindowFunction<SourceModel, SinkModel, Long, TimeWindow>() {
                    @Override
                    public void process(Long aLong, Context context, Iterable<SourceModel> elements,
                            Collector<SinkModel> out) throws Exception {
                        long windowStart = context.window().getStart();
                        Set<Long> s = new HashSet<>();
                        elements.forEach(new Consumer<SourceModel>() {
                            @Override
                            public void accept(SourceModel sourceModel) {
                                s.add(sourceModel.userId);
                            }
                        });
                        out.collect(
                                SinkModel
                                        .builder()
                                        .uv(s.size())
                                        .time(windowStart)
                                        .build()
                        );
                    }
                })
                // Output
                .addSink(xxx);

        // Submit the job (assumes env() returns the StreamExecutionEnvironment);
        // without execute() the pipeline is only defined, never run
        flinkEnv.env().execute();
    }

    // Input data model
    @Data
    @Builder
    private static class SourceModel {
        private long userId;
        private String page;
        private long time;
    }

    // Output data model
    @Data
    @Builder
    private static class SinkModel {
        private long uv;
        private long time;
    }
}
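To make the expected result concrete, the per-minute UV aggregation can be sketched in plain Java without Flink. This is only an illustration under stated assumptions: the class `UvPerMinuteSketch`, its `Heartbeat` type and the page name "Home" are hypothetical stand-ins, and timestamps are assumed to be non-negative epoch milliseconds.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class UvPerMinuteSketch {
    static final long WINDOW_MILLIS = 60_000L;

    /** Hypothetical stand-in for the article's SourceModel. */
    static final class Heartbeat {
        final long userId;
        final String page;
        final long time;

        Heartbeat(long userId, String page, long time) {
            this.userId = userId;
            this.page = page;
            this.time = time;
        }
    }

    /** Start of the one-minute tumbling window that contains the timestamp. */
    static long windowStart(long timestampMillis) {
        return timestampMillis - (timestampMillis % WINDOW_MILLIS);
    }

    /** Filters to the shopping cart page, then counts distinct users per window start. */
    static Map<Long, Long> uvPerMinute(Iterable<Heartbeat> logs) {
        Map<Long, Set<Long>> usersByWindow = new TreeMap<>();
        for (Heartbeat log : logs) {
            if (!"Shopping-Cart".equals(log.page)) {
                continue; // same condition as the Filter operator
            }
            usersByWindow
                    .computeIfAbsent(windowStart(log.time), k -> new HashSet<>())
                    .add(log.userId);
        }
        Map<Long, Long> uvByWindow = new TreeMap<>();
        for (Map.Entry<Long, Set<Long>> e : usersByWindow.entrySet()) {
            uvByWindow.put(e.getKey(), (long) e.getValue().size());
        }
        return uvByWindow;
    }

    public static void main(String[] args) {
        List<Heartbeat> logs = Arrays.asList(
                new Heartbeat(1L, "Shopping-Cart", 5_000L),
                new Heartbeat(1L, "Shopping-Cart", 35_000L), // same user, same minute
                new Heartbeat(2L, "Home", 40_000L),          // filtered out
                new Heartbeat(2L, "Shopping-Cart", 65_000L));
        System.out.println(uvPerMinute(logs)); // prints {0=1, 60000=1}
    }
}
```

The two heartbeats of user 1 fall into the same minute and count once, which is why the Flink job collects user ids into a set before emitting the UV.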
2.2. Problem scenario
After deploying this job to the cluster, we found that it never produced any output, even though the input (user heartbeat logs) kept arriving in large volume.
3. Troubleshooting - how the problem was investigated
The Flink web UI showed the following for each operator:
Source operator: consumes data continuously; the input and output traffic shown in the web UI is very large
Filter operator: has both input and output; the input is very large but the output is very small (this is a business characteristic: on a shopping platform only a small fraction of users stay on the shopping cart page)
Tumbling window operator: receives very little input and never produces output; the web UI shows no Watermark for this operator
At this point the problem is fairly clear.
At least judging from the Flink web UI, the window operator has no Watermark, so the window computation is never triggered.
The first guess: the Watermarks arriving at the single parallel instance of the window operator are not aligned!
Here is how this guess was verified:
Because our Watermark Assigner is placed after the Filter operator, Watermarks are generated only from the data that passes the Filter. To check the guess, we therefore need to estimate how much data the Filter operator emits.
The check showed that the Filter operator emits fewer than 60 records per minute in total to the downstream operator. In other words, of our 100 parallel subtasks, at most 60 Filter subtasks send data to the downstream tumbling window operator in any given minute, and at least 40 send nothing at all.
As a result, the downstream tumbling window operator cannot align its Watermarks, so the window cannot trigger.
The cause of the problem is found.
4. Principle analysis - the mechanism that causes the problem
To understand what Watermark alignment is, first look at how Watermarks are transmitted and computed in Flink:
- Watermark transmission: broadcast. Broadcast here means that one parallel subtask of the upstream operator sends its Watermark to every parallel subtask of the downstream operator that it is connected to, which depends on the Shuffle strategy between the two operators. It does not refer to the Broadcast programming API that Flink provides!
Example: for a job with parallelism 100, if the Shuffle strategy between the upstream and downstream operators is Forward, the Watermark of an upstream subtask is sent only to the one downstream subtask it is connected to; if the strategy is Hash or Rebalance, the Watermark of an upstream subtask is sent to all downstream subtasks.
- Watermark computation: how a downstream subtask computes its own Watermark after receiving Watermarks from the upstream subtasks (upstream and downstream here mean subtasks connected by a Channel). The formula:
Watermark of a downstream subtask = min(Watermark sent by upstream subtask 1, Watermark sent by upstream subtask 2, ...)
That is:
Watermark of a downstream subtask = the minimum of the Watermarks sent to it by all of its upstream subtasks.
- Watermark alignment: the Watermark of a downstream subtask depends on the Watermarks of its upstream subtasks. When those differ widely, the Watermarks are said to be not aligned. Example: one upstream subtask sends a Watermark of 23:59 and another sends 23:00, a gap of 59 minutes. This is generally abnormal, hence "not aligned". Conversely, when the Watermarks differ only slightly, they are said to be aligned.
The following figure shows how Watermarks are propagated:

Watermark propagation
Back to the case above: within one minute only 60 upstream subtasks have data and send a Watermark to the downstream window operator; the remaining 40 send none.
The downstream window operator takes the minimum over all of its upstream subtasks, so as long as 40 of them have never sent a Watermark it has no usable Watermark, and the window does not trigger.
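The minimum rule above can be sketched in a few lines of plain Java. This is a simplified model, not Flink's implementation: the class `WatermarkMinSketch` and its method names are hypothetical, a channel that has not yet delivered a Watermark is modelled as `Long.MIN_VALUE`, and a window is assumed to fire once the Watermark reaches the window end minus 1 ms.

```java
import java.util.Arrays;

final class WatermarkMinSketch {
    /** Modelled value of a channel that has not delivered any Watermark yet. */
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /** Watermark of a downstream subtask: the minimum over all of its input channels. */
    static long downstreamWatermark(long[] channelWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    /** An event-time window [start, end) fires once the Watermark reaches end - 1. */
    static boolean windowFires(long windowEndExclusive, long watermark) {
        return watermark >= windowEndExclusive - 1;
    }

    public static void main(String[] args) {
        long[] channels = new long[100];
        // 60 upstream subtasks have already sent a Watermark of 00:02:00 ...
        Arrays.fill(channels, 0, 60, 120_000L);
        // ... and 40 upstream subtasks have sent nothing.
        Arrays.fill(channels, 60, 100, NO_WATERMARK);

        long stuck = downstreamWatermark(channels);
        System.out.println("window fires with 40 silent channels: " + windowFires(60_000L, stuck));

        // Once every channel delivers a Watermark, the minimum advances.
        Arrays.fill(channels, 120_000L);
        long aligned = downstreamWatermark(channels);
        System.out.println("window fires with all channels active: " + windowFires(60_000L, aligned));
    }
}
```

A single silent channel is enough to hold the minimum back, so the number of active subtasks matters less than whether any subtask stays silent.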
5. Avoiding the pitfall - how to avoid this problem
In the scenario above, the root cause is that after filtering on the shopping cart page condition, very little data remains.
The Watermark Assigner can generate only a few Watermarks from so little data; 40 subtasks generate no Watermark at all, so the Watermarks at the downstream operator are not aligned.
The solution is therefore simple: generate more Watermarks, so that:
even though little data remains after the Filter, every subtask still has enough Watermarks to pass to the downstream window operator after the Filter, which keeps triggering the window computation and result output.
Concrete solution: move the Watermark Assigner to after the Source operator and before the Filter operator. The code is as follows:
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

import lombok.Builder;
import lombok.Data;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WatermarkTest {

    public static void main(String[] args) throws Exception {
        // Get the Flink environment; FlinkEnv is an interface wrapped by the author
        FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);
        // Set the parallelism
        flinkEnv.env().setParallelism(100);

        flinkEnv.env()
                // Data source
                .addSource(xxx)
                // Assign timestamps and Watermarks: moved to BEFORE the filter
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SourceModel>(Time.minutes(1)) {
                    @Override
                    public long extractTimestamp(SourceModel element) {
                        return element.getTime();
                    }
                })
                // Keep only the records of the shopping cart page (Shopping-Cart)
                .filter(new FilterFunction<SourceModel>() {
                    @Override
                    public boolean filter(SourceModel value) throws Exception {
                        return value.getPage().equals("Shopping-Cart");
                    }
                })
                // Shuffle everything into a single subtask for aggregation, so the key is fixed to 0
                .keyBy(new KeySelector<SourceModel, Long>() {
                    @Override
                    public Long getKey(SourceModel value) throws Exception {
                        return 0L;
                    }
                })
                // Open a one-minute tumbling event-time window
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                // UV calculation logic: count the distinct user ids in the window
                .process(new ProcessWindowFunction<SourceModel, SinkModel, Long, TimeWindow>() {
                    @Override
                    public void process(Long aLong, Context context, Iterable<SourceModel> elements,
                            Collector<SinkModel> out) throws Exception {
                        long windowStart = context.window().getStart();
                        Set<Long> s = new HashSet<>();
                        elements.forEach(new Consumer<SourceModel>() {
                            @Override
                            public void accept(SourceModel sourceModel) {
                                s.add(sourceModel.userId);
                            }
                        });
                        out.collect(
                                SinkModel
                                        .builder()
                                        .uv(s.size())
                                        .time(windowStart)
                                        .build()
                        );
                    }
                })
                // Output
                .addSink(xxx);

        // Submit the job (assumes env() returns the StreamExecutionEnvironment);
        // without execute() the pipeline is only defined, never run
        flinkEnv.env().execute();
    }

    // Input data model
    @Data
    @Builder
    private static class SourceModel {
        private long userId;
        private String page;
        private long time;
    }

    // Output data model
    @Data
    @Builder
    private static class SinkModel {
        private long uv;
        private long time;
    }
}
Why the solution works: in this business scenario the Source carries a large amount of data, and we can use that volume to let the Watermark Assigner continuously emit Watermarks downstream.
Even though very little data reaches the downstream window operator after the Filter, Watermarks are not filtered out by the Filter operator. A large number of Watermarks still reach the window operator, the Watermarks are aligned, and the window operator keeps triggering and producing output.
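The effect of the two placements can be compared with a small plain-Java simulation. It is a rough model under stated assumptions, not Flink code: `AssignerPlacementSketch`, `Heartbeat`, `sampleInput` and the page name "Home" are hypothetical, each subtask's Watermark is modelled as its maximum seen timestamp minus one minute, and a subtask that has seen no record is modelled as `Long.MIN_VALUE`.

```java
import java.util.ArrayList;
import java.util.List;

final class AssignerPlacementSketch {
    static final long MAX_OUT_OF_ORDERNESS = 60_000L;
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /** Hypothetical stand-in for a heartbeat record. */
    static final class Heartbeat {
        final String page;
        final long time;

        Heartbeat(String page, long time) {
            this.page = page;
            this.time = time;
        }
    }

    /** Modelled Watermark of one assigner subtask after it has seen the given records. */
    static long subtaskWatermark(List<Heartbeat> seen) {
        if (seen.isEmpty()) {
            return NO_WATERMARK;
        }
        long maxTimestamp = Long.MIN_VALUE;
        for (Heartbeat h : seen) {
            maxTimestamp = Math.max(maxTimestamp, h.time);
        }
        return maxTimestamp - MAX_OUT_OF_ORDERNESS;
    }

    static List<Heartbeat> filterCart(List<Heartbeat> records) {
        List<Heartbeat> kept = new ArrayList<>();
        for (Heartbeat h : records) {
            if ("Shopping-Cart".equals(h.page)) {
                kept.add(h);
            }
        }
        return kept;
    }

    /** Watermark at the window operator: minimum over all upstream subtasks. */
    static long windowOperatorWatermark(List<List<Heartbeat>> perSubtask, boolean assignBeforeFilter) {
        long min = Long.MAX_VALUE;
        for (List<Heartbeat> records : perSubtask) {
            // Before the filter the assigner sees everything; after it, only cart records.
            List<Heartbeat> seenByAssigner = assignBeforeFilter ? records : filterCart(records);
            min = Math.min(min, subtaskWatermark(seenByAssigner));
        }
        return min;
    }

    /** Every subtask gets a "Home" record; only the first few also get a cart record. */
    static List<List<Heartbeat>> sampleInput(int parallelism, int subtasksWithCartData) {
        List<List<Heartbeat>> perSubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            List<Heartbeat> records = new ArrayList<>();
            records.add(new Heartbeat("Home", 180_000L));
            if (i < subtasksWithCartData) {
                records.add(new Heartbeat("Shopping-Cart", 170_000L));
            }
            perSubtask.add(records);
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        List<List<Heartbeat>> input = sampleInput(100, 60);
        System.out.println("assigner after filter:  " + windowOperatorWatermark(input, false));
        System.out.println("assigner before filter: " + windowOperatorWatermark(input, true));
    }
}
```

With the assigner after the filter, the 40 subtasks without cart data keep the minimum at the initial value; with the assigner before the filter, every subtask contributes a Watermark derived from the full Source traffic.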
The solution works well, but there is a small chance of losing out-of-order data. Example: with the Watermark Assigner right after the Source operator, a shopping cart page log with timestamp 23:50:50 may arrive after a home page log with timestamp 23:52:00. By then the Watermark has already advanced to 23:51:00 and the 23:50 window has been triggered, so the 23:50:50 shopping cart record is discarded by the window operator.
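The arithmetic of this example can be checked with a short plain-Java sketch. It assumes the one-minute out-of-orderness bound and one-minute tumbling windows from the code above, no allowed lateness, and that a record counts as late once the Watermark has reached the last millisecond of its window. The class `LateRecordSketch` and its helpers are hypothetical names used only for illustration.

```java
final class LateRecordSketch {
    static final long MAX_OUT_OF_ORDERNESS = 60_000L;
    static final long WINDOW_MILLIS = 60_000L;

    /** Milliseconds since midnight for a wall-clock time, to keep the example readable. */
    static long ms(int hour, int minute, int second) {
        return ((hour * 60L + minute) * 60L + second) * 1000L;
    }

    /** Exclusive end of the one-minute tumbling window that contains the timestamp. */
    static long windowEnd(long timestampMillis) {
        return timestampMillis - (timestampMillis % WINDOW_MILLIS) + WINDOW_MILLIS;
    }

    /** Modelled Watermark after the assigner has seen a record with this maximum timestamp. */
    static long watermarkAfter(long maxSeenTimestamp) {
        return maxSeenTimestamp - MAX_OUT_OF_ORDERNESS;
    }

    /** A record is dropped if the Watermark already reached the last millisecond of its window. */
    static boolean isDropped(long recordTimestamp, long currentWatermark) {
        return windowEnd(recordTimestamp) - 1 <= currentWatermark;
    }

    public static void main(String[] args) {
        long cartRecord = ms(23, 50, 50);

        // A home page log with timestamp 23:52:00 arrives first: Watermark becomes 23:51:00.
        long watermark = watermarkAfter(ms(23, 52, 0));
        System.out.println("dropped after 23:52:00 log: " + isDropped(cartRecord, watermark));

        // If the newest log seen so far were 23:51:30, the Watermark would only be 23:50:30.
        long earlierWatermark = watermarkAfter(ms(23, 51, 30));
        System.out.println("dropped after 23:51:30 log: " + isDropped(cartRecord, earlierWatermark));
    }
}
```

The record is lost only when unrelated traffic pushes the Watermark past the end of its window before it arrives, which is why the author describes the probability as very low.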
6. Conclusion
This article recorded a problem with the DataStream API: placing the Watermark Assigner too far downstream left the Watermarks unable to align, so the event-time window never triggered.
The programming habit I recommend: when implementing a Flink job with the DataStream API, place the Watermark Assigner as close to the Source node as you can, as far upstream as possible.