当前位置:网站首页>flink 控制窗口行为(触发器、移除器、允许延迟、将迟到的数据放入侧输出流)
flink 控制窗口行为(触发器、移除器、允许延迟、将迟到的数据放入侧输出流)
2022-06-11 12:06:00 【但行益事莫问前程】
1. 触发器(Trigger)
触发器主要是用来控制窗口什么时候触发计算,执行窗口函数。基于 WindowedStream 用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。
Trigger 是一个抽象类,自定义时必须实现下面四个抽象方法:
------onElement():窗口中每到来一个元素,都会调用这个方法
------onEventTime():当注册的事件时间定时器触发时,将调用这个方法
------onProcessingTime ():当注册的处理时间定时器触发时,将调用这个方法
------clear():当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态
这几个方法的参数中都有一个触发器上下文(TriggerContext)对象,可以用来注册定时器回调(callback)。定时器(Timer)代表未来某个时间点会执行的事件;当时间进展到设定的值时,就会执行定义好的操作
除clear外,三个方法返回类型都是 TriggerResult,这是一个枚举类型(enum),其中定义了对窗口进行操作的四种类型:
------CONTINUE(继续):什么都不做
------FIRE(触发):触发计算,输出结果
------PURGE(清除):清空窗口中的所有数据,销毁窗口
------FIRE_AND_PURGE(触发并清除):触发计算输出结果,并清除窗口
Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner)都会对应一个默认的触发器;对于 Flink 内置的窗口类型,它们的触发器都已经做了实现。如:所有事件时间窗口,默认的触发器都是 EventTimeTrigger;
源码:
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private EventTimeTrigger() {
}
@Override
public TriggerResult onElement(
Object element, long timestamp, TimeWindow window, TriggerContext ctx)
throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteEventTimeTimer(window.maxTimestamp());
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void onMerge(TimeWindow window, OnMergeContext ctx) {
// only register a timer if the watermark is not yet past the end of the merged window
// this is in line with the logic in onElement(). If the watermark is past the end of
// the window onElement() will fire and setting a timer here would fire the window twice.
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
ctx.registerEventTimeTimer(windowMaxTimestamp);
}
}
@Override
public String toString() {
return "EventTimeTrigger()";
}
/** * Creates an event-time trigger that fires once the watermark passes the end of the window. * * <p>Once the trigger fires all elements are discarded. Elements that arrive late immediately * trigger window evaluation with just this one element. */
public static EventTimeTrigger create() {
return new EventTimeTrigger();
}
}
实际场景:
每个 url 在 10 秒滚动窗口的 pv 指标,然后设置了触发器,每隔 5 秒钟触发一次窗口的计算:
public class TriggerExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> streamSource = env.addSource(new ClickSource());
SingleOutputStreamOperator<Event> watermarks = streamSource.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
})
);
SingleOutputStreamOperator<UrlViewCount> process = watermarks.keyBy(r -> r.url)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.trigger(new MyTrigger())
.process(new WindowResult());
process.print();
env.execute();
}
public static class WindowResult extends ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow> {
@Override
public void process(String s, Context context, Iterable<Event> iterable, Collector<UrlViewCount> collector) throws Exception {
collector.collect(new UrlViewCount(
s,
// 获取迭代器中的元素个数
iterable.spliterator().getExactSizeIfKnown(),
context.window().getStart(),
context.window().getEnd()
)
);
}
}
public static class MyTrigger extends Trigger<Event, TimeWindow> {
@Override
public TriggerResult onElement(Event event, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN));
if (isFirstEvent.value() == null) {
for (long i = timeWindow.getStart(); i < timeWindow.getEnd(); i = i + 5000L) {
triggerContext.registerEventTimeTimer(i);
}
isFirstEvent.update(true);
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
return TriggerResult.FIRE;
}
@Override
public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN));
isFirstEvent.clear();
}
}
}

