Flink fault tolerance mechanism (V)
2022-07-24 13:45:00 【Hua Weiyun】
Today's goals

- Flink's fault tolerance mechanism: Checkpoint
- Restart strategies in the fault tolerance mechanism
- Storage media: StateBackend
- Checkpoint configuration
- Restart strategies and state recovery
- Savepoint: manual restart and recovery
- Parallelism settings
Flink State Management

State is the intermediate result of a computation, maintained either per key or per operator.

Flink state falls into two categories: managed state and raw state.
Managed state is further divided into two kinds (see the sketch after this list):

- Keyed state: state bound to a key
  - Supported data structures: ValueState, ListState, MapState, BroadcastState
- Operator state: state bound to an operator (task)
  - Typically a byte array or a ListState
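As a minimal sketch (the class and descriptor names below are placeholders, not part of this post's examples), the first three keyed state structures are obtained from the runtime context inside any rich function on a keyed stream; BroadcastState is the exception and is accessed through a BroadcastProcessFunction instead:

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Hypothetical example: declare one of each keyed state structure
public class KeyedStateKinds extends RichFlatMapFunction<Tuple2<String, Long>, String> {
    private ValueState<Long> valueState;      // a single value per key
    private ListState<Long> listState;        // a list of values per key
    private MapState<String, Long> mapState;  // a map per key

    @Override
    public void open(Configuration parameters) {
        valueState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("value", Long.class));
        listState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("list", Long.class));
        mapState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("map", String.class, Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> in, Collector<String> out) throws Exception {
        // each access is automatically scoped to the key of the current record
        valueState.update(in.f1);
        listState.add(in.f1);
        mapState.put(in.f0, in.f1);
        out.collect(in.f0);
    }
}
```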
Flink Keyed State Example

Requirement

Use ValueState (a kind of keyed state) to track the maximum value seen in the data (in practice, simply use maxBy). For input such as

<hello,1>
<hello,3>
<hello,2>

the input type is Tuple2<String /* word */, Long /* count */> and the output type is Tuple3<String /* word */, Long /* count */, Long /* historical maximum */>.
Development
```java
package cn.itcast.flink.state;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author itcast
 * Date 2021/6/21 8:34
 * Desc Track the historical maximum per key with ValueState.
 */
public class KeyedStateDemo {
    public static void main(String[] args) throws Exception {
        // 1. env: set the parallelism to 1 for easier observation
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Source: <city, count> => <city, maximum count>
        DataStreamSource<Tuple2<String, Long>> tupleDS = env.fromElements(
                Tuple2.of("Beijing", 1L),
                Tuple2.of("Shanghai", 2L),
                Tuple2.of("Beijing", 6L),
                Tuple2.of("Shanghai", 8L),
                Tuple2.of("Beijing", 3L),
                Tuple2.of("Shanghai", 4L)
        );

        // 3. Transformation: use ValueState (keyed state) to track the maximum
        //    value in the stream (in practice, simply use maxBy)

        // Approach 1: use maxBy directly -- preferred in real development
        SingleOutputStreamOperator<Tuple2<String, Long>> result1 = tupleDS
                .keyBy(t -> t.f0)
                // min   finds only the minimum of the field; other fields are arbitrary
                // minBy finds the minimum field together with the matching record
                // max   finds only the maximum of the field; other fields are arbitrary
                // maxBy finds the maximum field together with the matching record
                .maxBy(1);

        // Approach 2: maintain the maximum yourself with managed keyed state
        // 3.1 Group by f0, then map Tuple2<city, count>
        //     to Tuple3<city, count, historical maximum>
        SingleOutputStreamOperator<Tuple3<String, Long, Long>> result2 = tupleDS
                .keyBy(t -> t.f0)
                .map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
                    // -1. A value-typed state that stores the maximum seen so far
                    ValueState<Long> maxState = null;

                    // 3.2 Override open() of RichMapFunction
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // -2. Define the state descriptor
                        ValueStateDescriptor<Long> maxStateDesc =
                                new ValueStateDescriptor<>("maxState", Long.class);
                        // -3. Obtain the state handle from the runtime context
                        maxState = getRuntimeContext().getState(maxStateDesc);
                    }

                    // 3.3 Override map()
                    // -4. Compare the historical maximum in state with the current value
                    // -5. Update the state when the current value is larger or the
                    //     history is empty; return a Tuple3 either way
                    @Override
                    public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
                        // the maximum stored in state
                        Long maxValue = maxState.value();
                        // the current value
                        Long curValue = value.f1;
                        if (maxValue == null || curValue > maxValue) {
                            maxState.update(curValue);
                            return Tuple3.of(value.f0, value.f1, curValue);
                        } else {
                            return Tuple3.of(value.f0, value.f1, maxValue);
                        }
                    }
                });

        // 4. Sink: print the result
        //result1.print();
        result2.print();

        // 5. Execute the environment
        env.execute();
    }
}
```
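With parallelism 1, result2 emits one record per input element, carrying the running maximum for its key:

```
(Beijing,1,1)
(Shanghai,2,2)
(Beijing,6,6)
(Shanghai,8,8)
(Beijing,3,6)
(Shanghai,4,8)
```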
Flink Operator State Example

Requirement

Use ListState to store an offset, simulating how a Kafka consumer maintains its offset.

Implementation
```java
package cn.itcast.flink.state;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Iterator;

/**
 * Author itcast
 * Date 2021/6/21 9:18
 * Desc Simulate Kafka offset maintenance with operator ListState.
 */
public class OperatorStateDemo {
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming environment; parallelism 1 for easier observation
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Enable checkpointing and save state to file:///D:/chk
        env.enableCheckpointing(1000);
        env.setStateBackend(new FsStateBackend("file:///D:/chk"));

        // 3. Checkpoint configuration: exactly-once semantics, etc.
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // 4. Restart strategy: 3 attempts, 3 seconds apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000));

        // 5. Add the custom source MyMonitorKafkaSource
        DataStreamSource<String> source = env.addSource(new MyMonitorKafkaSource());

        // 6. Print the output
        source.print();

        // 7. Execute
        env.execute();
    }

    // MyMonitorKafkaSource extends RichParallelSourceFunction<String>
    // and implements CheckpointedFunction
    public static class MyMonitorKafkaSource extends RichParallelSourceFunction<String>
            implements CheckpointedFunction {

        ListState<Long> offsetState = null;
        boolean flag = true;
        Long offset = 0L;

        // Override initializeState: declare a ListStateDescriptor and obtain
        // offsetState from the context
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Long> offsetStateDesc =
                    new ListStateDescriptor<>("offsetState", Long.class);
            offsetState = context.getOperatorStateStore().getListState(offsetStateDesc);
        }

        // Override run: restore the offset, then loop with offset += 1, emitting
        // the subtask index and offset once per second; every 5 records, throw
        // an exception to simulate a failure
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            Iterator<Long> iterator = offsetState.get().iterator();
            if (iterator.hasNext()) {
                offset = iterator.next();
            }
            while (flag) {
                offset = offset + 1;
                // index of this parallel subtask
                int idx = getRuntimeContext().getIndexOfThisSubtask();
                // emit to the downstream print sink
                ctx.collect("index:" + idx + " offset:" + offset);
                Thread.sleep(1000);
                if (offset % 5 == 0) {
                    System.out.println("The program is about to fail....");
                    throw new Exception("Simulated BUG...");
                }
            }
        }

        // Override cancel
        @Override
        public void cancel() {
            flag = false;
        }

        // Override snapshotState: clear offsetState and add the latest offset
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            offsetState.clear();
            offsetState.add(offset);
        }
    }
}
```
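When this runs, the source emits an increasing offset once per second and deliberately throws on every offset divisible by 5. Because snapshotState() checkpoints the latest offset into ListState and initializeState() restores it, each fixed-delay restart resumes counting from the last checkpointed offset rather than from 0.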
Flink's Fault Tolerance Mechanism

- Checkpoint: a global snapshot, taken at a point in time, of the current state of all of Flink's operators, usually persisted to disk.

How a checkpoint executes

- The JobManager (master node) triggers the checkpoint.
- The JobManager injects a barrier signal that flows through source -> transformation -> sink. As the barrier passes each operator, that operator saves its current state to HDFS or a local file; once every operator has been backed up, the checkpoint is complete.
Storage media (a configuration sketch follows the list)

- MemoryStateBackend: not recommended for production
- FsStateBackend: stores state on HDFS or the local file system; usable in production
- RocksDBStateBackend: stores state locally first, then asynchronously and incrementally flushes it to HDFS; generally suited to jobs with large intermediate state
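A minimal sketch of wiring up each backend with the legacy StateBackend API used throughout this post (Flink 1.x). The paths are placeholders, and RocksDBStateBackend additionally requires the flink-statebackend-rocksdb dependency on the classpath; in a real job you would pick exactly one backend, since each call overrides the previous one:

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. MemoryStateBackend: snapshots live on the JobManager heap -- demos only
        env.setStateBackend(new MemoryStateBackend());

        // 2. FsStateBackend: snapshots go to a local directory or HDFS
        env.setStateBackend(new FsStateBackend("hdfs://node1:8020/flink-checkpoints"));

        // 3. RocksDBStateBackend: state in local RocksDB files, asynchronously
        //    checkpointed to the given URI; the second argument enables
        //    incremental checkpoints
        env.setStateBackend(new RocksDBStateBackend("hdfs://node1:8020/flink-checkpoints", true));
    }
}
```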
Checkpoint Configuration

Global configuration in flink-conf.yaml:

```yaml
state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Default target directory for savepoints, optional.
#state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
```

Setting it in code:

```java
env.enableCheckpointing(1000);
env.setStateBackend(new FsStateBackend("file:///D:/chk"));
```

Requirement: extend the operator state demo above with a full checkpoint configuration.
```java
package cn.itcast.flink.state;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Iterator;

/**
 * Author itcast
 * Date 2021/6/21 9:18
 * Desc OperatorStateDemo with a fuller checkpoint configuration.
 */
public class OperatorStateDemo {
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming environment; parallelism 1 for easier observation
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Enable checkpointing and save state to file:///D:/chk
        env.enableCheckpointing(1000);
        // State backend choices:
        //   FsStateBackend      - file system
        //   RocksDBStateBackend - RocksDB, asynchronously and incrementally
        //                         flushed to an HDFS file system
        env.setStateBackend(new FsStateBackend("file:///D:/chk"));
        // Keep the externalized checkpoint if the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Checkpoint timeout
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Minimum pause between two checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // How many checkpoints may run concurrently
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        //env.setStateBackend(new FsStateBackend("hdfs://node1:8020/checkpoints"));
        //env.setStateBackend(new RocksDBStateBackend(...));

        // 3. Exactly-once checkpointing mode
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // 4. Restart strategy: 3 attempts, 3 seconds apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000));

        // 5. Add the custom source MyMonitorKafkaSource
        DataStreamSource<String> source = env.addSource(new MyMonitorKafkaSource());

        // 6. Print the output
        source.print();

        // 7. Execute
        env.execute();
    }

    // MyMonitorKafkaSource extends RichParallelSourceFunction<String>
    // and implements CheckpointedFunction
    public static class MyMonitorKafkaSource extends RichParallelSourceFunction<String>
            implements CheckpointedFunction {

        ListState<Long> offsetState = null;
        boolean flag = true;
        Long offset = 0L;

        // Override initializeState: declare a ListStateDescriptor and obtain
        // offsetState from the context
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Long> offsetStateDesc =
                    new ListStateDescriptor<>("offsetState", Long.class);
            offsetState = context.getOperatorStateStore().getListState(offsetStateDesc);
        }

        // Override run: restore the offset, then loop with offset += 1, emitting
        // the subtask index and offset once per second; every 5 records, throw
        // an exception to simulate a failure
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            Iterator<Long> iterator = offsetState.get().iterator();
            if (iterator.hasNext()) {
                offset = iterator.next();
            }
            while (flag) {
                offset = offset + 1;
                // index of this parallel subtask
                int idx = getRuntimeContext().getIndexOfThisSubtask();
                // emit to the downstream print sink
                ctx.collect("index:" + idx + " offset:" + offset);
                Thread.sleep(1000);
                if (offset % 5 == 0) {
                    System.out.println("The program is about to fail....");
                    throw new Exception("Simulated BUG...");
                }
            }
        }

        // Override cancel
        @Override
        public void cancel() {
            flag = false;
        }

        // Override snapshotState: clear offsetState and add the latest offset
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            offsetState.clear();
            offsetState.add(offset);
        }
    }
}
```
Restart Strategies and State Recovery

Recovery: restore the state backed up by checkpoints (e.g. on HDFS) into the restarted program.

Restart strategy: when the program throws an exception, restart it according to a configured delay and number of attempts.
Restart strategy types (a configuration sketch follows this list):

- No restart (noRestart)
- Always restart (the default when checkpointing is enabled)
- Fixed-delay restart: e.g. restart 3 times, with a 5 s delay between attempts
- Failure-rate restart: e.g. at most 5 restarts within 5 minutes, with 1 minute between attempts
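A minimal configuration sketch of these strategies; the concrete numbers are examples only, and in a real job you would configure at most one:

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. No restart: the job fails on the first exception
        env.setRestartStrategy(RestartStrategies.noRestart());

        // 2. "Always restart" is not set explicitly: with checkpointing enabled
        //    and no strategy configured, Flink defaults to unlimited restarts.

        // 3. Fixed delay: at most 3 attempts, 5 seconds between attempts
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(5)));

        // 4. Failure rate: at most 5 failures within a 5-minute window,
        //    1 minute between attempts
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                5, Time.minutes(5), Time.minutes(1)));
    }
}
```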
Requirement

Read data from a socket and perform a wordcount with checkpointing and a restart strategy enabled; simulate a failure when the input contains the word bug.
```java
package cn.itcast.flink.state;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Author itcast
 * Date 2021/6/21 11:18
 * Desc Wordcount with checkpointing and restart strategies; the word "bug"
 *      in the input triggers a simulated failure.
 */
public class CheckpointRestartDemo {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // =========== Checkpoint settings ===========
        // Take a checkpoint every 1000 ms
        env.enableCheckpointing(1000);
        // State storage medium: file:///d:/chk
        env.setStateBackend(new FsStateBackend("file:///d:/chk"));
        // Minimum pause between two checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // Keep the externalized checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Exactly-once checkpointing mode
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Checkpoint timeout: a checkpoint that does not finish within 60 s
        // is considered failed and discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // How many checkpoints may run concurrently
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // ============ Restart strategies ============
        // -1. Default: with checkpointing enabled and no strategy configured,
        //     unlimited restart is used
        // -2. No restart:
        // env.setRestartStrategy(RestartStrategies.noRestart());
        // -3. Fixed delay: 3 attempts, 3 s apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000));
        // -4. Failure rate (used occasionally): at most 3 failures within
        //     5 minutes, 10 s between attempts; this call overrides -3
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, Time.minutes(5), Time.seconds(10)));

        // 2. Source: socket
        DataStreamSource<String> source = env.socketTextStream("192.168.88.161", 9999);

        // 3. Transformation
        // 3.1 Split each line on spaces and flatMap into Tuple2
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = source
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] words = value.split(" ");
                        for (String word : words) {
                            // 3.2 If the word "bug" appears, throw to simulate a failure
                            if (word.equals("bug")) {
                                System.out.println("The program is about to fail...");
                                throw new Exception("Simulated BUG ...");
                            }
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                // 3.3 Group (note: batch uses groupBy, streaming uses keyBy)
                .keyBy(t -> t.f0)
                // 3.4 Aggregate
                .sum(1);

        // 4. Sink: print
        result.print();

        // 5. Execute
        env.execute();
    }
}
```
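To try it, start a socket server on the configured host first (for example, nc -lk 9999), then type space-separated words; entering the word bug triggers the simulated failure, after which the configured restart strategy takes over and the checkpointed counts are restored.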
Savepoint: Manual Restart and Recovery

Manual backup and recovery

Trigger a savepoint (writing it to the given storage path):

```
flink savepoint <jobId> [targetDirectory]
```

Restore data from a savepoint (the jar file is passed as the last argument):

```
flink run -s <savepointPath> --class <fully.qualified.MainClass> <jarFile>
```
Parallelism Settings

- Four ways to set parallelism:
  - Operator level
  - Global (execution environment) level
  - Client level (on the command line, e.g. flink run -p)
  - Configuration file (flink-conf.yaml)
- Priority, from highest to lowest (see the sketch below):
  - Operator level > global > client > configuration file
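A minimal sketch of the first two levels (the client level would be, e.g., flink run -p 3 ..., and the configuration-file level is the parallelism.default key in flink-conf.yaml); the operator-level setting on sum() below wins over env.setParallelism():

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Global parallelism: applies to every operator without its own setting
        env.setParallelism(2);

        env.fromElements(Tuple2.of("hello", 1), Tuple2.of("flink", 1))
                .keyBy(t -> t.f0)
                .sum(1)
                .setParallelism(4)  // operator level: overrides the global value
                .print();

        env.execute();
    }
}
```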