当前位置:网站首页>Talk about the state management mechanism in Flink framework
Talk about the state management mechanism in Flink framework
2022-07-03 11:56:00 【InfoQ】
Status overview
A stateful
No state
data:image/s3,"s3://crabby-images/82ef3/82ef3d94d046df55c52307822d055280b896d998" alt="null"
- Stateless stream processing receives each data record separately , Then the output data is generated according to the latest input data .( Only one input record is converted at a time , And only record the output according to the latest input )
- Stateful flow processing maintains state , The output record is generated based on the latest input record and the current status value .( Maintain status values for all processed records , And update the status according to each newly entered record , Therefore, the output records reflect the results after considering multiple events .)
data:image/s3,"s3://crabby-images/81788/817886962c61e9bd74025814ad0ab6091baee6d4" alt="null"
Operator state
Operator state cannot be accessed by another subtask of the same or different operator
data:image/s3,"s3://crabby-images/b5d36/b5d36bda0e1e5521d34d8dfa73f899cdb37cb144" alt="null"
- Represent a state as a list of data .
- State is also represented as a list of data . It differs from the regular list state in that , In case of failure , Or from the repository (savepoint) How to recover when starting an application .
- If an operator has multiple tasks , And each task has the same status , So this particular situation is best suited to broadcast state
public class StateTest1_OperatorState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Facility parallelism is 1
env.setParallelism(1);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// Define a stateful Map operation Count the number of current partition data
SingleOutputStreamOperator<Integer> resultStream = dataStream.map(new MyCountMap());
resultStream.print();
env.execute();
}
// Customize mapFunction Registration status implementation ListCheckpointed
private static class MyCountMap implements MapFunction<SensorReading, Integer>, ListCheckpointed<Integer>{
// Define a local variable as the operator state
private Integer count = 0;
@Override
public Integer map(SensorReading sensorReading) throws Exception {
count++;
return count;
}
@Override
public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
return Collections.singletonList(count);
}
@Override
public void restoreState(List<Integer> state) throws Exception {
for (Integer number : state) {
count+=number;
}
}
}
}
Keying state
data:image/s3,"s3://crabby-images/b72e4/b72e43f1f74800be343e992a4da26ae3f455b615" alt="null"
- Represent the state as a single value .
- Represent a state as a list of data
- Represent the status as a set of Key-Value Yes
- Represent the state as a list for aggregation operations
public class StateTest2_KeyedState {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Facility parallelism is 1
env.setParallelism(1);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// Define a stateful Map operation Count the present senser The number of data
SingleOutputStreamOperator<Integer> resultStream = dataStream.keyBy("id")
.map(new MyKeyCouneMap());
resultStream.print();
env.execute();
}
// Customize RichMapFunction
private static class MyKeyCouneMap extends RichMapFunction<SensorReading,Integer> {
// Declare keying state
private ValueState<Integer> KeyCouneState ;
//private ListState<String> listState;
//private MapState<String,Double> mapState;
// private ReducingState<SensorReading> reduceState;
@Override
public void open(Configuration parameters) throws Exception {
KeyCouneState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>(
"key-count",Integer.class
));
// listState =getRuntimeContext().getListState(new ListStateDescriptor<String>(
// "list-count",String.class
));
//mapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Double>(
// "map-count",String.class,Double.class
));
//reduceState =getRuntimeContext().getReducingState(new ReducingStateDescriptor<SensorReading>(
// "reducing-count",SensorReading.class
//));class
}
@Override
public Integer map(SensorReading sensorReading) throws Exception {
// Read status
Integer count = KeyCouneState.value();
if (count==null){
count = 0;
}else {
count++;
}
// Update status , Assign a value to the state
KeyCouneState.update(count);
return count;
// listState
//Iterable<String> iterable = listState.get();
//for (String s : listState.get()) {
// System.out.println(s);
//}
//listState.add("hello");
// mapState
//mapState.get("1");
//mapState.put("2",12.3);
}
}
}
State backend
- Memory level state backend , The keying state is managed as an object in memory , Store them in TaskManager Of JVM Pile it up , And will be checkpoint Stored in JobManager The memory of the . characteristic : Fast 、 Low latency , But not stable .
- take checkpoint To a remote persistent file system (FileSystem) On , And for the local state , Follow MemoryStateBackend equally , There are also TaskManager Of JVM Memory level local access speed is also available on the heap , And better fault tolerance .
- After serializing all States , Deposited locally RocksDB Storage in
public class StateTest4_FaultTolerance {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Status backend configuration
env.setStateBackend(new MemoryStateBackend());
env.setStateBackend(new FsStateBackend(""));
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
env.execute();
}
}
data:image/s3,"s3://crabby-images/49df8/49df8728da2b3339791777a09fe360310c97dcf0" alt="null"
public class StateTest3_ApplicationCase {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Facility parallelism is 1
env.setParallelism(1);
DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// Define a flutmap operation , Detect temperature jump output alarm
SingleOutputStreamOperator<Tuple3<String, Double, Double>> resultStream = dataStream.keyBy("id")
.flatMap(new TempChangeWarring(10.0));
resultStream.print();
env.execute();
}
private static class TempChangeWarring extends RichFlatMapFunction<SensorReading, Tuple3<String, Double, Double>> {
// Set the temperature jump threshold
private Double threshold;
public TempChangeWarring(Double threshold) {
this.threshold = threshold;
}
// Define the State , Save the last temperature value
private ValueState<Double> lastTempState;
@Override
public void open(Configuration parameters) throws Exception {
lastTempState = getRuntimeContext().getState(new ValueStateDescriptor<Double>(
"lastTemp", Double.class
));
}
@Override
public void flatMap(SensorReading value, Collector<Tuple3<String, Double, Double>> out) throws Exception {
// To obtain state
Double lastTemp = lastTempState.value();
// If not for null Just calculate the temperature difference twice
if (lastTemp != null) {
Double diff = Math.abs(value.getTemperature() - lastTemp);
if (diff>=threshold){
out.collect(new Tuple3<>(value.getId(),lastTemp,value.getTemperature()));
}
}
// Update status
lastTempState.update(value.getTemperature());
}
@Override
public void close() throws Exception {
lastTempState.clear();
}
}
}
边栏推荐
- typeScript
- Deploying WordPress instance tutorial under coreos
- Simple factory and factory method mode
- AI模型看看视频,就学会了玩《我的世界》:砍树、造箱子、制作石镐样样不差...
- 量化计算调研
- 【学习笔记】dp 状态与转移
- R语言ggplot2可视化:gganimate包创建动态折线图动画(gif)、使用transition_reveal函数在动画中沿给定维度逐步显示数据、在折线移动方向添加数据点
- vulnhub之narak
- Slam mapping and autonomous navigation simulation based on turnlebot3
- Vulnhub's Nagini
猜你喜欢
(数据库提权——Redis)Redis未授权访问漏洞总结
Solve msvcp120d DLL and msvcr120d DLL missing
Hongmeng third training (project training)
VS2015的下载地址和安装教程
银泰百货点燃城市“夜经济”
Wrong arrangement (lottery, email)
The world's most popular font editor FontCreator tool
After watching the video, AI model learned to play my world: cutting trees, making boxes, making stone picks, everything is good
vulnhub之tomato(西红柿)
Kubernetes three dozen probes and probe mode
随机推荐
vulnhub之presidential
mysql使用update联表更新的方法
利用Zabbix动态监控磁盘I/O
Mysql根据时间搜索常用方法整理
Capturing and sorting out external Fiddler -- Conversation bar and filter [2]
Excel quick cross table copy and paste
previous permutation lintcode51
安裝electron失敗的解决辦法
Optimize interface performance
How should intermediate software designers prepare for the soft test
This article explains the complex relationship between MCU, arm, MCU, DSP, FPGA and embedded system
Keepalived中Master和Backup角色选举策略
Yintai department store ignites the city's "night economy"
C language utf8toutf16 (UTF-8 characters are converted to hexadecimal encoding)
Excel表格转到Word中,表格不超边缘纸张范围
STL教程10-容器共性和使用场景
【mysql官方文档】死锁
Dynamically monitor disk i/o with ZABBIX
php 获取文件夹下面的文件列表和文件夹列表
2022年湖南工学院ACM集训第二次周测题解