当前位置:网站首页>Flink状态

Flink状态

2022-08-03 04:28:00 大学生爱编程

状态和普通变量的区别:
普通变量数据保存在内存中,任务执行失败会丢失
flink的状态中的数据会被checkpoint持久化到hdfs中,如果任务失败还能恢复到之前的计算结果
flink的checlpoint默认是关闭的,开启后在本地无法运行了,只能提交服务器了
    // 每 1000ms 开始一次 checkpoint
    env.enableCheckpointing(1000)
    // 高级选项:
    // 设置模式为精确一次 (这是默认值)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // 确认 checkpoints 之间的时间会进行 500 ms
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    // Checkpoint 必须在一分钟内完成,否则就会被抛弃
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    // 允许两个连续的 checkpoint 错误
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(2)
    // 同一时间只允许一个 checkpoint 进行
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    // 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
    //RETAIN_ON_CANCELLATION: 当任务取消时保留checkpoint
    env.getCheckpointConfig.setExternalizedCheckpointCleanup(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // 需要设置flink checkpoint保存状态的位置
    env.setStateBackend(new HashMapStateBackend())
    //将状态保存到hdfs中  env.getCheckpointConfig.setCheckpointStorage("hdfs://master:9000/flink/checkpoint")

sum()算子,底层使用了flink的状态保存之前的计算结果,flink的状态会被checkpoint持久化到hdfs中,任务被取消或者执行失败可以恢复之前的计算结果

1. state

1. flink用于保存之前计算结果的机制
2. flink会为每一个key保存一个状态
3. 常用的sum(需要保存之前的计算结果)  window(需要保存一段时间内的数据)内部都是有状态的
4. flink也提供了几种常用的状态类
   1. valueState: 单值状态,为每一个key保存一个值,可以是任何类型,必须可以序列化
   2. mapState: kv格式的状态,为每一个key保存一个kv格式的状态
   3. listState: 集合状态,为每一个key保存一个集合状态,集合中可以保存多个元素
   4. reducingState/AggregatingState:聚合状态,为每一个key保存一个值,再定义状态时需要一个聚合函数
5.flink的状态和普通变量的区别:
 普通变量是保存再flink的内存中的,如果flink任务执行失败,变量的数据会丢失
 flink的状态是一个特殊的变量,状态中的数据会被checkpoint持久化到hdfs中, 如果任务执行失败,重启任务,可以恢复状态
6.状态后端,用于保存状态的位置
1. HashMapStateBackend: 
   -1. 将flink的状态先保存TaskManager的内存中,在触发checkpoint的时候将taskmanager中的状态再持久化到hdfs中
   -2. 可以直接使用
      ```java
      env.setStateBackend(new HashMapStateBackend())
      ```
2. EmbeddedRocksDBStateBackend:
  - 1. RocksDS是一个本地的轻量级的数据库,数据在磁盘上
  - 2. 再启动lfink任务的时候会在每一个taskManager所在的节点启动一个rocksDB进程
  - 3. flink的状态会先保存在rocksDb数据库中,当触发checkpoint的时候将数据库中的状态持久化到hdfs中
   -4. 可以支持增量快照
   -5. 使用rocksDb状态后端需要带入依赖
      ```xml
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-statebackend-rocksdb</artifactId>
          <version>1.15.0</version>
      </dependency>
      ```
   -6. 使用方式
      ```java
      env.setStateBackend(new EmbeddedRocksDBStateBackend(true))
      ```

      

valueState状态:存储之前的计算结果
多种状态

open():在map之前执行,每一个task中只执行一次
flink的状态需要在open中定义

实时计算平均年龄:
object Demo14AvgAge {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

   //实时计算每一个班级的平均年龄
    val studentDS: DataStream[String] = env.socketTextStream("master", 8888)
    //取出班级和学生的年龄
    val clazzAndAge: DataStream[(String, Int)] = studentDS.map(line => {
      val split: Array[String] = line.split(",")
      val clazz: String = split(4)
      val age: Int = split(2).toInt
      (clazz, age)
    })
    //按照班级分组
    val keyByDS: KeyedStream[(String, Int), String] = clazzAndAge.keyBy(_._1)
    //计算平均年龄
    //flink的状态可以再任务算子中使用,map,faltmap,filter process都可以
    val avgAgeDS: DataStream[(String, Double)] = keyByDS
      .process(new KeyedProcessFunction[String, (String, Int), (String, Double)] {
        override def open(parameters: Configuration): Unit = {        
           /定义两个状态来保存总人数和总的年龄         
          val context: RuntimeContext = getRuntimeContext
          //总人数状态的描述对象
          val sumNumDesc = new ValueStateDescriptor[Int]("sumNum", classOf[Int])
          //总的年龄的描述对象
          val sumAgeDesc = new ValueStateDescriptor[Int]("sumAge", classOf[Int])
          sumNumState = context.getState(sumNumDesc)
          sumAgeState = context.getState(sumAgeDesc)
        }
        //保存总的人数的状态
        var sumNumState: ValueState[Int] = _
        //保存总的年龄的状态
        var sumAgeState: ValueState[Int] = _

        override def processElement(kv: (String, Int),
                                    ctx: KeyedProcessFunction[String, (String, Int), (String, Double)]#Context,
                                    out: Collector[(String, Double)]): Unit = {

          val clazz: String = kv._1
          val age: Int = kv._2

          //获取之前的总的人数和总的年龄
          var sumNum: Int = sumNumState.value()
          //人数累加
          sumNum += 1
          //更新状态
          sumNumState.update(sumNum) 
          var sumAge: Int = sumAgeState.value()
          //累加
          sumAge += age
          //更新状态
          sumAgeState.update(sumAge)
          //计算平均年龄
          val avgAge: Double = sumAge.toDouble / sumNum
          //将数据发送到下游
          out.collect((clazz, avgAge))
        }
      }) 
    avgAgeDS.print()
    env.execute()
  }
}

