A look at Flink's BoundedOutOfOrdernessTimestampExtractor
2022-08-02 02:58:00 【SparkSql】
This article examines BoundedOutOfOrdernessTimestampExtractor in Flink.
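- BoundedOutOfOrdernessTimestampExtractor is an abstract class that implements the two methods required by the AssignerWithPeriodicWatermarks interface, extractTimestamp(T element, long previousElementTimestamp) and getCurrentWatermark(), both declared final. It also declares the abstract one-argument method extractTimestamp(T element) for subclasses to implement.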
- The constructor takes a maxOutOfOrderness parameter (converted to milliseconds; it must not be negative), the maximum time by which an element may lag behind the largest event time seen so far. The watermark is kept maxOutOfOrderness behind that maximum, so an element whose event time t lags by more than this bound typically ends up at or behind the current watermark t_w (t ≤ t_w) and counts as late. When window results are computed, such late elements are ignored once the window they belong to has already fired (unless allowed lateness is configured).
- The final extractTimestamp(T element, long previousElementTimestamp) method calls the subclass's extractTimestamp(T element) to obtain the event time and, if that time is greater than currentMaxTimestamp, updates currentMaxTimestamp. getCurrentWatermark first computes potentialWM = currentMaxTimestamp - maxOutOfOrderness. If potentialWM >= lastEmittedWatermark, it advances lastEmittedWatermark to potentialWM (equivalently, currentMaxTimestamp - lastEmittedWatermark >= maxOutOfOrderness: the last watermark has fallen too far behind, so it is moved forward). Finally it returns new Watermark(lastEmittedWatermark).

```java
package org.apache.flink.streaming.api.functions.timestamps;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {

    private static final long serialVersionUID = 1L;

    /** The current maximum timestamp seen so far. */
    private long currentMaxTimestamp;

    /** The timestamp of the last emitted watermark. */
    private long lastEmittedWatermark = Long.MIN_VALUE;

    /**
     * The (fixed) interval between the maximum seen timestamp seen in the records
     * and that of the watermark to be emitted.
     */
    private final long maxOutOfOrderness;

    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        if (maxOutOfOrderness.toMilliseconds() < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " +
                "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        }
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        // Start at Long.MIN_VALUE + maxOutOfOrderness so that
        // currentMaxTimestamp - maxOutOfOrderness cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
    }

    public long getMaxOutOfOrdernessInMillis() {
        return maxOutOfOrderness;
    }

    /**
     * Extracts the timestamp from the given element.
     *
     * @param element The element that the timestamp is extracted from.
     * @return The new timestamp.
     */
    public abstract long extractTimestamp(T element);

    @Override
    public final Watermark getCurrentWatermark() {
        // this guarantees that the watermark never goes backwards.
        // Compare (current max timestamp - maxOutOfOrderness) with the last
        // emitted watermark and keep the larger value as the watermark.
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }

    // Extract the event time from the element via the subclass; if it is larger
    // than currentMaxTimestamp, record it as the new maximum. Return the extracted timestamp.
    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }
}
```
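To make the bookkeeping above concrete, here is a minimal, Flink-free sketch in Java that mirrors the class's fields and its two final methods. The class name BoundedOutOfOrdernessSim and the methods onElement/currentWatermark are hypothetical, chosen only for illustration; they are not part of Flink's API. The sketch also assumes the watermark is polled after every element, whereas Flink polls an AssignerWithPeriodicWatermarks periodically according to the configured auto-watermark interval.

```java
// Hypothetical, Flink-free model of BoundedOutOfOrdernessTimestampExtractor's
// bookkeeping; names are illustrative, not Flink API.
public class BoundedOutOfOrdernessSim {

    private final long maxOutOfOrderness;        // allowed out-of-orderness, in ms
    private long currentMaxTimestamp;            // largest event time seen so far
    private long lastEmittedWatermark = Long.MIN_VALUE;

    public BoundedOutOfOrdernessSim(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("maxOutOfOrderness cannot be negative");
        }
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Same initialization as the Flink class: the first watermark becomes
        // exactly Long.MIN_VALUE instead of underflowing.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Mirrors extractTimestamp(element, previousElementTimestamp). */
    public long onElement(long timestamp) {
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }

    /** Mirrors getCurrentWatermark(): the result never moves backwards. */
    public long currentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSim sim = new BoundedOutOfOrdernessSim(3_000);
        long[] eventTimes = {10_000, 12_000, 11_000, 15_000, 9_000, 13_000};
        for (long t : eventTimes) {
            // Compare with the watermark as it stood before this element;
            // an element at or behind it may be treated as late downstream.
            boolean behindWatermark = t <= sim.currentWatermark();
            sim.onElement(t);
            // Simplification: poll the watermark after every element.
            System.out.printf("event=%d behindWatermark=%b watermark=%d%n",
                    t, behindWatermark, sim.currentWatermark());
        }
    }
}
```

With maxOutOfOrderness = 3000 ms and the sample input in main, the out-of-order event at 11000 still arrives ahead of the watermark (9000), so it is not flagged. The event at 9000 arrives after the watermark has reached 12000 and is flagged as behind it. The watermark itself never decreases, even when older events arrive.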