Understanding Fault Tolerance and State Consistency in the Flink Framework
2022-07-05 10:41:00 [InfoQ]
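Fault tolerance mechanism
Flink's fault tolerance is built on checkpoints: periodic, consistent snapshots of the state of all tasks, taken at a point where every task has processed the same input. When a job fails, Flink recovers as follows:
- Restart the application.
- Read the state from the latest completed checkpoint and reset every operator's state to it.
- Resume consuming from the input position recorded in that checkpoint, reprocessing all data between the checkpoint and the failure.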
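The three recovery steps above can be illustrated with a toy simulation in plain Java. It is a sketch, not Flink code: the names `RecoveryDemo`, `run` and `Checkpoint` are hypothetical, the "operator" simply counts records, and a checkpoint is modeled as the source offset stored together with the counter.

```java
import java.util.List;

// Toy model, not Flink code: a counting job over a replayable source,
// recovered from its latest checkpoint after one simulated crash.
public class RecoveryDemo {

    // A checkpoint stores the source read position together with the operator state.
    static final class Checkpoint {
        final int sourceOffset;
        final long count;

        Checkpoint(int sourceOffset, long count) {
            this.sourceOffset = sourceOffset;
            this.count = count;
        }
    }

    static final class Result {
        final long count;        // final operator state
        final int invocations;   // how many times a record was processed, replays included

        Result(long count, int invocations) {
            this.count = count;
            this.invocations = invocations;
        }
    }

    // Takes a checkpoint every `checkpointEvery` records and crashes once, right after
    // the record at index `failAfter` has been processed.
    static Result run(List<String> source, int checkpointEvery, int failAfter) {
        Checkpoint latest = new Checkpoint(0, 0);   // initial, empty checkpoint
        int offset = 0;                              // next record to read
        long count = 0;                              // operator state: records seen
        int invocations = 0;
        boolean crashed = false;

        while (offset < source.size()) {
            count++;                                 // stands in for processing source.get(offset)
            offset++;
            invocations++;
            if (offset % checkpointEvery == 0) {
                latest = new Checkpoint(offset, count);
            }
            if (!crashed && offset == failAfter + 1) {
                crashed = true;
                // 1. Restart: in-memory state is lost.
                // 2. Reset the state from the latest completed checkpoint.
                count = latest.count;
                // 3. Rewind the source and reprocess everything after that checkpoint.
                offset = latest.sourceOffset;
            }
        }
        return new Result(count, invocations);
    }

    public static void main(String[] args) {
        List<String> source = List.of("r0", "r1", "r2", "r3", "r4", "r5", "r6", "r7", "r8", "r9");
        Result r = run(source, 4, 6);
        System.out.println("count=" + r.count + " invocations=" + r.invocations);
    }
}
```

With ten records, a checkpoint every four records and a crash after the seventh record, the counter ends at 10 even though 13 invocations happen: r4, r5 and r6 are processed twice, but the effect of their first pass is discarded together with the rolled-back state. This is why exactly-once in Flink describes the state, not how many times a record flows through the job.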
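The following job configures checkpointing and a restart strategy. SensorReading is a user-defined POJO with a (String, Long, Double) constructor, not shown here.
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint configuration
        // Trigger a checkpoint every 300 ms
        env.enableCheckpointing(300);
        // Exactly-once state consistency (the default mode)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Discard any checkpoint that does not complete within 60 s
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Allow at most 2 checkpoints in progress at the same time
        // (per the Flink docs, this cannot be combined with a minimum pause, which is set next)
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        // Minimum pause: at least 100 ms between the end of one checkpoint and the start of the next
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100);
        // Fall back to the latest checkpoint even if a more recent savepoint exists
        // (not available in all Flink versions)
        // env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
        // Number of checkpoint failures tolerated before the job itself fails (0 = none)
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);

        // Restart strategy configuration (the second call replaces the first; keep only one)
        // Fixed delay: restart at most 3 times, waiting 10 s between attempts
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L));
        // Failure rate: restart after each failure, waiting 1 minute between attempts;
        // the job fails once the rate exceeds 3 failures per 10-minute interval
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(10), Time.minutes(1)));

        // Parse comma-separated lines "id,timestamp,value" read from a socket
        DataStreamSource<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // A job needs at least one sink; without one, execute() fails
        dataStream.print();
        env.execute();
    }
}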
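The two restart strategies configured above differ in what they count. The following simplified model in plain Java (hypothetical class `RestartStrategyModel`, not Flink's implementation) shows the decision each one makes when a failure occurs. The restart delays are not modeled, and the exact boundary of the failure-rate check (whether the third failure inside the window still triggers a restart) is an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of the fixed-delay and failure-rate restart strategies.
// Not Flink's code; the boundary behaviour of the failure-rate check is assumed.
public class RestartStrategyModel {

    interface Strategy {
        // Called once per job failure at the given time; returns true if the job may restart.
        boolean onFailure(long nowMillis);
    }

    // Fixed delay: at most maxAttempts restarts over the whole lifetime of the job.
    static final class FixedDelay implements Strategy {
        private final int maxAttempts;
        private int attempts = 0;

        FixedDelay(int maxAttempts) {
            this.maxAttempts = maxAttempts;
        }

        @Override
        public boolean onFailure(long nowMillis) {
            if (attempts >= maxAttempts) {
                return false;
            }
            attempts++;
            return true;
        }
    }

    // Failure rate: remember the timestamps of the last maxFailures failures and
    // refuse to restart when all of them fall within intervalMillis.
    static final class FailureRate implements Strategy {
        private final int maxFailures;
        private final long intervalMillis;
        private final Deque<Long> recent = new ArrayDeque<>();

        FailureRate(int maxFailures, long intervalMillis) {
            this.maxFailures = maxFailures;
            this.intervalMillis = intervalMillis;
        }

        @Override
        public boolean onFailure(long nowMillis) {
            if (recent.size() == maxFailures) {
                recent.removeFirst();          // keep only the last maxFailures timestamps
            }
            recent.addLast(nowMillis);
            if (recent.size() < maxFailures) {
                return true;                   // too few failures to exceed the rate
            }
            return nowMillis - recent.peekFirst() > intervalMillis;
        }
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        Strategy fixed = new FixedDelay(3);
        Strategy rate = new FailureRate(3, 10 * minute);
        // Failures spaced 6 minutes apart
        for (long t = 0; t <= 18 * minute; t += 6 * minute) {
            System.out.println("t=" + t / minute + "min fixedDelay=" + fixed.onFailure(t)
                    + " failureRate=" + rate.onFailure(t));
        }
    }
}
```

With failures six minutes apart, the fixed-delay model refuses the fourth restart, while the failure-rate model keeps restarting because no three failures fall within ten minutes. Three failures within four minutes, on the other hand, make the failure-rate model give up.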
State consistency
- AT-MOST-ONCE (at most once): When a task fails, the simplest option is to do nothing: neither restore the lost state nor replay the lost data. At-most-once semantics means that each event is processed at most once.
- AT-LEAST-ONCE (at least once): In most real-world applications we do not want to lose events. This guarantee is called at-least-once: every event is processed, but some events may be processed more than once.
- EXACTLY-ONCE (exactly once): Exactly-once is the strictest guarantee and also the hardest to achieve. It means not only that no events are lost, but also that the internal state is updated exactly once for each record.
Flink's checkpointing offers the last two guarantees, selected through the CheckpointingMode enum. Its source (org.apache.flink.streaming.api.CheckpointingMode) documents both modes:
@Public
public enum CheckpointingMode {
/**
* Sets the checkpointing mode to "exactly once". This mode means that the system will
* checkpoint the operator and user function state in such a way that, upon recovery,
* every record will be reflected exactly once in the operator state.
*
* <p>For example, if a user function counts the number of elements in a stream,
* this number will consistently be equal to the number of actual elements in the stream,
* regardless of failures and recovery.</p>
*
* <p>Note that this does not mean that each record flows through the streaming data flow
* only once. It means that upon recovery, the state of operators/functions is restored such
* that the resumed data streams pick up exactly at after the last modification to the state.</p>
*
* <p>Note that this mode does not guarantee exactly-once behavior in the interaction with
* external systems (only state in Flink's operators and user functions). The reason for that
* is that a certain level of "collaboration" is required between two systems to achieve
* exactly-once guarantees. However, for certain systems, connectors can be written that facilitate
* this collaboration.</p>
*
* <p>This mode sustains high throughput. Depending on the data flow graph and operations,
* this mode may increase the record latency, because operators need to align their input
* streams, in order to create a consistent snapshot point. The latency increase for simple
* dataflows (no repartitioning) is negligible. For simple dataflows with repartitioning, the average
* latency remains small, but the slowest records typically have an increased latency.</p>
*/
EXACTLY_ONCE,
/**
* Sets the checkpointing mode to "at least once". This mode means that the system will
* checkpoint the operator and user function state in a simpler way. Upon failure and recovery,
* some records may be reflected multiple times in the operator state.
*
* <p>For example, if a user function counts the number of elements in a stream,
* this number will equal to, or larger, than the actual number of elements in the stream,
* in the presence of failure and recovery.</p>
*
* <p>This mode has minimal impact on latency and may be preferable in very-low latency
* scenarios, where a sustained very-low latency (such as few milliseconds) is needed,
* and where occasional duplicate messages (on recovery) do not matter.</p>
*/
AT_LEAST_ONCE
}
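The counting example in the Javadoc can be made concrete for all three guarantees. This is a toy model in plain Java with hypothetical names, not Flink code. The at-least-once branch is a simplification: in Flink, duplicates under AT_LEAST_ONCE come from skipping barrier alignment, so the restored snapshot may already contain records that are then replayed; here that effect is modeled by rewinding the source without rolling back the counter.

```java
// Toy comparison (not Flink code) of what a counting operator ends up with
// after one crash under each of the three guarantees.
public class DeliveryGuarantees {

    enum Guarantee { AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE }

    // The job counts `total` records. A checkpoint completes after `checkpointAt`
    // records and the job crashes after `failAt` records (checkpointAt <= failAt <= total).
    static long finalCount(Guarantee g, int total, int checkpointAt, int failAt) {
        long count;       // operator state after recovery
        int resumeFrom;   // source position after recovery
        switch (g) {
            case AT_MOST_ONCE:
                count = 0;                  // lost state is not restored ...
                resumeFrom = failAt;        // ... and lost data is not replayed
                break;
            case AT_LEAST_ONCE:
                count = failAt;             // simplification: state keeps effects past the checkpoint ...
                resumeFrom = checkpointAt;  // ... while the source is rewound, so some records count twice
                break;
            case EXACTLY_ONCE:
                count = checkpointAt;       // state and source position come from the same checkpoint
                resumeFrom = checkpointAt;
                break;
            default:
                throw new IllegalArgumentException("unknown guarantee: " + g);
        }
        for (int i = resumeFrom; i < total; i++) {
            count++;                        // process the remaining (or replayed) records
        }
        return count;
    }

    public static void main(String[] args) {
        for (Guarantee g : Guarantee.values()) {
            System.out.println(g + ": " + finalCount(g, 10, 4, 7));
        }
    }
}
```

For 10 records, a checkpoint after 4 and a crash after 7, the model yields 3 for at-most-once (records lost), 13 for at-least-once (records counted twice) and 10 for exactly-once.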
End-to-end exactly-once requires each part of the pipeline to contribute:
- Inside Flink: guaranteed by checkpoints.
- Source: the external source must allow the read position to be reset so that data can be replayed (for example, Kafka offsets).
- Sink: after recovery from a failure, data must not be written to the external system twice. There are two ways to achieve this: idempotent writes and transactional writes.
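The two sink-side techniques can be sketched with toy sinks in plain Java. These are hypothetical classes, not Flink's sink API; in Flink, transactional writes are provided for example by TwoPhaseCommitSinkFunction, which additionally pre-commits at each checkpoint, a phase omitted here. The key `sensor_1` is made-up sample data.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy sinks (not Flink's sink API) showing why replayed records
// do not cause duplicates with idempotent or transactional writes.
public class SinkStrategies {

    // Idempotent write: an upsert keyed by record id, so writing the same
    // record twice leaves the external "table" unchanged.
    static final class IdempotentSink {
        final Map<String, Double> table = new HashMap<>();

        void write(String id, double value) {
            table.put(id, value);
        }
    }

    // Transactional write: records are buffered in an open transaction and only become
    // visible when the checkpoint covering them completes; a failure aborts them.
    static final class TransactionalSink {
        final List<String> committed = new ArrayList<>();   // visible in the external system
        private final List<String> pending = new ArrayList<>();

        void write(String record) {
            pending.add(record);
        }

        void onCheckpointComplete() {
            committed.addAll(pending);                        // commit the open transaction
            pending.clear();
        }

        void onFailure() {
            pending.clear();                                  // abort the open transaction
        }
    }

    public static void main(String[] args) {
        IdempotentSink kv = new IdempotentSink();
        kv.write("sensor_1", 35.8);
        kv.write("sensor_1", 35.8);   // replay after recovery: same key, same value
        System.out.println(kv.table);

        TransactionalSink tx = new TransactionalSink();
        tx.write("a");
        tx.write("b");
        tx.onCheckpointComplete();    // "a" and "b" are committed
        tx.write("c");
        tx.onFailure();               // crash before the next checkpoint: "c" is discarded
        tx.write("c");                // "c" is replayed from the source after recovery
        tx.onCheckpointComplete();
        System.out.println(tx.committed);
    }
}
```

The replayed `sensor_1` write leaves a single entry in the table, and the transactional sink ends with `[a, b, c]`: the first `c` was aborted with the failed transaction, so only its replay is committed.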
