当前位置:网站首页>flink使MapState实现KeyedState
flink使MapState实现KeyedState
2022-07-22 22:40:00 【AokCap】
测试数据
辽宁省,沈阳市,1000
辽宁省,大连市,2000
辽宁省,沈阳市,1500
湖南省,长沙市,1200
湖南省,长沙市,1000
湖南省,常德市,4000
湖南省,常德市,3000
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.concurrent.TimeUnit;
/** * @Author: Zhang * @Description: * @Date: Created in 19:23 2021/12/25 * @Modified By: * */
public class MapStateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//开启checkpoint, 每10s进行一次checkpoint, 开启后默认使用无限重启策略
env.enableCheckpointing(10000);
//开启失败率重启策略
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // max failures per unit
Time.of(30, TimeUnit.SECONDS), //time interval for measuring failure rate
Time.of(3, TimeUnit.SECONDS) // delay
));
DataStreamSource<String> lines = env.socketTextStream("doitedu03", 8888);
SingleOutputStreamOperator<Tuple3<String,String,Integer>> tpStream = lines.flatMap(new FlatMapFunction<String, Tuple3<String,String,Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple3<String,String,Integer>> collector) throws Exception {
String[] sp = value.split(",");
if (sp[0].equals("error")){
throw new RuntimeException("出现错误了!!!");
}
collector.collect(Tuple3.of(sp[0],sp[1],Integer.parseInt(sp[2])));
}
});
KeyedStream<Tuple3<String, String, Integer>, String> keyed = tpStream.keyBy(t -> t.f0);
SingleOutputStreamOperator<Tuple3<String, String, Integer>> process = keyed.process(new KeyedProcessFunction<String, Tuple3<String, String, Integer>, Tuple3<String, String, Integer>>() {
private transient MapState<String, Integer> mapState;
@Override
public void open(Configuration parameters) throws Exception {
//先定义一个状态描述器
MapStateDescriptor<String, Integer> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, Integer.class);
//初始化或者恢复历史状态
mapState = getRuntimeContext().getMapState(mapStateDescriptor);
}
@Override
public void processElement(Tuple3<String, String, Integer> input, Context context, Collector<Tuple3<String, String, Integer>> collector) throws Exception {
String city = input.f1;
Integer money = input.f2;
Integer historyMoney = mapState.get(city);
if (historyMoney == null) {
historyMoney = 0;
}
Integer totalMoney = money + historyMoney;
//更新到state中
mapState.put(city, totalMoney);
//输出
input.f2 = totalMoney;
collector.collect(input);
}
});
process.print();
env.execute();
}
}
边栏推荐
- Arduino中断实现上升沿检测,并执行其他函数
- Solution to the second game of 2022 Hangzhou Electric Multi school league
- 大咖访谈 | 开源社区里各种奇怪的现状——夜天之书陈梓立tison
- 开发者分享|『啃书吧:深度学习与MindSpore实践』第一章
- 树以及二叉树的常用性质以及遍历
- EMQX v4.4.5 发布:新增排他订阅及 MQTT 5.0 发布属性支持
- Brief analysis of several key technical points of traditional bank bill printing system
- C语言中的字符串
- 互联网流量编排方案
- U盘被格式化数据能恢复吗,U盘被格式化了怎样恢复
猜你喜欢

Restclient operation index library - initialize restclient

appendToFile追加失败

Live broadcast preview | live broadcast Seminar on open source security governance models and tools

C language function (1)
![Reading notes - > statistics] construction of 12-02 confidence interval -t distribution concept introduction](/img/4d/25b4d3d6af0fb30c222613d3c428b7.png)
Reading notes - > statistics] construction of 12-02 confidence interval -t distribution concept introduction

轻松带你走进turtle绘图的大门

RPC-BDY(5)-服务自动注销、负载均衡

21 -- product of arrays other than itself

开发者分享|MindSpore Lite 体验,一键实现图像分割

关于常见排序的稳定性
随机推荐
buu web
挖财和启牛都是干什么的开户安全吗?
EMQX v4.4.5 发布:新增排他订阅及 MQTT 5.0 发布属性支持
js 正则删除span标签以及标签里面的内容
The boss asked me to do an IP territorial function and an open source library!
U盘被格式化数据能恢复吗,U盘被格式化了怎样恢复
Redis 事务学习有感
押注全场景,荣耀胜算几何?
如何高效安装MindSpore的GPU版本
Can the formatted data of the USB flash disk be recovered? How to recover the formatted data of the USB flash disk
Data types in redis
敏捷测试团队组织构成
来,滑动到下一个小姐姐
promise(二)
PIP update a package
C language minesweeping
The author believes that the development logic of the meta universe and the Internet is quite different in Chengdu
I can't be angry with "voluntary salary reduction". I'm naked. I'm three times in four days. How can it end like this?
Has the live broadcast function of the multi merchant system been used? 666 for used friends!
networkx 对图进行可视化