2. checkpoint

checkpoint是flink用于持久化flink状态的机制
flink会定时将flink计算的状态持久化到hdfs中
开启checkpint的方法: 代码中或者源码中

2.1 代码单独开启优先级最高

  // 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(1000)
// 高级选项:
// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 允许两个连续的 checkpoint 错误
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(2)
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
//RETAIN_ON_CANCELLATION: 当任务取消时保留checkpoint
env.getCheckpointConfig.setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//指定状态后端
//EmbeddedRocksDBStateBackend eocksDb状态后端
env.setStateBackend(new EmbeddedRocksDBStateBackend(true))
//将状态保存到hdfs中,在触发checkpoint的时候将状态持久化到hdfs中
env.getCheckpointConfig.setCheckpointStorage("hdfs://master:9000/flink/checkpoint")

2.2 在源码中修改配置开启(flink新版可以操作)

vim  flink-conf.yaml

execution.checkpointing.interval: 3min
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 0
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 10min
execution.checkpointing.tolerable-failed-checkpoints: 0
state.backend: rocksdb
state.checkpoints.dir: hdfs://master:9000/flink/checkpoint

2.3 从checkpoint中恢复任务

1.可以在网页中指定checkpint的路径恢复,初次提交任务不需要指定路径,恢复任务时需要指定hdfs中保存状态的路径,路径需要带上前缀hdfs://master:9000
样式:hdfs://master:9000/flink/checkpoint/11edbec21742ceddebbb90f3e49f24b4/chk-35
2.在命令行中重新提交任务,指定恢复任务的位置:   -s  指定恢复任务的路径
flink run -t yarn-session -Dyarn.application.id=application_1658546198162_0005  -c com.shujia.flink.core.Demo15RocksDB -s hdfs://master:9000/flink/checkpoint/11edbec21742ceddebbb90f3e49f24b4/chk-35 flink-1.0.jar
原网站

版权声明
本文为[大学生爱编程]所创,转载请带上原文链接,感谢
https://blog.csdn.net/qq_45409791/article/details/126125711