当前位置:网站首页>Flink实现Exactly Once
Flink实现Exactly Once
2022-08-02 14:05:00 【boyzwz】
前言
Flink通过状态快照实现容错处理:
Flink 定期获取所有状态的快照,并将这些快照复制到持久化的位置,例如分布式文件系统。
如果发生故障,Flink 可以恢复应用程序的完整状态并继续处理,就如同没有出现过异常。
Flink 管理的状态存储在 state backend 中。
checkpoint 代码
/**
* 创建flink环境
*/
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
/**
* 使用flink 的 checkpoint将结果保存到hdfs上去
*
* 若任务中途失败或者重新运行,只需指定保存的hdfs路径,就可在上次执行的结果上继续执行
* 不用让数据重新开始
*
* flink中的有状态计算才可以checkpoint,若自己创建的hashmap则无法保存
*/
// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(1000)
// 高级选项:
// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 允许两个连续的 checkpoint 错误
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(2)
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:当作业取消时,保留作业的 checkpoint。注意,这种情况下,需要手动清除该作业保留的 checkpoint。
// ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:当作业取消时,删除作业的 checkpoint。仅当作业失败时,作业的 checkpoint 才会被保留。
env.getCheckpointConfig.setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
/**
* 设置flink checkpoint保存状态的位置
*
* 创建一个临时数据库保存
* env.setStateBackend(new EmbeddedRocksDBStateBackend(true))
*/
env.setStateBackend(new HashMapStateBackend())
//将状态保存到hdfs中
env.getCheckpointConfig.setCheckpointStorage("hdfs://master:9000/flink/checkpoint")
一、状态快照 checkpoint
Checkpoint – 一种由 Flink 自动执行的快照,其目的是能够从故障中恢复。Flink开启checkpoint可在任务失败或者重启时,重新提交任务指定checkpoint保存外部系统的路径,即可在上次执行的结果上继续执行,数据不需重新开始。
checkpoint的执行:
1、checkpoint是由JobManager定时去执行,flink快照机制并不是在同一时刻所有任务一同执行,而是所有任务在处理完同一数据后,保存自身状态
2、JM向Source Task发送Trigger,Source Task会保存自身状态(记录当前读取数据的偏移量),并在数据流中,插入带有编号的 checkpoint barriers,向下游传递barrier
3、当下游的task接收到barrier时,会保存自身状态(如有上游有多个分区任务,下游task收到上游所有实例的barrier才会做快照),继续向下游传递barrier
4、当所有task完成同一次checkpoint的barrier之后,一次checkpoint完成
5、当快照被持久保存后,JM会删除旧的checkpoint文件
6、当任务状态信息备份完成后,会上报JM,当所有的任务都上报后,完成一次checkpoint
二、端对端确保精确一次 Exactly Once
状态一致性:
计算结果要保证准确
每一条数据都不应该丢失,也不应该重复计算
在遇到故障时可以恢复状态,恢复以后的重新计算,结果也应该是正确的
(每一条数据的处理只影响一次结果)
状态一致性分类:
AT_MOST_ONCE(最多一次)可能会导致数据丢失
AT_LEAST_ONCE(最少一次)可能会导致数据重复,多次处理
EXACTLY_ONCE (精确一次)
Exactly Once:
Flink 使用了轻量级快照机制--检查点(checkpoint)来保证exactly once语义
1、source端
必须是可重放的
Flink 分布式快照保存数据计算的状态和消费的偏移量,保证程序重启之后不丢失状态和消费偏移量
2、端对端
内部保证--checkpoint
3、sink端
sinks 必须是事务性的(或幂等的)
幂等:对一个数据进行多次操作,对结果只会更改一次。即第一次操作以后,后面重复执行就不起作用了(hashmap集合,对于一个kv数据存入多次,结果不会改变)
sink端必须支持事务写入(要么全部成功,要么全部不写入)
若sink端数据不是事务写入,一条一条写入的,发生故障的话,会回滚到上一次checkpoint,有些数据就会再次写入,即产生重复数据。
两次写入(Two-Phase-Commit,2PC)
1、sink端会先将数据写入事务,预提交至外部系统中
2、当sink端读取到上游传递来的barrier,保存自身状态到状态后端后,上报JM(同时会开启一个新的事务,在barrier后的到达的数据,将由新的事务预提交);当所有任务上报后,checkpoint完成,sink端正式提交
3、若在预提交阶段发生故障,预提交的数据会全部撤销,此时回滚到上次checkpoint,再次处理数据,写入的数据也不会重复
实现exactly once需要配置:
1、必须启用checkpoint
2、选择参数EXACTLY_ONCE
3、Kafka中默认为read_uncommitted,即会读取到未提交的数据,所以应将其修改为read_committed
4、Flink的Kafka连接器配置超时时间默认为1小时,Flink集群配置事务超时时间为15分钟,应设置前者小于后者
三、Kafka中数据不丢失
边栏推荐
- A little thought about password encryption
- C语言初级—常见问题(100~200素数,计算1+11+111+...,从键盘获取一个数并输出有几个位)
- C语言初级—用一角,两角,五角和一元组成3.5元有多少种组合方法
- MySQL知识总结 (六) MySQL调优
- 【Camera2】由Camera2 特性想到的有关MED(多场景设备互动)的场景Idea
- Flask contexts, blueprints and Flask-RESTful
- 利用c语言实现对键盘输入的一串字符的各类字符的计数
- verilog学习|《Verilog数字系统设计教程》夏宇闻 第三版思考题答案(第十四章)
- Flask request application context source code analysis
- Redis-01-Nosql概述
猜你喜欢
随机推荐
Flink-独立集群/Yarn
数据的表示方法和转换(二进制、八进制、十进制、十六进制)
Linux: CentOS 7 install MySQL5.7
主存储器(一)
VS Code远程开发及免密配置
华为防火墙
Kubernetes介绍
Camera Hal(Hal3)层修改Preview流
MySQL知识总结 (一) 数据类型
Unit 14 Viewsets and Routing
函数递归和动态内存初识
加减法运算及其溢出处理
初识c语言指针
uniapp小程序禁止遮罩弹窗下的页面滚动的完美解决办法
Error Correction Design Principle of Hamming Check Code
每周招聘|PostgreSQL专家,年薪60+,高能力高薪资
MySQL知识总结 (十一) MySql 日志,数据备份,数据恢复
mysql
C语言一维数组练习——将m个元素移动到数组尾部
宝塔搭建PHP自适应懒人网址导航源码实测