Flink sliding window understanding & introduction to specific business scenarios
2022-07-27 01:05:00 · Me fan
A sliding window computes over data from the past period up to the current window end. For example, with a window size of 1 day and a slide of 5 minutes, every 5 minutes a window fires that covers the day preceding its end time.
My initial understanding was that the window covered the period from the present into the future (a completely wrong idea).
1. Window assignment and data cleanup
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // deprecated since Flink 1.12, where event time is the default
DataStreamSource<String> localSource = env.socketTextStream("localhost", 8888);
localSource.map(new MapFunction<String, Tuple3<String, Integer, String>>() {
        @Override
        public Tuple3<String, Integer, String> map(String str) throws Exception {
            // input format: cup,2020-12-22 11:00:00.000
            String[] split = str.split(",");
            return Tuple3.of(split[0], 1, split[1]);
        }
    })
    .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ZERO)
        .withTimestampAssigner(new WyTimestampAssigner()))
    .keyBy(t -> t.f0)
    .timeWindow(Time.minutes(5), Time.minutes(1)) // deprecated since Flink 1.12; use window(SlidingEventTimeWindows.of(...))
    .sum(1)
    .print();
env.execute("sliding window T");

The demo is as simple as that. The input data format is: cup,2020-12-22 11:00:00.000
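The `WyTimestampAssigner` above is the author's custom timestamp assigner and its code is not shown. A minimal sketch of the parsing it presumably does (extracting event time from the third tuple field, format `yyyy-MM-dd HH:mm:ss.SSS`) might look like this; the class and method names here are my assumptions, not the original code:

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

// Hypothetical sketch of the parsing inside WyTimestampAssigner (not the author's original code).
// In the real job this would live in a SerializableTimestampAssigner<Tuple3<String, Integer, String>>
// whose extractTimestamp() applies a parser like this to tuple.f2.
public class EventTimeParser {
    // Parses "2020-12-22 11:00:00.000" into epoch milliseconds (JVM default time zone).
    public static long parseEventTime(String ts) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        try {
            return sdf.parse(ts).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("bad timestamp: " + ts, e);
        }
    }

    public static void main(String[] args) {
        long t1 = parseEventTime("cup,2020-12-22 11:00:00.000".split(",")[1]);
        long t2 = parseEventTime("cup,2020-12-22 11:00:01.000".split(",")[1]);
        System.out.println(t2 - t1); // 1000: the two sample records are one second apart
    }
}
```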
eg: window size = 5 min, slide = 1 min

input timestamp     | window_start        | window_end          | clean_time
2020-12-22 11:00:00 | 2020-12-22 11:00:00 | 2020-12-22 11:05:00 | 2020-12-22 11:05:00
...                 | (5 windows in total) |                    |
2020-12-22 11:00:00 | 2020-12-22 10:56:00 | 2020-12-22 11:01:00 |

With a window size of 5 minutes and a slide of 1 minute, the record timestamped 2020-12-22 11:00:00 falls into 5 windows; it is cleared from window state at 2020-12-22 11:05:00, the end of the last window that contains it.
eg: window size = 1 day, slide = 5 min

input timestamp     | window_start     | window_end       | clean_time
2020-12-22 11:00:00 | 2020-12-22 11:00 | 2020-12-23 11:00 | 2020-12-23 11:00
...                 | (288 windows in total) |            |
2020-12-22 11:00:00 | 2020-12-21 11:05 | 2020-12-22 11:05 |

With a window size of 1 day and a slide of 5 minutes, the record timestamped 2020-12-22 11:00:00 falls into 288 windows; it is cleared at 2020-12-23 11:00, the end of the last window that contains it.
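The assignment behind both tables can be reproduced with the arithmetic Flink's `SlidingEventTimeWindows.assignWindows` uses: compute the start of the latest window covering the timestamp, then step backwards by the slide while the window still contains it. A self-contained sketch (offset 0 assumed):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of Flink's sliding-window assignment (SlidingEventTimeWindows.assignWindows, offset = 0).
public class SlidingAssign {
    // Returns the start timestamps of every window [start, start + size) containing `timestamp`.
    public static List<Long> assignWindows(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp + slide) % slide; // latest window start covering timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long MIN = 60_000L;
        // size = 5 min, slide = 1 min, event exactly on a slide boundary (first table)
        List<Long> w = assignWindows(10 * MIN, 5 * MIN, 1 * MIN);
        System.out.println(w.size());           // 5 windows, matching the first table
        System.out.println(w.get(0) + 5 * MIN); // end of the last window = the clean_time
    }
}
```

For the second table's parameters (size = 1 day, slide = 5 min) the same call yields 288 windows, matching the post's count.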
For a source-code walkthrough, see https://www.jianshu.com/p/45b03390b258
2. A concrete business scenario (DataStream and SQL implementations)
The metric is GTV-like: every 5 minutes, output the running total from midnight up to the current time.
DataStream implementation:
The full code is not posted here; processing starts from keyBy. Set up a sliding window of 1 day with a slide of 5 minutes, and inside the ProcessWindowFunction use an if check to accumulate only records from the current day. After 00:00, yesterday's records are no longer counted, which meets the requirement: every 5 minutes, emit the running total since midnight.
.keyBy(t -> t.getShop_name())
.timeWindow(Time.days(1), Time.minutes(5))
.process(new ProcessWindowFunction<GoodDetails, Tuple3<String, String, Integer>, String, TimeWindow>() {
    SimpleDateFormat sdf_million = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    SimpleDateFormat sdf_day = new SimpleDateFormat("yyyy-MM-dd");

    @Override
    public void process(String s, Context ctx, Iterable<GoodDetails> elements, Collector<Tuple3<String, String, Integer>> out) throws Exception {
        Calendar cld = Calendar.getInstance();
        Iterator<GoodDetails> iterator = elements.iterator();
        // Subtract 3 minutes (180000 ms): when the 23:55-00:00 window fires, wall-clock time may
        // already be past 00:00, which would make the "is it today" check below use the wrong day.
        // 3 minutes, or any suitable margin, compensates for that.
        String currentDay = sdf_day.format(cld.getTimeInMillis() - 180000);
        // accumulate only today's records
        int countNum = 0;
        while (iterator.hasNext()) {
            GoodDetails next = iterator.next();
            String elementDay = next.getRegion_name().substring(0, 10);
            if (elementDay.equals(currentDay)) {
                countNum += next.getGood_price();
            }
        }
        long end = ctx.window().getEnd();
        String windowEnd = sdf_million.format(end);
        out.collect(Tuple3.of(windowEnd, s, countNum));
    }
})

Flink SQL implementation: Flink SQL does not seem to support this kind of in-window filtering. If you have a pure-SQL solution, please share it in the comments.
Incorrect SQL, analyzed:

SELECT
  userid,
  FROM_UNIXTIME(UNIX_TIMESTAMP(HOP_END(procTime, INTERVAL '5' MINUTE, INTERVAL '1' DAY)) + 28800) AS currentTime,
  HOP_START(procTime, INTERVAL '5' MINUTE, INTERVAL '1' DAY) AS window_start,
  HOP_END(procTime, INTERVAL '5' MINUTE, INTERVAL '1' DAY) AS window_end,
  COUNT(click) AS click_num
FROM flink_kafka_join_click_zero_watermark
WHERE DATE_FORMAT(ctime, 'yyyy-MM-dd') = DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
GROUP BY HOP(procTime, INTERVAL '5' MINUTE, INTERVAL '1' DAY), userid;

I wrote this SQL when I first implemented the requirement, thinking it would emit the running total since midnight every 5 minutes. It does not: in the result table, the accumulation after midnight still includes yesterday's data. The reason: this WHERE clause is equivalent to a filter at the DataStream source. Records that satisfy the condition flow into the window state, whereas the DataStream version above filters inside the window.
eg: a record is produced at 23:54. At that moment it satisfies the WHERE condition, so it enters the window state. When the next day's windows fire, that record is still in them, and the aggregate still counts it.
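The 23:54 example can be checked numerically: with size = 1 day and slide = 5 minutes, the latest hop window covering a 23:54 event starts at 23:50 and therefore ends at 23:50 the next day, well past midnight, so the event is still in window state when next-day windows fire. A small sketch, using the same window-start formula Flink uses (offset 0 assumed):

```java
// Shows why a WHERE filter at the source cannot expel a 23:54 event from next-day hop windows:
// the windows that contain it end after midnight, so it is still aggregated the next day.
public class HopAfterMidnight {
    public static void main(String[] args) {
        long MIN = 60_000L, DAY = 24 * 60 * MIN;
        long eventTime = 23 * 60 * MIN + 54 * MIN; // 23:54 on "day 0" (epoch-relative)
        long slide = 5 * MIN, size = DAY;
        long midnightNextDay = DAY;                // 00:00 of day 1
        // start of the latest window covering the event (Flink's formula, offset = 0)
        long lastStart = eventTime - (eventTime + slide) % slide; // 23:50
        long lastEnd = lastStart + size;                          // 23:50 on day 1
        System.out.println(lastEnd > midnightNextDay);            // true: fires long after midnight
        System.out.println((lastEnd - midnightNextDay) / MIN);    // 1430 minutes past midnight
    }
}
```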
Correct SQL, analyzed:
The SQL below does fire every 5 minutes, but I personally still prefer the DataStream sliding-window implementation above. First, the sliding window emits results for all keys every 5 minutes, whereas this trigger only emits aggregates for keys that received data within the interval. Second, in my tests the fire times below do not line up on 00:05, 00:10, ... boundaries.
EnvironmentSettings build = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(build);
// early-fire trigger: emit intermediate results before the window closes
Configuration tableConfig = tEnv.getConfig().getConfiguration();
tableConfig.setString("table.exec.emit.early-fire.enabled", "true");
tableConfig.setString("table.exec.emit.early-fire.delay", "5min");
// 1-day tumbling window; the early-fire trigger emits results every 5 minutes
tEnv.executeSql("insert into sliding_window_local_pay\n" +
    "select DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss') as current_day,\n" +
    "userid,TUMBLE_START(procTime, INTERVAL '1' DAY) as window_start,\n" +
    "TUMBLE_END(procTime, INTERVAL '1' DAY) as window_end,COUNT(1) as pay_num\n" +
    "from flink_kafka_payInfo group by TUMBLE(procTime, INTERVAL '1' DAY),userid");