当前位置:网站首页>Flink uses MapState to implement KeyedState
Flink uses MapState to implement KeyedState
2022-07-23 08:19:00 【AokCap】
Test data
Liaoning Province , Shenyang ,1000
Liaoning Province , Dalian ,2000
Liaoning Province , Shenyang ,1500
Hunan province , Changsha City ,1200
Hunan province , Changsha City ,1000
Hunan province , Changde City ,4000
Hunan province , Changde City ,3000
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.concurrent.TimeUnit;
/**
 * Demo of keyed {@link MapState}: keeps a running total of amounts per city,
 * scoped to the province the stream is keyed by.
 *
 * <p>Each line read from the socket is expected to look like
 * {@code province,city,amount}. The stream is keyed by province; inside each key
 * a {@code MapState<city, total>} accumulates the amounts, and every input record
 * is emitted as {@code (province, city, runningTotal)}.
 *
 * <p>Sending a line whose first field is {@code error} deliberately throws, so the
 * checkpoint / restart behaviour can be observed. Any other malformed line
 * (blank, fewer than three fields, non-numeric amount) is skipped instead of
 * failing the job.
 *
 * @author Zhang
 */
public class MapStateDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Turn on checkpointing: one checkpoint every 10 s. Once it is enabled, the
        // default restart strategy restarts (practically) without limit ...
        env.enableCheckpointing(10000);
        // ... so replace it with a failure-rate strategy.
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3,                              // max failures per measuring interval
                Time.of(30, TimeUnit.SECONDS),  // interval over which failures are counted
                Time.of(3, TimeUnit.SECONDS)    // delay between restart attempts
        ));

        DataStreamSource<String> lines = env.socketTextStream("doitedu03", 8888);

        // Parse "province,city,amount" into a Tuple3.
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> tpStream =
                lines.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple3<String, String, Integer>> collector) throws Exception {
                        String[] sp = value.split(",");

                        // Deliberate failure trigger used to exercise the restart strategy.
                        // The length guard matters: a line such as "," splits to an empty array.
                        if (sp.length > 0 && "error".equals(sp[0].trim())) {
                            throw new RuntimeException(" Something went wrong !!!");
                        }

                        // Blank or incomplete line: skip it rather than crash the job with
                        // an ArrayIndexOutOfBoundsException.
                        if (sp.length < 3) {
                            return;
                        }

                        final int amount;
                        try {
                            amount = Integer.parseInt(sp[2].trim());
                        } catch (NumberFormatException ignored) {
                            // Non-numeric amount: drop the record. Only the explicit "error"
                            // marker above is supposed to fail the job.
                            return;
                        }

                        // Trim so that "Hunan , Changsha ,1200" and "Hunan,Changsha,1200"
                        // map to the same key and the same city entry.
                        collector.collect(Tuple3.of(sp[0].trim(), sp[1].trim(), amount));
                    }
                });

        // Key by province (f0); the MapState below is scoped per province.
        KeyedStream<Tuple3<String, String, Integer>, String> keyed = tpStream.keyBy(t -> t.f0);

        SingleOutputStreamOperator<Tuple3<String, String, Integer>> process =
                keyed.process(new KeyedProcessFunction<String, Tuple3<String, String, Integer>, Tuple3<String, String, Integer>>() {
                    /** city -> accumulated amount, for the current key (province). */
                    private transient MapState<String, Integer> mapState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // First define a state descriptor ...
                        MapStateDescriptor<String, Integer> mapStateDescriptor =
                                new MapStateDescriptor<>("map-state", String.class, Integer.class);
                        // ... then initialize the state or restore its historical contents.
                        mapState = getRuntimeContext().getMapState(mapStateDescriptor);
                    }

                    @Override
                    public void processElement(Tuple3<String, String, Integer> input, Context context, Collector<Tuple3<String, String, Integer>> collector) throws Exception {
                        String city = input.f1;
                        Integer money = input.f2;

                        // No entry yet for this city means nothing has been accumulated.
                        Integer historyMoney = mapState.get(city);
                        if (historyMoney == null) {
                            historyMoney = 0;
                        }
                        Integer totalMoney = money + historyMoney;

                        // Write the new total back to state.
                        mapState.put(city, totalMoney);

                        // Emit a fresh tuple instead of mutating the incoming record.
                        collector.collect(Tuple3.of(input.f0, city, totalMoney));
                    }
                });

        process.print();
        env.execute();
    }
}
边栏推荐
猜你喜欢
随机推荐
阿里云国际版忘记会员名或登录密码,怎么办?
Restclient operation index library - initialize restclient
大咖访谈 | 开源社区里各种奇怪的现状——夜天之书陈梓立tison
直播预告 | 开源安全治理模型和工具直播研讨会
1.11 ArrayList & student management system
Tensorrt plug-in practice (1)
How to use selenium.chrome to realize the extended function of intercepting or forwarding requests
MSG | 开源与结缘,昇思携梦前行!
改变this指向了解一下
Binary tree (learning daily)
Genesis公链:夯实Web 3.0发展底座
flink使MapState实现KeyedState
获取一个控件宽度
Comment synchroniser
U盘被格式化数据能恢复吗,U盘被格式化了怎样恢复
QT 线程退出的几种方式
动作捕捉在自动化控制领域的应用
Jedis操作Redis
阿里云国际版账户收到账号风险通知,怎么办?
押注全场景,荣耀胜算几何?









