当前位置:网站首页>Flink实现Exactly Once
Flink实现Exactly Once
2022-08-02 14:05:00 【boyzwz】
前言
Flink通过状态快照实现容错处理:
Flink 定期获取所有状态的快照,并将这些快照复制到持久化的位置,例如分布式文件系统。
如果发生故障,Flink 可以恢复应用程序的完整状态并继续处理,就如同没有出现过异常。
Flink 管理的状态存储在 state backend 中。
checkpoint 代码
/**
* 创建flink环境
*/
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
/**
* 使用flink 的 checkpoint将结果保存到hdfs上去
*
* 若任务中途失败或者重新运行,只需指定保存的hdfs路径,就可在上次执行的结果上继续执行
* 不用让数据重新开始
*
* flink中的有状态计算才可以checkpoint,若自己创建的hashmap则无法保存
*/
// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(1000)
// 高级选项:
// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 允许两个连续的 checkpoint 错误
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(2)
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:当作业取消时,保留作业的 checkpoint。注意,这种情况下,需要手动清除该作业保留的 checkpoint。
// ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:当作业取消时,删除作业的 checkpoint。仅当作业失败时,作业的 checkpoint 才会被保留。
env.getCheckpointConfig.setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
/**
* 设置flink checkpoint保存状态的位置
*
* 创建一个临时数据库保存
* env.setStateBackend(new EmbeddedRocksDBStateBackend(true))
*/
env.setStateBackend(new HashMapStateBackend())
//将状态保存到hdfs中
env.getCheckpointConfig.setCheckpointStorage("hdfs://master:9000/flink/checkpoint")
一、状态快照 checkpoint
Checkpoint – 一种由 Flink 自动执行的快照,其目的是能够从故障中恢复。Flink开启checkpoint可在任务失败或者重启时,重新提交任务指定checkpoint保存外部系统的路径,即可在上次执行的结果上继续执行,数据不需重新开始。
checkpoint的执行:
1、checkpoint是由JobManager定时去执行,flink快照机制并不是在同一时刻所有任务一同执行,而是所有任务在处理完同一数据后,保存自身状态
2、JM向Source Task发送Trigger,Source Task会保存自身状态(记录当前读取数据的偏移量),并在数据流中,插入带有编号的 checkpoint barriers,向下游传递barrier
3、当下游的task接收到barrier时,会保存自身状态(如有上游有多个分区任务,下游task收到上游所有实例的barrier才会做快照),继续向下游传递barrier
4、当所有task完成同一次checkpoint的barrier之后,一次checkpoint完成
5、当快照被持久保存后,JM会删除旧的checkpoint文件
6、当任务状态信息备份完成后,会上报JM,当所有的任务都上报后,完成一次checkpoint
二、端对端确保精确一次 Exactly Once
状态一致性:
计算结果要保证准确
每一条数据都不应该丢失,也不应该重复计算
在遇到故障时可以恢复状态,恢复以后的重新计算,结果也应该是正确的
(每一条数据的处理只影响一次结果)
状态一致性分类:
AT_MOST_ONCE(最多一次)可能会导致数据丢失
AT_LEAST_ONCE(最少一次)可能会导致数据重复,多次处理
EXACTLY_ONCE (精确一次)
Exactly Once:
Flink 使用了轻量级快照机制--检查点(checkpoint)来保证exactly once语义
1、source端
必须是可重放的
Flink 分布式快照保存数据计算的状态和消费的偏移量,保证程序重启之后不丢失状态和消费偏移量
2、端对端
内部保证--checkpoint
3、sink端
sinks 必须是事务性的(或幂等的)
幂等:对一个数据进行多次操作,对结果只会更改一次。即第一次操作以后,后面重复执行就不起作用了(hashmap集合,对于一个kv数据存入多次,结果不会改变)
sink端必须支持事务写入(要么全部成功,要么全部不写入)
若sink端数据不是事务写入,一条一条写入的,发生故障的话,会回滚到上一次checkpoint,有些数据就会再次写入,即产生重复数据。
两次写入(Two-Phase-Commit,2PC)
1、sink端会先将数据写入事务,预提交至外部系统中
2、当sink端读取到上游传递来的barrier,保存自身状态到状态后端后,上报JM(同时会开启一个新的事务,在barrier后的到达的数据,将由新的事务预提交);当所有任务上报后,checkpoint完成,sink端正式提交
3、若在预提交阶段发生故障,预提交的数据会全部撤销,此时回滚到上次checkpoint,再次处理数据,写入的数据也不会重复
实现exactly once需要配置:
1、必须启用checkpoint
2、选择参数EXACTLY_ONCE
3、Kafka中默认为read_uncommitted,即会读取到未提交的数据,所以应将其修改为read_committed
4、Flink的Kafka连接器配置超时时间默认为1小时,Flink集群配置事务超时时间为15分钟,应设置前者小于后者
三、Kafka中数据不丢失
边栏推荐
猜你喜欢
The IDEA of packaged jar package
Using the cloud GPU + pycharm training model to realize automatic background run programs, save training results, the server automatically power off
二级指针,数组指针,指针数组和函数指针
Implementation of redis distributed lock and watchdog
MySQL知识总结 (六) MySQL调优
利用c语言实现对键盘输入的一串字符的各类字符的计数
Unit 14 Viewsets and Routing
STM32 (F407) - stack
Visual Studio配置OpenCV之后,提示:#include<opencv2/opencv.hpp>无法打开源文件
宏定义问题记录day2
随机推荐
C语言日记 2 标识符
C语言日记 5、7setprecision()问题
C语言日记 1“Hello world“
Unit 10 Continuous Tuning
二级指针,数组指针,指针数组和函数指针
初识c语言指针
Eslint规则大全
线性代数期末复习存档
C语言初级—判断一个数是不是素数(函数封装)
C语言——断言assert的使用
重新学习编程day1 【初始c语言】【c语言编写出计算两个数之和的代码】
YOLOv7使用云GPU训练自己的数据集
mysql
Using the cloud GPU + pycharm training model to realize automatic background run programs, save training results, the server automatically power off
MySQL知识总结 (五) 锁
Verilog Learning Series
uniapp小程序禁止遮罩弹窗下的页面滚动的完美解决办法
Web Design (Beginners) [easy to understand]
St. Regis Takeaway Notes - Lecture 10 Swagger
verilog学习|《Verilog数字系统设计教程》夏宇闻 第三版思考题答案(第十三章)