Flink fault tolerance mechanism (V)
2022-07-24 13:45:00 【Hua Weiyun】
Today's goals
- Checkpoint, Flink's fault-tolerance mechanism
- Restart strategies in Flink's fault-tolerance mechanism
- State backends (where state is stored)
- Checkpoint configuration
- Restart strategies and state recovery
- Manual backup and restore with savepoints
- Parallelism settings
Flink state management
State is the intermediate result of a computation, kept either per key or per operator.
Flink state comes in two kinds: managed state, which Flink manages for you, and raw state, which the operator keeps in its own data structures and writes to checkpoints as byte arrays.
Managed state is further divided into two kinds:
- Keyed state: state scoped to a key (available after keyBy)
  - Supported data structures: ValueState, ListState, MapState
- Operator state: state scoped to one parallel instance of an operator
  - Supported data structures: ListState, BroadcastState
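To make the scoping difference concrete, here is a plain-Java model. It is not the Flink API, and the class names `KeyedValueStateModel`, `OperatorListStateModel` and `StateScopingDemo` are hypothetical. Keyed state keeps one slot per key and the runtime switches the current key before each record; operator state keeps one list per parallel operator instance.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model, NOT the Flink API: it only illustrates how the two kinds of managed state are scoped.

// Keyed state: one slot per key; the "runtime" sets the current key before each record.
class KeyedValueStateModel<K, V> {
    private final Map<K, V> valuesByKey = new HashMap<>();
    private K currentKey;

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Returns null if nothing has been stored for the current key (like ValueState.value()).
    V value() {
        return valuesByKey.get(currentKey);
    }

    void update(V newValue) {
        valuesByKey.put(currentKey, newValue);
    }
}

// Operator state: one list per parallel operator instance, shared by every record it sees.
class OperatorListStateModel<T> {
    private final List<T> elements = new ArrayList<>();

    void add(T element) {
        elements.add(element);
    }

    void clear() {
        elements.clear();
    }

    List<T> get() {
        return new ArrayList<>(elements);
    }
}

class StateScopingDemo {
    public static void main(String[] args) {
        KeyedValueStateModel<String, Long> maxState = new KeyedValueStateModel<>();
        maxState.setCurrentKey("Beijing");
        maxState.update(6L);
        maxState.setCurrentKey("Shanghai");
        System.out.println(maxState.value()); // null: Shanghai has its own, still empty slot
        maxState.setCurrentKey("Beijing");
        System.out.println(maxState.value()); // 6

        OperatorListStateModel<Long> offsetState = new OperatorListStateModel<>();
        offsetState.add(42L);
        System.out.println(offsetState.get()); // [42]
    }
}
```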
Flink keyed state example
Requirement
Use a ValueState from keyed state to track the maximum value seen so far for each key (in practice you would simply use maxBy); here the value state is managed by hand. Sample input:
<hello,1>
<hello,3>
<hello,2>
Input type: Tuple2<String /* word */, Long /* length */>; output type: Tuple3<String /* word */, Long /* length */, Long /* historical maximum */>
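The per-key logic can first be sketched in plain Java, assuming a `HashMap` keyed by word stands in for the `ValueState<Long>` that Flink scopes to the current key automatically. The class name `RunningMaxSketch` is hypothetical; on the sample input above it yields (hello,1,1), (hello,3,3), (hello,2,3).

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the map() logic, not Flink code: the HashMap stands in for per-key ValueState<Long>.
class RunningMaxSketch {
    private final Map<String, Long> maxState = new HashMap<>();

    // Emit (word, length, historical maximum for this word) for each input record.
    String map(String word, long length) {
        Long maxValue = maxState.get(word); // null for the first record of a key
        if (maxValue == null || length > maxValue) {
            maxState.put(word, length);     // corresponds to maxState.update(curValue)
            maxValue = length;
        }
        return "(" + word + "," + length + "," + maxValue + ")";
    }

    public static void main(String[] args) {
        RunningMaxSketch fn = new RunningMaxSketch();
        System.out.println(fn.map("hello", 1)); // (hello,1,1)
        System.out.println(fn.map("hello", 3)); // (hello,3,3)
        System.out.println(fn.map("hello", 2)); // (hello,2,3)
    }
}
```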
Development
package cn.itcast.flink.state;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author itcast
 * Date 2021/6/21 8:34
 * Desc TODO
 */
public class KeyedStateDemo {
    public static void main(String[] args) throws Exception {
        //1. env: set the parallelism to 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2. Source: <city, count> => <city, count, maximum count so far>
        DataStreamSource<Tuple2<String, Long>> tupleDS = env.fromElements(
                Tuple2.of("Beijing", 1L),
                Tuple2.of("Shanghai", 2L),
                Tuple2.of("Beijing", 6L),
                Tuple2.of("Shanghai", 8L),
                Tuple2.of("Beijing", 3L),
                Tuple2.of("Shanghai", 4L)
        );

        //3. Transformation
        // Use ValueState from keyed state to get the maximum value in the stream (in practice, maxBy is enough)
        // Approach 1: use maxBy directly; this is what you would use in real development
        SingleOutputStreamOperator<Tuple2<String, Long>> result1 = tupleDS.keyBy(t -> t.f0)
                // min:   only the aggregated field holds the minimum; the other fields are not tied to it
                // minBy: returns the whole record that holds the minimum, with its other fields
                // max:   only the aggregated field holds the maximum; the other fields are not tied to it
                // maxBy: returns the whole record that holds the maximum, with its other fields
                .maxBy(1);

        // Approach 2: managed keyed state
        //3.1. Group by the string f0, then map Tuple2<String /* city */, Long /* count */>
        //     to Tuple3<String /* city */, Long /* count */, Long /* historical maximum */>
        SingleOutputStreamOperator<Tuple3<String, Long, Long>> result2 = tupleDS
                .keyBy(t -> t.f0)
                .map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
                    //-1. Value state that stores the maximum seen so far for the current key
                    private ValueState<Long> maxState = null;

                    //3.2. Override open() of RichMapFunction
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        //-2. Define the state descriptor
                        ValueStateDescriptor<Long> maxStateDesc = new ValueStateDescriptor<>("maxState", Long.class);
                        //-3. Obtain the state handle from the runtime context
                        maxState = getRuntimeContext().getState(maxStateDesc);
                    }

                    //3.3. Override map()
                    //-4. Compare the historical maximum in state with the current element
                    @Override
                    public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
                        // Maximum stored in state for the current key (null for the first record)
                        Long maxValue = maxState.value();
                        // Current value
                        Long curValue = value.f1;
                        //-5. If the state is empty or the current value is larger, update the state;
                        //    return the Tuple3 result
                        if (maxValue == null || curValue > maxValue) {
                            maxState.update(curValue);
                            return Tuple3.of(value.f0, value.f1, curValue);
                        } else {
                            return Tuple3.of(value.f0, value.f1, maxValue);
                        }
                    }
                });

        //4. Sink: print
        //result1.print();
        result2.print();

        //5. execute
        env.execute();
    }
}
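The comments in the demo contrast max and maxBy. With the demo's Tuple2<city, count> the only other field is the key itself, so both return the same values; the difference shows up once a record has a third field. Below is a plain-Java sketch of how we understand the two rolling aggregations to differ for one key. It is not Flink code, and `MaxVsMaxBySketch`, `Rec` and its `tag` field are hypothetical.

```java
// Plain-Java sketch (not Flink code): max keeps the first record's other fields and only raises
// the compared field; maxBy keeps the whole record holding the maximum (earlier record wins ties).
class MaxVsMaxBySketch {
    static final class Rec {
        final String key;
        final long value;
        final String tag; // hypothetical extra field

        Rec(String key, long value, String tag) {
            this.key = key;
            this.value = value;
            this.tag = tag;
        }

        @Override
        public String toString() {
            return "(" + key + "," + value + "," + tag + ")";
        }
    }

    static Rec max(Rec acc, Rec next) {
        return next.value > acc.value ? new Rec(acc.key, next.value, acc.tag) : acc;
    }

    static Rec maxBy(Rec acc, Rec next) {
        return next.value > acc.value ? next : acc;
    }

    // Fold one key's records with both aggregations; returns {maxResult, maxByResult}.
    static Rec[] foldBoth(Rec[] records) {
        Rec m = records[0];
        Rec mb = records[0];
        for (int i = 1; i < records.length; i++) {
            m = max(m, records[i]);
            mb = maxBy(mb, records[i]);
        }
        return new Rec[] {m, mb};
    }

    public static void main(String[] args) {
        Rec[] beijing = {
            new Rec("Beijing", 1, "a"), new Rec("Beijing", 6, "b"), new Rec("Beijing", 3, "c")
        };
        Rec[] result = foldBoth(beijing);
        System.out.println(result[0]); // (Beijing,6,a)
        System.out.println(result[1]); // (Beijing,6,b)
    }
}
```

In a stream, the rolling aggregation would emit the current result after every record rather than only the final one.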
Flink operator state example
Requirement
Use a ListState to store the offset, simulating how a Kafka consumer maintains its offset.
Implementation
package cn.itcast.flink.state;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Iterator;

/**
 * Author itcast
 * Date 2021/6/21 9:18
 * Desc TODO
 */
public class OperatorStateDemo {
    public static void main(String[] args) throws Exception {
        //1. Create the streaming environment; parallelism 1 makes the output easy to follow
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2. Enable checkpointing (state management needs it) and store state under file:///D:/chk
        env.enableCheckpointing(1000);
        env.setStateBackend(new FsStateBackend("file:///D:/chk"));
        //3. Checkpoint configuration: exactly-once semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //4. Restart strategy: up to 3 restart attempts, 3 seconds apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000));
        //5. Add the data source: an instance of MyMonitorKafkaSource
        DataStreamSource<String> source = env.addSource(new MyMonitorKafkaSource());
        //6. Print
        source.print();
        //7. Execute
        env.execute();
    }

    // MyMonitorKafkaSource extends RichParallelSourceFunction<String> and implements CheckpointedFunction
    public static class MyMonitorKafkaSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {
        // Operator state holding the latest offset
        private ListState<Long> offsetState = null;
        // volatile: cancel() is called from a different thread than run()
        private volatile boolean flag = true;
        private Long offset = 0L;

        // initializeState: create the ListStateDescriptor and obtain offsetState from the context
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Long> offsetStateDesc = new ListStateDescriptor<>("offsetState", Long.class);
            offsetState = context.getOperatorStateStore().getListState(offsetStateDesc);
        }

        // run: restore the offset, then loop offset += 1 and emit (subtask index, offset)
        // once per second; every 5th offset simulates an exception
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            Iterator<Long> iterator = offsetState.get().iterator();
            if (iterator.hasNext()) {
                offset = iterator.next();
            }
            while (flag) {
                // Hold the checkpoint lock so a snapshot never sees a half-updated offset
                synchronized (ctx.getCheckpointLock()) {
                    offset = offset + 1;
                    // Index of this parallel subtask (not a CPU core)
                    int idx = getRuntimeContext().getIndexOfThisSubtask();
                    ctx.collect("index:" + idx + " offset:" + offset);
                }
                Thread.sleep(1000);
                if (offset % 5 == 0) {
                    System.out.println("The program has hit an error....");
                    throw new Exception("Program has a BUG...");
                }
            }
        }

        // cancel: stop the loop
        @Override
        public void cancel() {
            flag = false;
        }

        // snapshotState: clear offsetState and add the latest offset
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            offsetState.clear();
            offsetState.add(offset);
        }
    }
}
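To see why snapshotState stores the offset in the ListState, here is a plain-Java simulation of the same source; no Flink is involved and `CheckpointedOffsetSim` is a hypothetical name. It assumes checkpoints complete synchronously every `checkpointEvery` offsets (a stand-in for the 1000 ms interval), that each simulated failure at a multiple of 5 is transient, and that the job restarts at most `maxRestarts` times as with fixedDelayRestart(3, 3000). Offsets emitted after the last snapshot are replayed after a restart.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java simulation, not Flink code. Assumptions: synchronous checkpoints every `checkpointEvery`
// offsets, transient failures (each fires once per offset), at most `maxRestarts` restarts.
class CheckpointedOffsetSim {
    private final List<Long> offsetState = new ArrayList<>(); // stands in for ListState<Long>
    private long offset = 0L;

    // Like the start of run(): take the offset from the restored list state, if any.
    private void restore() {
        offset = offsetState.isEmpty() ? 0L : offsetState.get(0);
    }

    // Like snapshotState(): clear the list state and add the latest offset.
    private void snapshot() {
        offsetState.clear();
        offsetState.add(offset);
    }

    // Returns every offset emitted until `stopAt`, including replays after restarts.
    static List<Long> run(long stopAt, int checkpointEvery, int maxRestarts) {
        CheckpointedOffsetSim source = new CheckpointedOffsetSim(); // its state survives restarts
        List<Long> emitted = new ArrayList<>();
        Set<Long> alreadyFailed = new HashSet<>();
        int restarts = 0;
        while (true) {
            source.restore();
            try {
                while (source.offset < stopAt) {
                    source.offset++;
                    emitted.add(source.offset);
                    if (source.offset % checkpointEvery == 0) {
                        source.snapshot();
                    }
                    if (source.offset % 5 == 0 && alreadyFailed.add(source.offset)) {
                        throw new IllegalStateException("simulated bug at offset " + source.offset);
                    }
                }
                return emitted;
            } catch (IllegalStateException e) {
                if (++restarts > maxRestarts) {
                    throw e; // restart attempts exhausted: the job fails
                }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(run(7, 2, 3)); // [1, 2, 3, 4, 5, 5, 6, 7]: 5 is replayed from snapshot 4
        System.out.println(run(7, 1, 3)); // [1, 2, 3, 4, 5, 6, 7]: snapshot taken right at 5
    }
}
```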
Flink's fault-tolerance mechanism
- Checkpoint: a global snapshot of the current state of all operators of a Flink job at a given moment, usually persisted to disk (a local file system or HDFS).
Checkpoint execution process
- The JobManager (master node) triggers the checkpoint.
- The JobManager injects barriers into the sources, and the barriers flow source -> transformation -> sink. When an operator receives the barrier, it saves its current state to HDFS or a local file. Once every operator has backed up its state, the checkpoint is complete.
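The barrier step can be illustrated with a simplified plain-Java model of aligned barriers. It is an illustration, not Flink's internal code: `BarrierAlignmentSketch` is hypothetical, it handles a single checkpoint for one summing operator with several inputs, and it ignores unaligned checkpoints. Records that arrive on a channel after its barrier are held back until every channel has delivered its barrier, so the snapshot reflects exactly the pre-barrier records.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of aligned checkpoint barriers (single checkpoint, summing operator).
class BarrierAlignmentSketch {
    private final boolean[] barrierSeen;
    private final Deque<Long> heldBack = new ArrayDeque<>();
    private long runningSum = 0;
    private Long checkpointedSum = null; // state captured for the checkpoint

    BarrierAlignmentSketch(int channels) {
        barrierSeen = new boolean[channels];
    }

    void onRecord(int channel, long value) {
        if (barrierSeen[channel] && checkpointedSum == null) {
            heldBack.add(value); // arrived after this channel's barrier: wait for alignment
            return;
        }
        runningSum += value;
    }

    void onBarrier(int channel) {
        barrierSeen[channel] = true;
        for (boolean seen : barrierSeen) {
            if (!seen) {
                return; // still aligning
            }
        }
        checkpointedSum = runningSum; // snapshot contains only pre-barrier records
        while (!heldBack.isEmpty()) {
            runningSum += heldBack.poll(); // now process the held-back records
        }
    }

    long currentSum() {
        return runningSum;
    }

    Long checkpointedSum() {
        return checkpointedSum;
    }

    public static void main(String[] args) {
        BarrierAlignmentSketch op = new BarrierAlignmentSketch(2);
        op.onRecord(0, 1);
        op.onRecord(1, 3);
        op.onRecord(0, 2);
        op.onBarrier(0);
        op.onRecord(0, 10); // held back: channel 0 already passed its barrier
        op.onBarrier(1);    // alignment complete: snapshot = 1 + 3 + 2
        op.onRecord(1, 20);
        System.out.println(op.checkpointedSum() + " " + op.currentSum()); // 6 36
    }
}
```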
State backends (storage media)
- MemoryStateBackend: keeps state in memory (checkpoints go to JobManager memory); not recommended for production.
- FsStateBackend: writes checkpoints to HDFS or the local file system; can be used in production.
- RocksDBStateBackend: keeps state in a local RocksDB instance first, then uploads it asynchronously (optionally incrementally) to HDFS; generally used for jobs with very large state.
Checkpoint configuration
Global configuration file: flink-conf.yaml
state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Default target directory for savepoints, optional.
#state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
Set through code
env.enableCheckpointing(1000);
env.setStateBackend(new FsStateBackend("file:///D:/chk"));
Example: the operator state demo with a fuller checkpoint configuration
package cn.itcast.flink.state;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Iterator;

/**
 * Author itcast
 * Date 2021/6/21 9:18
 * Desc TODO
 */
public class OperatorStateDemo {
    public static void main(String[] args) throws Exception {
        //1. Create the streaming environment; parallelism 1 makes the output easy to follow
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2. Enable checkpointing (state management needs it) and store state under file:///D:/chk
        env.enableCheckpointing(1000);
        // State backend options:
        //   FsStateBackend: file system
        //   RocksDBStateBackend: RocksDB plugin, snapshots uploaded asynchronously (optionally incrementally) to HDFS
        env.setStateBackend(new FsStateBackend("file:///D:/chk"));
        // Retain the externalized checkpoint (do not delete it) when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Timeout of a single checkpoint
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Minimum pause between two checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // Maximum number of checkpoints in progress at the same time
        // (per the Flink checkpointing docs, a non-zero minimum pause effectively limits this to 1)
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        //env.setStateBackend(new FsStateBackend("hdfs://node1:8020/checkpoints"));
        //env.setStateBackend(new RocksDBStateBackend("hdfs://node1:8020/checkpoints"));
        //3. Checkpoint configuration: exactly-once semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //4. Restart strategy: up to 3 restart attempts, 3 seconds apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000));
        //5. Add the data source: an instance of MyMonitorKafkaSource
        DataStreamSource<String> source = env.addSource(new MyMonitorKafkaSource());
        //6. Print
        source.print();
        //7. Execute
        env.execute();
    }

    // MyMonitorKafkaSource extends RichParallelSourceFunction<String> and implements CheckpointedFunction
    public static class MyMonitorKafkaSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {
        // Operator state holding the latest offset
        private ListState<Long> offsetState = null;
        // volatile: cancel() is called from a different thread than run()
        private volatile boolean flag = true;
        private Long offset = 0L;

        // initializeState: create the ListStateDescriptor and obtain offsetState from the context
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Long> offsetStateDesc = new ListStateDescriptor<>("offsetState", Long.class);
            offsetState = context.getOperatorStateStore().getListState(offsetStateDesc);
        }

        // run: restore the offset, then loop offset += 1 and emit (subtask index, offset)
        // once per second; every 5th offset simulates an exception
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            Iterator<Long> iterator = offsetState.get().iterator();
            if (iterator.hasNext()) {
                offset = iterator.next();
            }
            while (flag) {
                // Hold the checkpoint lock so a snapshot never sees a half-updated offset
                synchronized (ctx.getCheckpointLock()) {
                    offset = offset + 1;
                    // Index of this parallel subtask (not a CPU core)
                    int idx = getRuntimeContext().getIndexOfThisSubtask();
                    ctx.collect("index:" + idx + " offset:" + offset);
                }
                Thread.sleep(1000);
                if (offset % 5 == 0) {
                    System.out.println("The program has hit an error....");
                    throw new Exception("Program has a BUG...");
                }
            }
        }

        // cancel: stop the loop
        @Override
        public void cancel() {
            flag = false;
        }

        // snapshotState: clear offsetState and add the latest offset
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            offsetState.clear();
            offsetState.add(offset);
        }
    }
}
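The interval, minimum pause and timeout settings interact. Here is a plain-Java sketch of that interaction under a simplifying assumption (it is not Flink's CheckpointCoordinator, and `CheckpointScheduleSketch` is hypothetical): the next checkpoint starts at the regular interval unless the minimum pause after the previous completion pushes it later, and an unfinished checkpoint is discarded once it runs longer than the timeout.

```java
// Plain-Java sketch under simplifying assumptions; not Flink's scheduling code.
class CheckpointScheduleSketch {
    // Earliest start of the next checkpoint, given when the previous one started and completed.
    static long nextTrigger(long lastStartMs, long lastCompletionMs, long intervalMs, long minPauseMs) {
        return Math.max(lastStartMs + intervalMs, lastCompletionMs + minPauseMs);
    }

    // An unfinished checkpoint older than the timeout is discarded.
    static boolean timedOut(long startMs, long nowMs, long timeoutMs) {
        return nowMs - startMs > timeoutMs;
    }

    public static void main(String[] args) {
        // Values from the demo: interval 1000 ms, min pause 500 ms, timeout 60000 ms
        System.out.println(nextTrigger(0, 200, 1000, 500)); // 1000: the interval dominates
        System.out.println(nextTrigger(0, 800, 1000, 500)); // 1300: a slow checkpoint delays the next one
        System.out.println(timedOut(0, 61_000, 60_000));    // true
    }
}
```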
Restart strategies and state recovery
Recovery: restore the state backed up by a checkpoint (for example on HDFS) into the running program.
Restart strategy: when the program throws an exception, Flink restarts it according to a configured delay and number of attempts.
Types of restart strategies:
- No restart: noRestart
- Always restart (the default when checkpointing is enabled and no strategy is configured)
- Fixed-delay restart, for example restart 3 times with a 5 s delay between attempts
- Failure-rate restart, for example limit the job to 5 failures within 5 minutes, with 1 minute between restarts
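The two configurable strategies differ in how they count failures. The following plain-Java sketch models the restart decision on our reading of Flink's fixed-delay and failure-rate strategies; it is not the Flink classes, the names `RestartDecider`, `FixedDelayDecider` and `FailureRateDecider` are hypothetical, and the delay between attempts is not simulated. Under this model, a failure rate of 3 per interval allows two restarts inside the window and the third failure fails the job.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Plain-Java sketch of the restart decision (not the Flink classes; delays are not simulated).
interface RestartDecider {
    // Called once per failure at time nowMs; returns true if the job may be restarted.
    boolean onFailure(long nowMs);
}

// Fixed delay: at most maxAttempts restarts in total.
class FixedDelayDecider implements RestartDecider {
    private final int maxAttempts;
    private int attempts = 0;

    FixedDelayDecider(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    @Override
    public boolean onFailure(long nowMs) {
        attempts++;
        return attempts <= maxAttempts;
    }
}

// Failure rate: remember the last maxFailuresPerInterval failure times; once that window is full,
// restart only if the oldest remembered failure lies outside the interval.
class FailureRateDecider implements RestartDecider {
    private final int maxFailuresPerInterval;
    private final long intervalMs;
    private final Deque<Long> failureTimes = new ArrayDeque<>();

    FailureRateDecider(int maxFailuresPerInterval, long intervalMs) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMs = intervalMs;
    }

    @Override
    public boolean onFailure(long nowMs) {
        if (failureTimes.size() >= maxFailuresPerInterval) {
            failureTimes.poll(); // drop the oldest failure to make room
        }
        failureTimes.add(nowMs);
        if (failureTimes.size() < maxFailuresPerInterval) {
            return true;
        }
        return nowMs - failureTimes.peek() > intervalMs;
    }
}

class RestartStrategySketch {
    public static void main(String[] args) {
        RestartDecider noRestart = nowMs -> false;
        RestartDecider fixed = new FixedDelayDecider(3);
        RestartDecider rate = new FailureRateDecider(3, 5 * 60_000L);
        for (long t = 0; t < 4; t++) {
            long now = t * 1000;
            System.out.println(noRestart.onFailure(now) + " " + fixed.onFailure(now) + " " + rate.onFailure(now));
        }
    }
}
```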
Requirement
Read data from a socket, use checkpointing and a restart strategy to compute a word count, and simulate a failure when the input contains the word "bug".
package cn.itcast.flink.state;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Author itcast
 * Date 2021/6/21 11:18
 * Desc TODO
 */
public class CheckpointRestartDemo {
    public static void main(String[] args) throws Exception {
        //1. env: execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //=========== Checkpoint settings ===========
        // Take a checkpoint every 1000 ms
        env.enableCheckpointing(1000);
        // Store state under file:///d:/chk
        env.setStateBackend(new FsStateBackend("file:///d:/chk"));
        // Minimum pause between two checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // (Not set here) whether an error while taking a checkpoint should fail the whole job
        // Retain the checkpoint (do not delete it) when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Exactly-once checkpointing mode
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // A checkpoint that has not completed within 60 s fails and is discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Number of checkpoints that may run at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        //=========== Restart strategy ===========
        //-1. Default: with checkpointing enabled and no restart strategy configured, the job restarts indefinitely
        //-2. No restart
        // env.setRestartStrategy(RestartStrategies.noRestart());
        //-3. Fixed-delay restart: restart 3 times, 3 s apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000));
        //-4. Failure-rate restart (used occasionally): at most 3 failures within 5 minutes
        //    (the 3rd failure is not restarted, so at most 2 restarts), 10 s between restarts.
        //    Note: this call replaces the fixed-delay strategy set just above; keep only one of them.
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(5), Time.seconds(10)));

        //2. Source: socket
        DataStreamSource<String> source = env.socketTextStream("192.168.88.161", 9999);

        //3. Transformation
        //3.1 Split each line on spaces and flatMap every word to Tuple2<word, 1>
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = source
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] words = value.split(" ");
                        for (String word : words) {
                            //3.2 If the word is "bug", throw an exception to simulate an error
                            if (word.equals("bug")) {
                                System.out.println("The program reports an error...");
                                throw new Exception("Program BUG ...");
                            }
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                //3.3 Group. Note: batch processing groups with groupBy, stream processing with keyBy
                .keyBy(t -> t.f0)
                //3.4 Aggregate
                .sum(1);

        //4. Sink: print
        result.print();

        //5. execute
        env.execute();
    }
}
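What flatMap, keyBy(t -> t.f0) and sum(1) emit, and what restoring a checkpoint rolls back, can be sketched in plain Java. This is not Flink code: `RunningWordCountSketch` is hypothetical, it models a single parallel instance, and `snapshot()`/`restore()` are stand-ins for checkpointing the keyed counts. Each word emits an updated running count for that word, and a failure mid-line discards the partial update once state is restored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (not Flink code) of flatMap -> keyBy(word) -> sum(1) on one parallel instance.
class RunningWordCountSketch {
    private final Map<String, Integer> counts = new HashMap<>(); // per-key state kept by sum(1)

    List<String> process(String line) {
        List<String> out = new ArrayList<>();
        for (String word : line.split(" ")) {
            if (word.equals("bug")) {
                throw new IllegalStateException("Program BUG ...");
            }
            int updated = counts.merge(word, 1, Integer::sum); // keyed running sum
            out.add("(" + word + "," + updated + ")");
        }
        return out;
    }

    // Hypothetical stand-ins for checkpointing and restoring the keyed counts.
    Map<String, Integer> snapshot() {
        return new HashMap<>(counts);
    }

    void restore(Map<String, Integer> saved) {
        counts.clear();
        counts.putAll(saved);
    }

    public static void main(String[] args) {
        RunningWordCountSketch job = new RunningWordCountSketch();
        System.out.println(job.process("hello flink hello")); // [(hello,1), (flink,1), (hello,2)]
        Map<String, Integer> checkpoint = job.snapshot();
        try {
            job.process("hello bug"); // counts hello=3, then fails on "bug"
        } catch (IllegalStateException e) {
            job.restore(checkpoint); // roll back to hello=2
        }
        System.out.println(job.process("hello")); // [(hello,3)]
    }
}
```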
Savepoint: manual restart and restore
Manual backup and recovery
Trigger a savepoint: flink savepoint <jobId> <targetDirectory>
Restore data from a savepoint:
flink run -s <savepointPath> --class <fully.qualified.MainClass> <jarFile>
Parallelism settings
- Four ways to set parallelism
  - Operator level: setParallelism on an individual operator
  - Global: env.setParallelism in the job code
  - Client (command line): flink run -p
  - Configuration file: parallelism.default in flink-conf.yaml
- Priority (highest first): operator level > global parallelism > client parallelism > configuration file
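The priority rule can be expressed as a small plain-Java resolver. It is a sketch, not a Flink API: `ParallelismResolver` is hypothetical, each argument is the parallelism set at that level or null if unset, and the final fallback of 1 mirrors the default value of parallelism.default.

```java
// Plain-Java sketch of the parallelism priority rule (not a Flink API).
class ParallelismResolver {
    static int effective(Integer operatorLevel, Integer envLevel, Integer clientLevel, Integer configFile) {
        if (operatorLevel != null) return operatorLevel; // operator.setParallelism(n)
        if (envLevel != null) return envLevel;           // env.setParallelism(n)
        if (clientLevel != null) return clientLevel;     // flink run -p n
        if (configFile != null) return configFile;       // parallelism.default in flink-conf.yaml
        return 1;
    }

    public static void main(String[] args) {
        System.out.println(effective(2, 4, 8, 1));          // 2
        System.out.println(effective(null, 4, 8, 1));       // 4
        System.out.println(effective(null, null, 8, 1));    // 8
        System.out.println(effective(null, null, null, 3)); // 3
    }
}
```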