当前位置:网站首页>flink onTimer定时器实现定时需求

flink onTimer定时器实现定时需求

2022-08-04 05:27:00 第一片心意

1. 业务需求

接收实时数据流数据,实时更新状态,并且每隔一定的时间,将所有状态数据输出。

实时数据类型:("张", 1)

状态更新:第一个元素为key,将第二个元素全部缓存起来,放到list中,最后将key和其对应的list全部输出。

2. 实现方案

使用processFunction算子,在processElement函数中仅注册一次定时器,然后在onTimer函数中处理定时器任务,并且重新注册定时器。

3. 实现代码

3.1 source

/**
 * 每隔1秒发送一个tuple2类型的数据,第一个字段值为随机的一个姓氏,第二个字段为自增的数字
 **/
class MySourceTuple2 extends SourceFunction[(String, Long)] {
  
  var isRunning: Boolean = true
  val names: List[String] = List("张", "王", "李", "赵")
  private val random = new Random()
  var number: Long = 1
  
  override def run(ctx: SourceFunction.SourceContext[(String, Long)]): Unit = {
    while (true) {
      val index: Int = random.nextInt(4)
      ctx.collect((names(index), number))
      number += 1
      Thread.sleep(1000)
    }
  }
  
  override def cancel(): Unit = {
    isRunning = false
  }
}

3.2 流处理

object TimerMain2 {
  
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env
        .addSource(new MySourceTuple2)
        .keyBy(_._1)
        .process(new KeyedProcessFunction[String, (String, Long), String] {
          
          //缓存流数据
          private val cache: mutable.Map[String, ListBuffer[Long]] = mutable.Map[String, ListBuffer[Long]]()
          private var first: Boolean = true
          
          /**
           * 定时器触发时回调该函数
           *
           * @param timestamp 定时器触发时间
           */
          override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, (String, Long), String]#OnTimerContext, out: Collector[String]): Unit = {
            println("定时器触发:" + timestamp)
            //将缓存中的数据组织成需要的格式
            val builder = new StringBuilder()
            for (entry: (String, ListBuffer[Long]) <- cache) {
              builder.append(entry._1).append(":")
              for (ele <- entry._2) {
                builder.append(ele).append(",")
              }
              builder.delete(builder.size - 1, builder.size).append(";")
              cache(entry._1).clear()
            }
            println("定时器注册:" + timestamp)
            //该定时器执行完任务之后,重新注册一个定时器
            ctx.timerService().registerProcessingTimeTimer(timestamp + 5000)
            out.collect(builder.toString())
          }
          
          /**
           * 处理每一个流数据
           */
          override def processElement(value: (String, Long), ctx: KeyedProcessFunction[String, (String, Long), String]#Context, out: Collector[String]): Unit = {
            //仅在该算子接收到第一个数据时,注册一个定时器
            if (first) {
              first = false
              val time: Long = System.currentTimeMillis()
              println("定时器第一次注册:" + time)
              ctx.timerService().registerProcessingTimeTimer(time + 5000)
            }
            //将流数据更新到缓存中
            if (cache.contains(value._1)) {
              cache(value._1).append(value._2)
            } else {
              cache.put(value._1, ListBuffer[Long](value._2))
            }
          }
        }
        )
        .print("处理结果:")
    env.execute()
  }
  
}

所有代码解释均在注释中。

4. 运行结果

可以看到,定时器注册之后,过5秒就会被触发,同时注册下个5秒的注册器,然后将数据发送到下个算子打印出来。

注意:该实例中算子并行度为1,所以“定时器第一次注册”只会触发一次,如果是多个并行度的话,则会在每个并行度里面进行“定时器第一次注册”,并且每个算子维护自己的定时器,算子之间互不影响。

原网站

版权声明
本文为[第一片心意]所创,转载请带上原文链接,感谢
https://blog.csdn.net/u012443641/article/details/105114869