当前位置:网站首页>Flink状态
Flink状态
2022-08-03 04:28:00 【大学生爱编程】
状态和普通变量的区别:
普通变量数据保存在内存中,任务执行失败会丢失
flink的状态中的数据会被checkpoint持久化到hdfs中,如果任务失败还能恢复到之前的计算结果
flink的checlpoint默认是关闭的,开启后在本地无法运行了,只能提交服务器了
// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(1000)
// 高级选项:
// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 允许两个连续的 checkpoint 错误
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(2)
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
//RETAIN_ON_CANCELLATION: 当任务取消时保留checkpoint
env.getCheckpointConfig.setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
// 需要设置flink checkpoint保存状态的位置
env.setStateBackend(new HashMapStateBackend())
//将状态保存到hdfs中 env.getCheckpointConfig.setCheckpointStorage("hdfs://master:9000/flink/checkpoint")
sum()算子,底层使用了flink的状态保存之前的计算结果,flink的状态会被checkpoint持久化到hdfs中,任务被取消或者执行失败可以恢复之前的计算结果
1. state
1. flink用于保存之前计算结果的机制
2. flink会为每一个key保存一个状态
3. 常用的sum(需要保存之前的计算结果) window(需要保存一段时间内的数据)内部都是有状态的
4. flink也提供了几种常用的状态类
1. valueState: 单值状态,为每一个key保存一个值,可以是任何类型,必须可以序列化
2. mapState: kv格式的状态,为每一个key保存一个kv格式的状态
3. listState: 集合状态,为每一个key保存一个集合状态,集合中可以保存多个元素
4. reducingState/AggregatingState:聚合状态,为每一个key保存一个值,再定义状态时需要一个聚合函数
5.flink的状态和普通变量的区别:
普通变量是保存再flink的内存中的,如果flink任务执行失败,变量的数据会丢失
flink的状态是一个特殊的变量,状态中的数据会被checkpoint持久化到hdfs中, 如果任务执行失败,重启任务,可以恢复状态
6.状态后端,用于保存状态的位置
1. HashMapStateBackend:
-1. 将flink的状态先保存TaskManager的内存中,在触发checkpoint的时候将taskmanager中的状态再持久化到hdfs中
-2. 可以直接使用
```java
env.setStateBackend(new HashMapStateBackend())
```
2. EmbeddedRocksDBStateBackend:
- 1. RocksDS是一个本地的轻量级的数据库,数据在磁盘上
- 2. 再启动lfink任务的时候会在每一个taskManager所在的节点启动一个rocksDB进程
- 3. flink的状态会先保存在rocksDb数据库中,当触发checkpoint的时候将数据库中的状态持久化到hdfs中
-4. 可以支持增量快照
-5. 使用rocksDb状态后端需要带入依赖
```xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>1.15.0</version>
</dependency>
```
-6. 使用方式
```java
env.setStateBackend(new EmbeddedRocksDBStateBackend(true))
```
valueState状态:存储之前的计算结果
多种状态
open():在map之前执行,每一个task中只执行一次
flink的状态需要在open中定义
实时计算平均年龄:
object Demo14AvgAge {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//实时计算每一个班级的平均年龄
val studentDS: DataStream[String] = env.socketTextStream("master", 8888)
//取出班级和学生的年龄
val clazzAndAge: DataStream[(String, Int)] = studentDS.map(line => {
val split: Array[String] = line.split(",")
val clazz: String = split(4)
val age: Int = split(2).toInt
(clazz, age)
})
//按照班级分组
val keyByDS: KeyedStream[(String, Int), String] = clazzAndAge.keyBy(_._1)
//计算平均年龄
//flink的状态可以再任务算子中使用,map,faltmap,filter process都可以
val avgAgeDS: DataStream[(String, Double)] = keyByDS
.process(new KeyedProcessFunction[String, (String, Int), (String, Double)] {
override def open(parameters: Configuration): Unit = {
/定义两个状态来保存总人数和总的年龄
val context: RuntimeContext = getRuntimeContext
//总人数状态的描述对象
val sumNumDesc = new ValueStateDescriptor[Int]("sumNum", classOf[Int])
//总的年龄的描述对象
val sumAgeDesc = new ValueStateDescriptor[Int]("sumAge", classOf[Int])
sumNumState = context.getState(sumNumDesc)
sumAgeState = context.getState(sumAgeDesc)
}
//保存总的人数的状态
var sumNumState: ValueState[Int] = _
//保存总的年龄的状态
var sumAgeState: ValueState[Int] = _
override def processElement(kv: (String, Int),
ctx: KeyedProcessFunction[String, (String, Int), (String, Double)]#Context,
out: Collector[(String, Double)]): Unit = {
val clazz: String = kv._1
val age: Int = kv._2
//获取之前的总的人数和总的年龄
var sumNum: Int = sumNumState.value()
//人数累加
sumNum += 1
//更新状态
sumNumState.update(sumNum)
var sumAge: Int = sumAgeState.value()
//累加
sumAge += age
//更新状态
sumAgeState.update(sumAge)
//计算平均年龄
val avgAge: Double = sumAge.toDouble / sumNum
//将数据发送到下游
out.collect((clazz, avgAge))
}
})
avgAgeDS.print()
env.execute()
}
}
2. checkpoint
checkpoint是flink用于持久化flink状态的机制
flink会定时将flink计算的状态持久化到hdfs中
开启checkpint的方法: 代码中或者源码中
2.1 代码单独开启优先级最高
// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(1000)
// 高级选项:
// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 允许两个连续的 checkpoint 错误
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(2)
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
//RETAIN_ON_CANCELLATION: 当任务取消时保留checkpoint
env.getCheckpointConfig.setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//指定状态后端
//EmbeddedRocksDBStateBackend eocksDb状态后端
env.setStateBackend(new EmbeddedRocksDBStateBackend(true))
//将状态保存到hdfs中,在触发checkpoint的时候将状态持久化到hdfs中
env.getCheckpointConfig.setCheckpointStorage("hdfs://master:9000/flink/checkpoint")
2.2 在源码中修改配置开启(flink新版可以操作)
vim flink-conf.yaml
execution.checkpointing.interval: 3min
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 0
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 10min
execution.checkpointing.tolerable-failed-checkpoints: 0
state.backend: rocksdb
state.checkpoints.dir: hdfs://master:9000/flink/checkpoint
2.3 从checkpoint中恢复任务
1.可以在网页中指定checkpint的路径恢复,初次提交任务不需要指定路径,恢复任务时需要指定hdfs中保存状态的路径,路径需要带上前缀hdfs://master:9000
样式:hdfs://master:9000/flink/checkpoint/11edbec21742ceddebbb90f3e49f24b4/chk-35
2.在命令行中重新提交任务,指定恢复任务的位置: -s 指定恢复任务的路径
flink run -t yarn-session -Dyarn.application.id=application_1658546198162_0005 -c com.shujia.flink.core.Demo15RocksDB -s hdfs://master:9000/flink/checkpoint/11edbec21742ceddebbb90f3e49f24b4/chk-35 flink-1.0.jar
边栏推荐
猜你喜欢
随机推荐
修饰生物素DIAZO-生物素-PEG3-DBCO|重氮-生物素-三聚乙二醇-二苯基环辛炔
6.神经网络剖析
2022河南萌新联赛第(四)场:郑州轻工业大学 G - 迷宫
「短视频+社交电商」营销模式爆发式发展,带来的好处有什么?
The flink sql task is changed, and after adding several fields to the sql, an error occurs when restoring from the previously saved savepoint.
正则表达式与绕过案例
关于#sql#的问题,如何解决?
3.张量运算
工程制图-齿轮
5.回顾简单的神经网络
肖sir__面试接口测试
工程水文学知识点
redis键值出现 xacxedx00x05tx00&的解决方法
IDEC和泉触摸屏维修HG2F-SS22V HG4F软件通信分析
常见亲脂性细胞膜染料DiO, Dil, DiR, Did光谱图和实验操作流程
2022 Henan Mengxin League Game (4): Zhengzhou University of Light Industry E - Sleep Well
mysql 创建索引的三种方式
移动流量的爆发式增长,社交电商如何选择商业模式
那些让电子工程师崩溃瞬间,你经历了几个呢?
肖sir___面试就业课程____app