A Look at Flink's BoundedOutOfOrdernessTimestampExtractor
2022-08-02 02:58:00 【SparkSql】
This post takes a look at BoundedOutOfOrdernessTimestampExtractor in Flink.
- BoundedOutOfOrdernessTimestampExtractor is an abstract class that implements the extractTimestamp(T element, long previousElementTimestamp) and getCurrentWatermark methods of the AssignerWithPeriodicWatermarks interface, and declares the abstract method extractTimestamp(T element) for subclasses to implement.
- The constructor takes a maxOutOfOrderness parameter that sets the maximum time an element is allowed to lag behind (t - t_w, where t is the element's event time and t_w is the time of the previous watermark). Elements that are later than this are ignored when the window result is computed. A negative value causes the constructor to throw a RuntimeException.
- The final extractTimestamp(T element, long previousElementTimestamp) method calls the subclass's extractTimestamp(T element) to extract the time, and updates currentMaxTimestamp if the extracted time is greater. getCurrentWatermark first computes potentialWM = currentMaxTimestamp - maxOutOfOrderness. If potentialWM is greater than or equal to lastEmittedWatermark, it updates lastEmittedWatermark. That condition is equivalent to currentMaxTimestamp - lastEmittedWatermark >= maxOutOfOrderness: the last emitted watermark now trails the maximum timestamp by at least maxOutOfOrderness, so it is moved forward. The method then returns Watermark(lastEmittedWatermark).

```java
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {

    private static final long serialVersionUID = 1L;

    /** The current maximum timestamp seen so far. */
    private long currentMaxTimestamp;

    /** The timestamp of the last emitted watermark. */
    private long lastEmittedWatermark = Long.MIN_VALUE;

    /**
     * The (fixed) interval between the maximum seen timestamp seen in the records
     * and that of the watermark to be emitted.
     */
    private final long maxOutOfOrderness;

    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        if (maxOutOfOrderness.toMilliseconds() < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " +
                "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        }
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        // Offset the initial value so the first subtraction in getCurrentWatermark cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
    }

    public long getMaxOutOfOrdernessInMillis() {
        return maxOutOfOrderness;
    }

    /**
     * Extracts the timestamp from the given element.
     *
     * @param element The element that the timestamp is extracted from.
     * @return The new timestamp.
     */
    public abstract long extractTimestamp(T element);

    @Override
    public final Watermark getCurrentWatermark() {
        // this guarantees that the watermark never goes backwards.
        // Compare (current max timestamp - allowed delay) with the last emitted
        // watermark and keep the larger of the two as the watermark.
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }

    // Extract the time from the element and use it as the timestamp.
    // If it is greater than currentMaxTimestamp, set currentMaxTimestamp to it.
    // Return the extracted timestamp.
    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }
}
```
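To make the watermark arithmetic concrete, here is a minimal sketch that replays the same two steps (tracking the maximum timestamp, then subtracting maxOutOfOrderness) on plain long values. It deliberately does not depend on Flink: the class name BoundedWatermarkSketch and the methods onEvent, currentWatermark and watermarksFor are hypothetical names introduced only for this illustration, and the watermark is queried after every element here, whereas Flink calls getCurrentWatermark periodically.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Dependency-free illustration of the bounded-out-of-orderness logic.
 * This is NOT a Flink class; all names here are hypothetical.
 */
public class BoundedWatermarkSketch {

    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    public BoundedWatermarkSketch(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("maxOutOfOrderness cannot be negative");
        }
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Same offset as the original class: avoids underflow on the first subtraction.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Mirrors extractTimestamp: remember the largest event time seen so far. */
    public long onEvent(long eventTimestamp) {
        if (eventTimestamp > currentMaxTimestamp) {
            currentMaxTimestamp = eventTimestamp;
        }
        return eventTimestamp;
    }

    /** Mirrors getCurrentWatermark: the returned value never decreases. */
    public long currentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    /** Feeds the events in order and records the watermark after each one. */
    public static List<Long> watermarksFor(long maxOutOfOrdernessMillis, long... eventTimestamps) {
        BoundedWatermarkSketch sketch = new BoundedWatermarkSketch(maxOutOfOrdernessMillis);
        List<Long> watermarks = new ArrayList<>();
        for (long ts : eventTimestamps) {
            sketch.onEvent(ts);
            watermarks.add(sketch.currentWatermark());
        }
        return watermarks;
    }

    public static void main(String[] args) {
        // Allowed delay of 3 s; the third event (11 s) arrives out of order.
        System.out.println(watermarksFor(3000L, 10_000L, 12_000L, 11_000L, 15_000L));
        // prints [7000, 9000, 9000, 12000]
    }
}
```

The out-of-order element at 11 000 ms does not move the watermark, because currentMaxTimestamp stays at 12 000 ms. The watermark only advances again when a larger timestamp (15 000 ms) arrives.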








