当前位置:网站首页>Flink / Scala - 使用 CountWindow 实现按条数触发窗口
Flink / Scala - 使用 CountWindow 实现按条数触发窗口
2022-08-02 23:45:00 【BIT_666】
一.引言
CountWindow 数量窗口分为滑动窗口与滚动窗口,类似于之前 TimeWindow 的滚动时间与滑动时间,这里滚动窗口不存在元素重复而滑动窗口存在元素重复的情况,下面 demo 场景为非重复场景,所以将采用滚动窗口。
二.CountWindow 简介
这里最关键的一句话是: A Window that represents a count window. For each count window, we will assign a unique id. Thus this CountWindow can act as namespace part in state. We can attach data to each different CountWindow. 翻译的意思是:表示计数窗口的窗口。对于每个计数窗口,我们将分配一个唯一的 id。因此,此计数窗口可以作为状态中的命名空间部分。我们可以将数据附加到每个不同的 CountWindow。
countWindow 共分为两种初始化方式,其中只添加 size 为滚动窗口,即不重复元素的 CountWindow,带 slide 滑动参数则生成滑动窗口,不在本文讨论范围之内。上面提到了对于每个计数的窗口,我们分配一个唯一的 id,这个 id 可以近似看做是以 keyBy 为依据进行数据分流,下面示例将给出解答。
三.任务场景与实现
通过自定义 Source 实现无限数据流,我们希望指定 count = N,使得每 N 个数据作为一个 batch 触发一次窗口并计算。
1.自定义 Source
case class info(num: Int, id: Int)
自定义 Source 需要继承 RichSourceFunction,这里我们生成 info 类数据结构,其内部包含两个元素 num 与 id,num 代表其内部的数字信息,id 代表其 keyBy 的分组信息:
class SourceFromCollection extends RichSourceFunction[info] {
private var isRunning = true
var start = 0
override def run(ctx: SourceFunction.SourceContext[info]): Unit = {
val random = new Random() // 生成随机 id
while (isRunning) {
(start until (start + 100)).foreach(num => {
ctx.collect(info(num, random.nextInt(4))) // 随机id范围 0,1,2,3
if (num % 20 == 0) { // 每生产20个数据 sleep 1s
TimeUnit.SECONDS.sleep(1)
}
})
start += 100 // 数值累加
}
}
override def cancel(): Unit = {
isRunning = false
}
}
Source 函数每 s 生成 20 个数字和其对应的 randomId,最终输出 info(num, id) 类。
2.执行流程
主函数主要基于 Source 实现 ProcessWindowFunction,并制定 count=20,每20个元素触发一次窗口计算:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = env.addSource(new SourceFromCollection) // 添加 Source
dataStream
.keyBy(info => info.id) // 按 random 生成的 id 分配到不同 CountWindow
.countWindow(20) // size = 20 的滚动窗口
.process(new ProcessWindowFunction[info, String, Int, GlobalWindow] {
override def process(key: Int, context: Context, elements: Iterable[info], out: Collector[String]): Unit = {
// 输出 id + size + 元素
val log = key + "\t" + elements.size + "\t" + elements.toArray.map(_.num).mkString("|")
out.collect(log)
}
}).print()
env.execute()
其中 ProcessWindowFunction 共有四个参数:
IN: 输入类型,本例数据流为 DataStream[info],所以 classOf[In] 为 info
OUT: 输出类型,与 Collector 后类型一致,这里输出类型为 String
KEY: Key 即为 keyBy 的 id,这里 key 为 random.nextInt,所以为 int 类型
W: org.apache.flink.streaming.api.windowing.windows.window,这里主要分为两类,如果采用时间窗口即 TimeWindow,则对应类型为 TimeWindow,本例中采用 CountWindow,则对应类型为 GlobalWindow。
3.运行结果
批处理每一批 size = 20,分别输出 id + "\t" + size + "\t" + 批次数据,可以看到每一批触发20条数据,且 id 分别为 0,1,2,3,这里还是根据 process 的并行度来定,如果 process 的并行度设定为2,则很大概率 0,1,2,3 均分至两台 TaskManager 上,如果规定并行度为4,则分别分配到单台 TaskManager 上,也可以根据数据的吞吐,修改并行度与 keyBy 时 nextInt 的范围。
四.CountTrigger
1.自定义 CountTrigger
之前提到过 CountTrigger 也可以实现按 count 数目进行窗口触发,但是有一点不同是 CountTrigger 不会清除窗口内元素,所以多次执行逻辑会重复处理一批数据,具体实现逻辑解析可以参考: Flink - CountTrigger && ProcessingTimeTriger 详解。
class SelfDefinedCountTrigger(maxCount: Long) extends Trigger[String, TimeWindow] {
// 条数计数器
val countStateDesc = new ReducingStateDescriptor[Long]("count", new ReduceSum(), classOf[Long])
// 元素过来后执行的操作
override def onElement(t: String, l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
// 获取 count state 并累加数量
val count = triggerContext.getPartitionedState(countStateDesc)
count.add(1L)
// 满足数量触发要求
if (count.get() >= maxCount) {
// 首先清空计数器
count.clear()
val dateFormat = new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss")
val cla = Calendar.getInstance()
cla.setTimeInMillis(System.currentTimeMillis())
val date = dateFormat.format(cla.getTime)
println(s"[$date] Window Trigger By Count = ${maxCount}")
TriggerResult.FIRE
} else {
TriggerResult.CONTINUE
}
}
override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
TriggerResult.CONTINUE
}
override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {
TriggerResult.CONTINUE
}
override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {
val count = triggerContext.getPartitionedState(countStateDesc)
count.clear()
}
}
2.运行结果
之前介绍 Window 触发 CountTrigger 时也实现了基于 Count 的窗口触发机制,但是存在一个问题,CountTrigger 每次达到 count 数量触发,但是不会清除窗口数据,即窗口数据累加同时多次触发窗口:
如上,窗口逻辑为计算批次数据的最大最小值,同一个颜色框内为 count=30 多次触发的结果,可以看到 min 一直为同一个数字,max 持续增大,这就是上面提到的问题: 使用 countTrigger 时会造成窗口数据重复触发,所以想要实现无重复 CountWindow 就得最上面的 countWindow 实现。当然上面的执行逻辑也并不是没有使用场景,例如电商平台统计一段时间内商品销售的情况就可以使用 CountTrigger,实时滚动大屏数据展示。
五.CountWindow 完整代码
下面附上完整代码:
package com.weibo.ug.push.flink.Main
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector
import java.util.concurrent.TimeUnit
import com.weibo.ug.push.flink.Main.TestCountWindow.info
import scala.util.Random
object TestCountWindow {
case class info(num: Int, id: Int)
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = env.addSource(new SourceFromCollection)
dataStream
.keyBy(x => x.id)
.countWindow(20)
.process(new ProcessWindowFunction[info, String, Int, GlobalWindow] {
override def process(key: Int, context: Context, elements: Iterable[info], out: Collector[String]): Unit = {
val log = key + "\t" + elements.size + "\t" + elements.toArray.map(_.num).mkString("|")
out.collect(log)
}
}).print()
env.execute()
}
}
class SourceFromCollection extends RichSourceFunction[info] {
private var isRunning = true
var start = 0
override def run(ctx: SourceFunction.SourceContext[info]): Unit = {
val random = new Random()
while (isRunning) {
(start until (start + 100)).foreach(num => {
ctx.collect(info(num, random.nextInt(4)))
if (num % 20 == 0) {
TimeUnit.SECONDS.sleep(1)
}
})
start += 100
}
}
override def cancel(): Unit = {
isRunning = false
}
}
边栏推荐
- CIO修炼手册:成功晋升CIO的七个秘诀
- js基础知识整理之 —— 字符串
- 21天学习挑战赛(1)设备树的由来
- Numpy数组中d[True]=1的含义
- 典型相关分析CCA计算过程
- 精心整理16条MySQL使用规范,减少80%问题,推荐分享给团队
- ORA-55610: Invalid DDL statement on history-tracked table
- # DWD层及DIM层构建## ,220801 ,
- 2022 Shandong International Youth Eye Health Industry Exhibition, Vision Health Exhibition, Optometry Exhibition
- 即席查询—— Kylin使用
猜你喜欢
js基础知识整理之 —— 五种输出方式
用了 TCP 协议,数据一定不会丢吗?
Database auditing - an essential part of network security
Merge two excel spreadsheet tools
Mock工具之Moco使用教程
九零后程序员心声:互联网的同行们,别卷了,再卷人都卷没了
Nuxt 所有页面都设置上SEO相关标签
即席查询—— Kylin使用
Day117.尚医通:生成挂号订单模块
What is the matter that programmers often say "the left hand is knuckled and the right hand is hot"?
随机推荐
Jenkins汉化设置
2022 China Eye Expo, Shandong Eye Health Exhibition, Vision Correction Instrument Exhibition, Eye Care Products Exhibition
我为什么又能面试一次就拿到offer
js基础知识整理之 —— 闭包
D with json
js基础知识整理之 —— 五种输出方式
LVM与磁盘配额原理及配置
@GetMapping、@PostMapping、@PutMapping、@DeleteMapping的区别
程序员英语自我介绍
十二、form表单的提交
Canonical correlation analysis of CCA calculation process
优秀论文以及思路分析01
基于飞腾平台的嵌入式解决方案案例集 1.0 正式发布!
如何使用vlookup+excel数组公式 完成逆向查找?
基于rt-thread studio的STM32裸机开发——LED
十三、数据回显
Introduction to resubmit Progressive Anti-Duplicate Submission Framework
高数---二重积分
Apache Doris 1.1 特性揭秘:Flink 实时写入如何兼顾高吞吐和低延时
VMware workstation program starts slowly