结果如图所示,每个用户在10秒的窗口内,触发了3次计算
2. 移除器(Evictor)
移除器主要用来定义移除某些数据的逻辑。基于 WindowedStream 调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor 是一个接口,不同的窗口类型都有各自预实现的移除器
stream.keyBy(...)
.window(...)
.evictor(new MyEvictor())
Evictor 接口定义了两个方法:evictBefore():定义执行窗口函数之前的移除数据操作; evictAfter():定义执行窗口函数之后的以处数据操作默认情况下,预实现的移除器都是在执行窗口函数(window fucntions)之前移除数据的
3. 允许延迟(Allowed Lateness)
当水位线已经到达窗口结束时间时,窗口会触发计算并输出结果,这时一般也就要销毁窗口了;如果窗口关闭之后,又有本属于窗口内的数据姗姗来迟,默认情况下就会被丢弃。为了解决迟到数据的问题,Flink 提供了一个特殊的接口,可以为窗口算子设置一个允许的最大延迟(Allowed Lateness)。在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算。直到水位线推进到了 窗口结束时间 + 延迟时间,才真正将窗口的内容清空,正式关闭窗口
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.allowedLateness(Time.minutes(1))
4. 将迟到的数据放入侧输出流
基于 WindowedStream 调用.sideOutputLateData() 方法,就可以实现将未收入窗口的迟到数据,放入侧输出流(side output)进行另外的处理。方法需要传入一个输出标签(OutputTag),用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以 OutputTag 的类型与流中数据类型相同
DataStream<Event> stream = env.addSource(...);
OutputTag<Event> outputTag = new OutputTag<Event>("late") {
};
stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.sideOutputLateData(outputTag)
将迟到数据放入侧输出流之后,基于窗口处理完成之后的DataStream,用.getSideOutput()方法,传入对应的输出标签,就可以获取到迟到数据所在的流
SingleOutputStreamOperator<AggResult> winAggStream = stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1))) .sideOutputLateData(outputTag)
.aggregate(new MyAggregateFunction())
DataStream<Event> lateStream = winAggStream.getSideOutput(outputTag);
边栏推荐
- 纯数据业务的机器打电话进来时回落到了2G/3G
- Notes on topic brushing (XIV) -- binary tree: sequence traversal and DFS, BFS
- [第二章 基因和染色体的关系]生物知识概括–高一生物
- log4j-slf4j-impl cannot be present with log4j-to-slf4j
- SQLServer连接数据库(中文表)部分数据乱码问题解决
- arguments.callee 实现函数递归调用
- Using fast and slow pointer method to solve the problem of array (C language)
- Use compiler option ‘--downlevelIteration‘ to allow iterating of iterators 报错解决
- Elk - elastalert largest pit
- Wechat web developers, how to learn web development
猜你喜欢

Read geo expression matrix

CVPR 2022 𞓜 text guided entity level image manipulation manitrans

yapi安装

Let you understand bubble sorting (C language)

Generate statement is not synthesized

Intermediate web development engineer, interview questions + Notes + project practice

Uncaught typeerror: cannot set property 'next' of undefined

Splunk 健康检查之关闭THP

How does Sister Feng change to ice?

Solve the problem of swagger document interface 404
随机推荐
Uncaught typeerror: cannot set property 'next' of undefined
Take you to know about direct insertion sorting (C language)
Using fast and slow pointer method to solve the problem of array (C language)
中国网络安全年会周鸿祎发言:360安全大脑构筑数据安全体系
安全工程师发现PS主机重大漏洞 用光盘能在系统中执行任意代码
(推荐)splunk 多少数量search head 才合适
爱可可AI前沿推介(6.11)
Merge two ordered arrays (C language)
2020-07 学习笔记整理
Live source code, floating window rolling gradient effect
Gestion de projets logiciels 7.1. Concept de base du calendrier du projet
Live app development to determine whether the user is logging in to the platform for the first time
Read geo expression matrix
ObjectInputStream读取文件对象ObjectOutputStream写入文件对象
arguments.callee 实现函数递归调用
centos安装mysql5.7
Network protocol of yyds dry goods inventory: datagram socket for detailed explanation of socket protocol
軟件項目管理 7.1.項目進度基本概念
一般运维架构图
大数相加(C语言)