Flink Sliding Windows Explained & a Concrete Business Scenario
2022-07-26 22:40:00 【me凡】
A sliding window aggregates data from some span in the past up to the window's end. For example, with a window size of 1 day and a slide of 5 minutes, a window fires every five minutes and covers the full day preceding its end time.
My initial understanding was that it covers a span from now into the future, which turned out to be completely wrong.
1. Window assignment and state cleanup
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> localSource = env.socketTextStream("localhost", 8888);
localSource.map(new MapFunction<String, Tuple3<String, Integer, String>>() {
        @Override
        public Tuple3<String, Integer, String> map(String str) throws Exception {
            // input line format: "<key>,<event time>", e.g. "cup,2020-12-22 11:00:00.000"
            String[] split = str.split(",");
            return Tuple3.of(split[0], 1, split[1]);
        }
    })
    // bounded out-of-orderness of zero: effectively no tolerance for late data
    .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ZERO)
        .withTimestampAssigner(new WyTimestampAssigner()))
    .keyBy(t -> t.f0)
    // sliding event-time window: size 5 minutes, slide 1 minute
    .timeWindow(Time.minutes(5), Time.minutes(1))
    .sum(1)
    .print();
env.execute("sliding window T");

The demo is as above; the input data format is: cup,2020-12-22 11:00:00.000
e.g. window size = 5 minutes, slide = 1 minute:

| input timestamp | window_start | window_end | clean_time |
| --- | --- | --- | --- |
| 2020-12-22 11:00:00 | 2020-12-22 11:00:00 | 2020-12-22 11:05:00 | 2020-12-22 11:05:00 |
| (5 windows in total) | ... | ... | |
| | 2020-12-22 10:56:00 | 2020-12-22 11:01:00 | |
With a window size of 5 minutes and a slide of 1 minute, the record 2020-12-22 11:00:00 belongs to 5 windows ([10:56, 11:01) through [11:00, 11:05)), and its state is cleaned up at 2020-12-22 11:05, the end of the last window that contains it.
e.g. window size = 1 day, slide = 5 minutes:
| input timestamp | window_start | window_end | clean_time |
| --- | --- | --- | --- |
| 2020-12-22 11:00:00 | 2020-12-22 11:00 | 2020-12-23 11:00 | 2020-12-23 11:00 |
| (288 windows in total) | ... | ... | |
| | 2020-12-21 11:05 | 2020-12-22 11:05 | |
With a window size of 1 day and a slide of 5 minutes, the record 2020-12-22 11:00:00 belongs to 288 windows ([2020-12-21 11:05, 2020-12-22 11:05) through [2020-12-22 11:00, 2020-12-23 11:00)), and its state is cleaned up at 2020-12-23 11:00.
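A minimal sketch of the assignment arithmetic behind these tables, mirroring the logic of Flink's SlidingEventTimeWindows#assignWindows (written standalone here for illustration, using the 5-minute/1-minute case):

long size  = 5 * 60 * 1000L;     // window size: 5 minutes
long slide = 60 * 1000L;         // slide: 1 minute
long ts    = 1608606000000L;     // 2020-12-22 11:00:00 (GMT+8)

// Latest window start containing ts, aligned to the slide (offset = 0);
// same as TimeWindow.getWindowStartWithOffset(ts, 0, slide) in Flink.
long lastStart = ts - ((ts + slide) % slide);
for (long start = lastStart; start > ts - size; start -= slide) {
    // size / slide = 5 windows: [10:56, 11:01) ... [11:00, 11:05)
    System.out.println("window [" + start + ", " + (start + size) + ")");
}
// State for ts is dropped once the watermark passes the end of the LAST
// window containing it (lastStart + size, plus any allowed lateness) --
// the clean_time column above.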
For how this works in the source code, see https://www.jianshu.com/p/45b03390b258
2. A concrete business scenario (DataStream and SQL implementations)
The task: compute a GTV-like metric, emitting every five minutes the running total from midnight up to the current time.
DataStream implementation:
I won't paste the full code; it picks up from keyBy: open a 1-day sliding window with a 5-minute slide, and inside process use an if check to accumulate only the records that belong to today. That way, once the clock passes 00:00, yesterday's records are no longer counted, which gives exactly what the business asked for: every 5 minutes, the total from midnight to the current time.
.keyBy(t -> t.getShop_name())
// 1-day sliding window, 5-minute slide
.timeWindow(Time.days(1), Time.minutes(5))
.process(new ProcessWindowFunction<GoodDetails, Tuple3<String, String, Integer>, String, TimeWindow>() {
    SimpleDateFormat sdf_millis = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    SimpleDateFormat sdf_day = new SimpleDateFormat("yyyy-MM-dd");

    @Override
    public void process(String s, Context ctx, Iterable<GoodDetails> elements, Collector<Tuple3<String, String, Integer>> out) throws Exception {
        Calendar cld = Calendar.getInstance();
        Iterator<GoodDetails> iterator = elements.iterator();
        // Subtract 3 minutes: when the 23:55-00:00 window fires, the wall clock has
        // already passed 00:00 by the time this code runs, so the if check below would
        // otherwise compare against the wrong day. 3 minutes (or any suitable margin) works.
        String currentDay = sdf_day.format(cld.getTimeInMillis() - 180000);
        // accumulator
        int countNum = 0;
        while (iterator.hasNext()) {
            GoodDetails next = iterator.next();
            // the first 10 chars are the record's day, e.g. "2020-12-22"
            String elementDay = next.getRegion_name().substring(0, 10);
            if (elementDay.equals(currentDay)) {
                countNum += next.getGood_price();
            }
        }
        long end = ctx.window().getEnd();
        String windowEnd = sdf_millis.format(end);
        out.collect(Tuple3.of(windowEnd, s, countNum));
    }
})

Flink SQL implementation: Flink SQL doesn't seem to support this kind of filtering inside the window. If you have a SQL solution, please share it in the comments.
Parsing the incorrect SQL:
SELECT
    userid,
    -- +28800 s shifts the epoch seconds from UTC to GMT+8
    FROM_UNIXTIME(UNIX_TIMESTAMP(HOP_END(procTime, INTERVAL '5' MINUTE, INTERVAL '1' DAY)) + 28800) AS currentTime,
    HOP_START(procTime, INTERVAL '5' MINUTE, INTERVAL '1' DAY) AS window_start,
    HOP_END(procTime, INTERVAL '5' MINUTE, INTERVAL '1' DAY) AS window_end,
    COUNT(click) AS click_num
FROM flink_kafka_join_click_zero_watermark
WHERE DATE_FORMAT(ctime, 'yyyy-MM-dd') = DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
GROUP BY HOP(procTime, INTERVAL '5' MINUTE, INTERVAL '1' DAY), userid;

When I first implemented the requirement I wrote this SQL, expecting it to emit the midnight-to-now total every 5 minutes. It doesn't: in the result table, after midnight the counts keep accumulating on top of yesterday's values. The reason is that this WHERE condition is the equivalent of a filter on the source side of a DataStream job: any record that satisfies it flows into the window and is cached there, whereas the DataStream version above filters inside the window, at firing time.
For example, a record produced at 23:54 satisfies the WHERE condition, so it flows into the window state; when a window fires the next day, the record is still sitting in that window, and the aggregation naturally counts it.
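A minimal sketch of the two filter placements side by side (the stream, key, and isToday helper are illustrative, not from the original code):

// Source-side filter -- what the SQL WHERE clause effectively does.
// The check runs when a record ARRIVES: a 23:54 record passes and is then
// cached in every window covering it, including windows that fire tomorrow.
stream.filter(e -> isToday(e.getEventTime()))   // hypothetical helper
      .keyBy(e -> e.getKey())
      .timeWindow(Time.days(1), Time.minutes(5))
      .sum("price");

// In-window filter -- what the ProcessWindowFunction above does.
// The check runs when the window FIRES, against the clock at firing time,
// so after midnight yesterday's records are skipped even though they are
// still cached in the window state.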
Parsing the working SQL:
The SQL below does fire at 5-minute intervals, but I still prefer the DataStream sliding-window implementation above, for two reasons: first, the sliding window emits results for all keys every 5 minutes, while this trigger only emits aggregates for keys whose data arrived within the interval; second, from my tests, the emit times don't seem to land on 00:05, 00:10, ... boundaries.
EnvironmentSettings build = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(build);
// enable early firing: emit intermediate results before the window closes
Configuration tableConfig = tEnv.getConfig().getConfiguration();
tableConfig.setString("table.exec.emit.early-fire.enabled", "true");
tableConfig.setString("table.exec.emit.early-fire.delay", "5min");
// 1-day tumbling window, emitting an intermediate result every five minutes
tEnv.executeSql("insert into sliding_window_local_pay\n" +
        "select DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss') as current_day,\n" +
        "userid, TUMBLE_START(procTime, INTERVAL '1' DAY) as window_start,\n" +
        "TUMBLE_END(procTime, INTERVAL '1' DAY) as window_end, COUNT(1) as pay_num\n" +
        "from flink_kafka_payInfo group by TUMBLE(procTime, INTERVAL '1' DAY), userid");