Flink state
2022-08-03 04:38:00 【College students' love programming】
The difference between state and ordinary variables:
An ordinary variable lives only in memory, so its data is lost when the task fails.
Data held in Flink state is persisted to HDFS by checkpoints, so if the task fails the previous calculation result can be restored.
Flink checkpointing is disabled by default. After enabling it, it does not take effect in a local run; submit the job to the server to use it.
// Start a checkpoint every 1000 ms
env.enableCheckpointing(1000)
// Advanced options:
// Set the mode to exactly-once (this is the default)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// Make sure at least 500 ms pass between the end of one checkpoint and the start of the next
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// A checkpoint must complete within one minute, otherwise it is discarded
env.getCheckpointConfig.setCheckpointTimeout(60000)
// Tolerate two consecutive checkpoint failures
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(2)
// Allow only one checkpoint to be in progress at a time
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// Use externalized checkpoints so that checkpoints are kept after the job is cancelled
// RETAIN_ON_CANCELLATION: retain the checkpoint when the job is cancelled
env.getCheckpointConfig.setExternalizedCheckpointCleanup(
  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
// Set the state backend, which decides where Flink keeps its working state
env.setStateBackend(new HashMapStateBackend())
// Store the checkpointed state in HDFS
env.getCheckpointConfig.setCheckpointStorage("hdfs://master:9000/flink/checkpoint")
The sum() operator uses Flink state internally to keep the previous calculation result. Flink state is persisted to HDFS by checkpoints, so if the job is cancelled or fails, the previous calculation result can be restored.
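The contrast between an ordinary variable and checkpointed state can be modelled without Flink. The following is a minimal sketch in plain Scala, assuming an in-memory field stands in for the copy that a checkpoint would write to HDFS; `SnapshotCounter` and its methods are hypothetical names used only for illustration, not Flink API.

```scala
// Plain-Scala model of "ordinary variable vs. checkpointed state".
// Nothing here is Flink API: SnapshotCounter and its methods are hypothetical.
object CheckpointModel {
  final class SnapshotCounter {
    private var current: Int = 0       // working value, lost on failure
    private var lastSnapshot: Int = 0  // stands in for the copy persisted to HDFS

    def add(n: Int): Unit = { current += n }
    def checkpoint(): Unit = { lastSnapshot = current }
    // Simulates a task failure followed by a restart from the last snapshot.
    def failAndRestore(): Unit = { current = lastSnapshot }
    def value: Int = current
  }

  def demo(): (Int, Int) = {
    var plain = 0                      // ordinary variable
    val state = new SnapshotCounter

    Seq(1, 2, 3).foreach { n => plain += n; state.add(n) }
    state.checkpoint()                 // snapshot taken when the sum is 6
    plain += 4
    state.add(4)                       // progress made after the snapshot

    // Failure: the ordinary variable starts from scratch, the state rolls back.
    plain = 0
    state.failAndRestore()
    (plain, state.value)
  }
}
```

In this model the ordinary variable restarts from 0 while the state resumes from the last snapshot. The update made after the snapshot is rolled back in both cases; the model does not cover how a real job reprocesses that input.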
1. state
1. State is Flink's mechanism for keeping the results of earlier calculations.
2. Flink keeps a separate state for each key.
3. Commonly used operators such as sum (which must keep the previous result) and window (which must hold data for a period of time) are stateful internally.
4. Flink also provides several commonly used state classes:
1. ValueState: single-value state. Stores one value per key; the value can be of any type but must be serializable.
2. MapState: key-value state. Stores one key-value map per key.
3. ListState: collection state. Stores one collection per key, and the collection can hold multiple elements.
4. ReducingState/AggregatingState: aggregating state. Stores one value per key; an aggregate function must be supplied to define how new values are merged into the state.
5. The difference between Flink state and ordinary variables:
An ordinary variable is held in Flink's memory; if the Flink job fails, the variable's data is lost.
Flink state is a special kind of variable: its data is persisted to HDFS by checkpoints, so if the job fails and is restarted, the state can be restored.
6. State backend: the location where state is kept
1. HashMapStateBackend:
- 1. Flink state is first kept in TaskManager memory; when a checkpoint is triggered, the state in the TaskManager is persisted to HDFS.
- 2. It can be used directly.
```java
env.setStateBackend(new HashMapStateBackend())
```
2. EmbeddedRocksDBStateBackend:
- 1. RocksDB is a lightweight local database that keeps its data on disk.
- 2. When a Flink job starts, a RocksDB instance is started on the node of each TaskManager. It is embedded in the TaskManager process rather than run as a separate service.
- 3. Flink state is first written to the RocksDB database; when a checkpoint is triggered, the state in the database is persisted to HDFS.
- 4. Incremental snapshots are supported.
- 5. Using the RocksDB state backend requires an extra dependency:
```xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>1.15.0</version>
</dependency>
```
- 6. Usage:
```java
env.setStateBackend(new EmbeddedRocksDBStateBackend(true))
```
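The reducing/aggregating state described in the list above needs a function that says how a new value is merged into the stored one. The following is a minimal sketch of that behaviour in plain Scala, assuming a mutable map stands in for Flink's per-key storage. `ReducingCells` is a hypothetical helper, not a Flink class; only the method names `add` and `get` mirror those on Flink's `ReducingState`.

```scala
import scala.collection.mutable

// Plain-Scala model of per-key reducing state.
// ReducingCells is a hypothetical helper, not part of Flink.
object ReducingModel {
  final class ReducingCells[K, V](reduce: (V, V) => V) {
    private val cells = mutable.Map.empty[K, V]

    // Merge the new value into whatever is already stored for this key.
    def add(key: K, value: V): Unit = {
      val merged = cells.get(key).map(old => reduce(old, value)).getOrElse(value)
      cells.update(key, merged)
    }

    // None means nothing has been added for this key yet.
    def get(key: K): Option[V] = cells.get(key)
  }

  def demo(): (Option[Int], Option[Int], Option[Int]) = {
    // Keep the maximum age seen so far for each class.
    val maxAge = new ReducingCells[String, Int]((a, b) => math.max(a, b))
    maxAge.add("class1", 21)
    maxAge.add("class1", 23)
    maxAge.add("class2", 20)
    (maxAge.get("class1"), maxAge.get("class2"), maxAge.get("class3"))
  }
}
```

Each key holds exactly one value, and the function passed to the constructor decides what that value means (a maximum here, but it could equally be a sum).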
ValueState: stores the previous calculation result
Multiple states
open(): runs before map and is executed only once in each task
Flink state needs to be defined in open()
Calculating the average age in real time:
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object Demo14AvgAge {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Calculate the average age of each class in real time
    val studentDS: DataStream[String] = env.socketTextStream("master", 8888)
    // Extract the class and the age of each student
    val clazzAndAge: DataStream[(String, Int)] = studentDS.map(line => {
      val split: Array[String] = line.split(",")
      val clazz: String = split(4)
      val age: Int = split(2).toInt
      (clazz, age)
    })
    // Group by class
    val keyByDS: KeyedStream[(String, Int), String] = clazzAndAge.keyBy(_._1)
    // Calculate the average age
    // Flink state can be used in operators such as map, flatMap, filter and process
    val avgAgeDS: DataStream[(String, Double)] = keyByDS
      .process(new KeyedProcessFunction[String, (String, Int), (String, Double)] {
        // State holding the total number of students
        var sumNumState: ValueState[Int] = _
        // State holding the total age
        var sumAgeState: ValueState[Int] = _

        override def open(parameters: Configuration): Unit = {
          // Define two states: the total number of students and the total age
          val context: RuntimeContext = getRuntimeContext
          // Descriptor for the total-number state
          val sumNumDesc = new ValueStateDescriptor[Int]("sumNum", classOf[Int])
          // Descriptor for the total-age state
          val sumAgeDesc = new ValueStateDescriptor[Int]("sumAge", classOf[Int])
          sumNumState = context.getState(sumNumDesc)
          sumAgeState = context.getState(sumAgeDesc)
        }

        override def processElement(kv: (String, Int),
                                    ctx: KeyedProcessFunction[String, (String, Int), (String, Double)]#Context,
                                    out: Collector[(String, Double)]): Unit = {
          val clazz: String = kv._1
          val age: Int = kv._2
          // Read the previous total number (an unset state reads as 0 for Int in Scala)
          var sumNum: Int = sumNumState.value()
          // Count this student
          sumNum += 1
          // Update the state
          sumNumState.update(sumNum)
          // Read the previous total age
          var sumAge: Int = sumAgeState.value()
          // Add this student's age
          sumAge += age
          // Update the state
          sumAgeState.update(sumAge)
          // Calculate the average age
          val avgAge: Double = sumAge.toDouble / sumNum
          // Send the result downstream
          out.collect((clazz, avgAge))
        }
      })
    avgAgeDS.print()
    env.execute()
  }
}
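The per-key update in `processElement` can be checked without a cluster. The following is a minimal sketch in plain Scala, assuming an immutable `Map` stands in for the two keyed states; `AvgAgeModel`, `step` and `run` are hypothetical names introduced for illustration and are not part of the job above.

```scala
// Plain-Scala model of the running-average logic; not Flink API.
object AvgAgeModel {
  // (count, total age) per class: the same two numbers the job keeps in state.
  type Totals = Map[String, (Int, Int)]

  // Process one record: return the updated totals and the emitted (class, average).
  def step(totals: Totals, record: (String, Int)): (Totals, (String, Double)) = {
    val (clazz, age) = record
    val (num, sum) = totals.getOrElse(clazz, (0, 0))
    val updated = (num + 1, sum + age)
    (totals.updated(clazz, updated), (clazz, updated._2.toDouble / updated._1))
  }

  // Feed the records through step in order, collecting one output per input.
  def run(records: Seq[(String, Int)]): Seq[(String, Double)] = {
    var totals: Totals = Map.empty
    records.map { record =>
      val (next, out) = step(totals, record)
      totals = next
      out
    }
  }
}
```

As in the job, every input record produces one output, and the averages for different classes evolve independently because the totals are looked up by key.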
2. checkpoint
Checkpointing is the mechanism Flink uses to persist its state.
Flink periodically persists the computed state to HDFS.
Checkpointing can be enabled in two ways: in the job code or in the configuration file.
2.1 Enable it in the job code for a single job (this has the highest priority)
// Start a checkpoint every 1000 ms
env.enableCheckpointing(1000)
// Advanced options:
// Set the mode to exactly-once (this is the default)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// Make sure at least 500 ms pass between the end of one checkpoint and the start of the next
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// A checkpoint must complete within one minute, otherwise it is discarded
env.getCheckpointConfig.setCheckpointTimeout(60000)
// Tolerate two consecutive checkpoint failures
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(2)
// Allow only one checkpoint to be in progress at a time
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// Use externalized checkpoints so that checkpoints are kept after the job is cancelled
// RETAIN_ON_CANCELLATION: retain the checkpoint when the job is cancelled
env.getCheckpointConfig.setExternalizedCheckpointCleanup(
  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
// Specify the state backend
// EmbeddedRocksDBStateBackend: the RocksDB state backend (true enables incremental checkpoints)
env.setStateBackend(new EmbeddedRocksDBStateBackend(true))
// Store the state in HDFS: when a checkpoint is triggered, the state is persisted to HDFS
env.getCheckpointConfig.setCheckpointStorage("hdfs://master:9000/flink/checkpoint")
2.2 Enable it in the configuration file (works in newer Flink versions)
vim flink-conf.yaml
execution.checkpointing.interval: 3min
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 0
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 10min
execution.checkpointing.tolerable-failed-checkpoints: 0
state.backend: rocksdb
state.checkpoints.dir: hdfs://master:9000/flink/checkpoint
2.3 Restoring a job from a checkpoint
1. The checkpoint path can be specified on the web page when restoring. The first submission of a job does not need a path; when restoring a job, specify the HDFS path where the state was saved. The path must be prefixed with hdfs://master:9000
Example: hdfs://master:9000/flink/checkpoint/11edbec21742ceddebbb90f3e49f24b4/chk-35
2. Resubmit the job from the command line, using -s to specify the checkpoint path to restore from:
flink run -t yarn-session -Dyarn.application.id=application_1658546198162_0005 -c com.shujia.flink.core.Demo15RocksDB -s hdfs://master:9000/flink/checkpoint/11edbec21742ceddebbb90f3e49f24b4/chk-35 flink-1.0.jar