A Look at Flink's BoundedOutOfOrdernessTimestampExtractor
2022-08-02 02:58:00 【SparkSql】
This article examines the BoundedOutOfOrdernessTimestampExtractor class in Flink.
- `BoundedOutOfOrdernessTimestampExtractor` is an abstract class that implements the `extractTimestamp` and `getCurrentWatermark` methods of the `AssignerWithPeriodicWatermarks` interface. It also declares the abstract method `extractTimestamp(T element)` for subclasses to implement.
- Its constructor takes a `maxOutOfOrderness` parameter, which sets the maximum time an element is allowed to lag behind. An element counts as late when its event time `t` falls behind `t_w`, the time of the previous watermark. By default, elements that lag by more than this bound are ignored when the window result is computed.
- The two-argument `extractTimestamp(T element, long previousElementTimestamp)` calls the subclass's `extractTimestamp(T element)` to obtain the timestamp, and updates `currentMaxTimestamp` if that timestamp is larger. `getCurrentWatermark` first computes `potentialWM = currentMaxTimestamp - maxOutOfOrderness`. If `potentialWM` is greater than or equal to `lastEmittedWatermark`, it updates `lastEmittedWatermark`. This condition is equivalent to `currentMaxTimestamp - lastEmittedWatermark >= maxOutOfOrderness`: the last emitted watermark has fallen at least `maxOutOfOrderness` behind the largest timestamp seen, so it is raised. Finally the method returns `new Watermark(lastEmittedWatermark)`.

```java
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {

    private static final long serialVersionUID = 1L;

    /** The current maximum timestamp seen so far. */
    private long currentMaxTimestamp;

    /** The timestamp of the last emitted watermark. */
    private long lastEmittedWatermark = Long.MIN_VALUE;

    /**
     * The (fixed) interval between the maximum seen timestamp seen in the records
     * and that of the watermark to be emitted.
     */
    private final long maxOutOfOrderness;

    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        if (maxOutOfOrderness.toMilliseconds() < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " +
                "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        }
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
    }

    public long getMaxOutOfOrdernessInMillis() {
        return maxOutOfOrderness;
    }

    /**
     * Extracts the timestamp from the given element.
     *
     * @param element The element that the timestamp is extracted from.
     * @return The new timestamp.
     */
    public abstract long extractTimestamp(T element);

    @Override
    public final Watermark getCurrentWatermark() {
        // This guarantees that the watermark never goes backwards, i.e. the
        // emitted watermarks are monotonically non-decreasing.
        // The current maximum timestamp minus the allowed delay is compared with
        // the last emitted watermark, and the larger of the two is kept.
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }

    // Extracts the event time from the element and uses it as the timestamp.
    // If the timestamp exceeds currentMaxTimestamp, currentMaxTimestamp is set to it.
    // Returns the extracted timestamp.
    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }
}
```
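To make the watermark arithmetic concrete, here is a minimal, dependency-free sketch of the same bookkeeping. It is not Flink's class: `BoundedWatermarkSketch`, `onEvent`, and `currentWatermark` are hypothetical names introduced only for illustration, timestamps are plain `long` milliseconds, and the watermark is queried after every event, whereas Flink calls `getCurrentWatermark` periodically.

```java
/**
 * Illustrative sketch (not part of Flink) that mirrors the watermark logic of
 * BoundedOutOfOrdernessTimestampExtractor using plain longs.
 */
final class BoundedWatermarkSketch {

    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedWatermarkSketch(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("maxOutOfOrderness cannot be negative");
        }
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Start at MIN_VALUE + bound so the subtraction below cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Records an event timestamp, tracking the largest one seen so far. */
    long onEvent(long timestamp) {
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }

    /** Returns max timestamp minus the bound, never lower than a previous result. */
    long currentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        // Assumed example data: a 3-second bound and four event timestamps,
        // the third of which arrives out of order.
        BoundedWatermarkSketch sketch = new BoundedWatermarkSketch(3_000L);
        long[] events = {10_000L, 12_000L, 9_500L, 15_000L};
        for (long ts : events) {
            sketch.onEvent(ts);
            System.out.println("event=" + ts + " watermark=" + sketch.currentWatermark());
        }
    }
}
```

With a bound of 3000 ms, the watermark after each of the four events is 7000, 9000, 9000, and 12000. The out-of-order event at 9500 does not move `currentMaxTimestamp`, so the watermark stays at 9000 rather than going backwards.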