当前位置:网站首页>flink通过ProcessFunction和定时器onTimer实现一个窗口累加的功能
flink通过ProcessFunction和定时器onTimer实现一个窗口累加的功能
2022-07-22 22:40:00 【AokCap】
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
/** * @program: flink-demo * @description: 通过onTimer实现一个累加功能,一分钟一个窗口累计,然后下一个窗口累加上一个窗口的值 * @author: ZhangYitian * @create: 2022-01-08 09:14 */
public class ProcessingTimeTimerDemo2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> line = env.socketTextStream("doitedu03", 8888);
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndCount = line.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
String[] fields = value.split(",");
return Tuple2.of(fields[0], Integer.parseInt(fields[1]));
}
});
KeyedStream<Tuple2<String, Integer>, String> keyed = wordAndCount.keyBy(t -> t.f0);
keyed.process(new KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
private transient ValueState<Integer> counter;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Integer> valueStateDescriptor = new ValueStateDescriptor<>("wc-state", Integer.class);
counter = getRuntimeContext().getState(valueStateDescriptor);
}
@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
//获取当前的ProcessingTime
long currentProcessingTime = ctx.timerService().currentProcessingTime();
//将当前的ProcessingTime的下一分钟时间注册一个定时器
long fireTime = currentProcessingTime - currentProcessingTime % 60000 + 60000;
//如果注册相同数据的timeTimer,后面的会将前面的覆盖,即相同的timeTimer只会被触发一次
ctx.timerService().registerProcessingTimeTimer(fireTime);
Integer currentCount = value.f1;
Integer historyCount = counter.value();
if (historyCount == null){
historyCount = 0;
}
int totalCount = historyCount + currentCount;
//更新状态
counter.update(totalCount);
}
//当闹钟到了指定的时间,就会执行onTimer方法
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
//获取value
Integer currentValue = counter.value();
//获取当前的key
String currentKey = ctx.getCurrentKey();
//如果想实现类似滚动窗口,不累加历史数据,只是累加当前窗口的数据,就清空状态
//counter.update(0);
//输出key和value
out.collect(Tuple2.of(currentKey,currentValue));
}
}).print();
env.execute();
}
}
边栏推荐
猜你喜欢

Typora设置标题自动添加序号

Live broadcast preview | live broadcast Seminar on open source security governance models and tools

php可不可以拆分数组

气不过“自愿降薪”,裸辞面字节,四天三面,结局居然这样?

园区/厂区怎么实现wifi上网短信认证

Celebrity interview | various strange current situations in the open source community -- night sky Book Chen Zili tison

U盘被格式化数据能恢复吗,U盘被格式化了怎样恢复

深度解析kube-scheduler调度上下文

Redis事务与锁机制

论文阅读:The Perfect Match: 3D Point Cloud Matching with Smoothed Densities
随机推荐
Experiment 5 JPEG
php数组下标是不是只能从0开始
Jedis operation redis
PIP update a package
Reading notes - > statistics] construction of 12-02 confidence interval -t distribution concept introduction
Rust -- detailed explanation of option
Celebrity interview | various strange current situations in the open source community -- night sky Book Chen Zili tison
【读书笔记->统计学】12-01 置信区间的构建-置信区间概念简介
Networkx visualizes graphs
张宇高数30讲总结
轻松带你走进turtle绘图的大门
Can the formatted data of the USB flash disk be recovered? How to recover the formatted data of the USB flash disk
Can PHP split arrays
【JS 逆向百例】某公共资源交易网,公告 URL 参数逆向分析
TensorRT的插件实战(1)
气不过“自愿降薪”,裸辞面字节,四天三面,结局居然这样?
Leetcode day 26
Networkx visualizes graphs
Organizational structure of agile testing team
Learn these SketchUp skills and improve work efficiency by half