Understanding Flink Sliding Windows & a Concrete Business Scenario
2022-07-26 22:40:00 【me凡】
A sliding window computes over data from a past interval up to the current window boundary. For example, with a window size of 1 day and a slide of 5 minutes, a window fires every five minutes and covers the full day preceding its end time.
My initial understanding was that it covers a period from now into the future (a completely wrong idea).
1. Window assignment and state cleanup
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> localSource = env.socketTextStream("localhost", 8888);
localSource.map(new MapFunction<String, Tuple3<String, Integer, String>>() {
    @Override
    public Tuple3<String, Integer, String> map(String str) throws Exception {
        // Input line like "cup,2020-12-22 11:00:00.000" -> (key, 1, event-time string)
        String[] split = str.split(",");
        return Tuple3.of(split[0], 1, split[1]);
    }
}).assignTimestampsAndWatermarks(
        WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new WyTimestampAssigner()))
  .keyBy(t -> t.f0)
  .timeWindow(Time.minutes(5), Time.minutes(1)) // size = 5 min, slide = 1 min
  .sum(1)
  .print();
env.execute("sliding window T");
That is the whole demo; the input format is: cup,2020-12-22 11:00:00.000
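WyTimestampAssigner is not shown in the post. Given the cup,2020-12-22 11:00:00.000 input format, it presumably parses the timestamp string in field f2; below is a minimal sketch under that assumption (the class body is mine, not the author's):
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.java.tuple.Tuple3;

// Presumed WyTimestampAssigner: extract the event time from the third tuple field,
// e.g. "2020-12-22 11:00:00.000", and convert it to epoch milliseconds.
public class WyTimestampAssigner
        implements SerializableTimestampAssigner<Tuple3<String, Integer, String>> {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    @Override
    public long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {
        return LocalDateTime.parse(element.f2, FMT)
                .atZone(ZoneId.systemDefault())
                .toInstant()
                .toEpochMilli();
    }
}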
e.g., window size = 5 minutes, slide = 1 minute

Input timestamp | window_start | window_end | clean_time
2020-12-22 11:00:00 | 2020-12-22 11:00:00 | 2020-12-22 11:05:00 | 2020-12-22 11:05
... | ... (5 windows in total) | ... |
 | 2020-12-22 10:56:00 | 2020-12-22 11:01:00 |

With a window size of 5 minutes and a slide of 1 minute, the record timestamped 2020-12-22 11:00:00 falls into 5 windows, and its state is cleaned up at 2020-12-22 11:05, the end of the last window that contains it.
e.g., window size = 1 day, slide = 5 minutes

Input timestamp | window_start | window_end | clean_time
2020-12-22 11:00:00 | 2020-12-22 11:00 | 2020-12-23 11:00 | 2020-12-23 11:00
... | ... (288 windows in total) | ... |
 | 2020-12-21 11:05 | 2020-12-22 11:05 |

With a window size of 1 day and a slide of 5 minutes, the record timestamped 2020-12-22 11:00:00 falls into 288 windows, and its state is cleaned up at 2020-12-23 11:00.
For a walkthrough of the relevant source code, see https://www.jianshu.com/p/45b03390b258
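The window counts and cleanup times above follow from how SlidingEventTimeWindows assigns windows. Here is a simplified paraphrase of that assignment logic (not the verbatim Flink source):
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Simplified paraphrase of SlidingEventTimeWindows#assignWindows (not verbatim source).
// A record belongs to size/slide windows; the loop walks backwards from the start
// of the latest window that covers the record's timestamp.
static List<TimeWindow> assignWindows(long timestamp, long size, long slide, long offset) {
    List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
    // Start of the latest window containing this timestamp, aligned to the slide.
    long lastStart = timestamp - (timestamp - offset + slide) % slide;
    for (long start = lastStart; start > timestamp - size; start -= slide) {
        windows.add(new TimeWindow(start, start + size)); // window is [start, start + size)
    }
    return windows;
}
With zero allowed lateness, the record's state is cleaned up when the watermark passes the end of the latest-ending window that contains it, which is the clean_time column in the tables above.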
2. A concrete business scenario (DataStream and SQL implementations)
The task: compute a GTV-like metric, emitting the running total from midnight up to the current time every five minutes.
DataStream implementation:
The full code is omitted here; processing starts from keyBy. Set a 1-day sliding window with a 5-minute slide, and inside process() use an if check so that only records from today are accumulated. After 00:00, yesterday's records no longer match, which gives exactly the required behavior: a midnight-to-now total emitted every five minutes.
.keyBy(t -> t.getShop_name())
.timeWindow(Time.days(1), Time.minutes(5)) // size = 1 day, slide = 5 min
.process(new ProcessWindowFunction<GoodDetails, Tuple3<String, String, Integer>, String, TimeWindow>() {
    SimpleDateFormat sdf_million = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    SimpleDateFormat sdf_day = new SimpleDateFormat("yyyy-MM-dd");

    @Override
    public void process(String s, Context ctx, Iterable<GoodDetails> elements, Collector<Tuple3<String, String, Integer>> out) throws Exception {
        Calendar cld = Calendar.getInstance();
        Iterator<GoodDetails> iterator = elements.iterator();
        // Subtract 3 minutes: when the 23:55-00:00 window fires, wall-clock time has
        // already passed 00:00, so the "is it today?" check below would be wrong.
        // Any suitably small offset works.
        String currentDay = sdf_day.format(cld.getTimeInMillis() - 180000);
        // accumulator
        int countNum = 0;
        while (iterator.hasNext()) {
            GoodDetails next = iterator.next();
            String elementDate = next.getRegion_name().substring(0, 10);
            if (elementDate.equals(currentDay)) {
                countNum += next.getGood_price();
            }
        }
        long end = ctx.window().getEnd();
        String windowEnd = sdf_million.format(end);
        out.collect(Tuple3.of(windowEnd, s, countNum));
    }
})
Flink SQL implementation: Flink SQL does not seem to support this kind of filtering inside the window. If anyone has a SQL solution, please share it in the comments.
The wrong SQL, explained:
SELECT
    userid,
    FROM_UNIXTIME(UNIX_TIMESTAMP(HOP_END(procTime, INTERVAL '5' MINUTE, INTERVAL '1' DAY)) + 28800) AS currentTime, -- +28800 s shifts to UTC+8
    HOP_START(procTime, INTERVAL '5' MINUTE, INTERVAL '1' DAY) AS window_start,
    HOP_END(procTime, INTERVAL '5' MINUTE, INTERVAL '1' DAY) AS window_end,
    COUNT(click) AS click_num
FROM flink_kafka_join_click_zero_watermark
WHERE DATE_FORMAT(ctime, 'yyyy-MM-dd') = DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
GROUP BY HOP(procTime, INTERVAL '5' MINUTE, INTERVAL '1' DAY), userid;
I wrote this SQL when first implementing the requirement, assuming it would emit the midnight-to-now total every five minutes. It does not: in the result table, data after midnight keeps accumulating on top of yesterday's totals. The reason is that the WHERE clause is equivalent to a filter on the source side of a DataStream job. Records that pass it flow into the window and stay buffered there, whereas the DataStream version above filters inside the window function.
e.g., a record produced at 23:54 satisfies the WHERE condition and is buffered into the windows. When a window fires after midnight, the record is still sitting in that window, so the aggregation naturally counts it.
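In DataStream terms, the WHERE clause corresponds to a filter placed before the window, not inside it. A fragment sketching the contrast, mirroring the pipeline above (isToday, getCtime and getUserid are hypothetical names used only for illustration):
// What the SQL WHERE clause amounts to: a filter BEFORE the window. The check runs
// once, on arrival, and the record is then buffered into every window covering it.
stream.filter(e -> isToday(e.getCtime()))      // hypothetical helper and getter
      .keyBy(e -> e.getUserid())               // hypothetical getter
      .timeWindow(Time.days(1), Time.minutes(5))
      .sum("click");
// The DataStream solution above instead re-evaluates "is this record from today?"
// inside process() on every window firing, so records from yesterday are skipped.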
The working SQL, explained:
The SQL below does trigger every five minutes, but I personally still prefer the DataStream sliding-window implementation above, for two reasons: first, the sliding window emits results for all keys every five minutes, whereas this trigger only emits aggregates for keys that received data within the interval; second, in my tests the emit times do not seem to line up on boundaries like 00:05, 00:10, and so on.
EnvironmentSettings build = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(build);
// Early-fire trigger: emit partial results before the window closes
Configuration tableConfig = tEnv.getConfig().getConfiguration();
tableConfig.setString("table.exec.emit.early-fire.enabled", "true");
tableConfig.setString("table.exec.emit.early-fire.delay", "5min");
// 1-day tumbling window, emitting results every five minutes
tEnv.executeSql("insert into sliding_window_local_pay\n" +
        "select DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss') as current_day,\n" +
        " userid,TUMBLE_START(procTime, INTERVAL '1' DAY) as window_start,\n" +
        "TUMBLE_END(procTime, INTERVAL '1' DAY) as window_end,COUNT(1) as pay_num\n" +
        "from flink_kafka_payInfo group by TUMBLE(procTime, INTERVAL '1' DAY),userid");