Flink Series: Watermarks
2022-06-10 05:38:00 【不会飞的小龙人】
Watermarks: assign an event-time timestamp to the records of an input stream and provide a way to handle out-of-order and late data within windows.
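As a rough illustration of the idea, here is a plain-Java sketch with no Flink dependency. It tracks the largest timestamp seen so far and derives a watermark that trails it by a fixed bound. The names `WatermarkSketch`, `BoundedLagTracker` and `isLate`, the 2-second bound, and the event times are all made up for this example; it is a simplified model, not Flink's implementation.

```java
public class WatermarkSketch {

    /** Tracks the largest timestamp seen and derives a watermark that lags it by a fixed bound. */
    static final class BoundedLagTracker {
        private final long maxOutOfOrdernessMs;
        private long maxTimestampMs;

        BoundedLagTracker(long maxOutOfOrdernessMs) {
            this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
            // Start low, but high enough that the subtraction below cannot underflow.
            this.maxTimestampMs = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
        }

        /** Records an event; the maximum only ever moves forward. */
        void onEvent(long timestampMs) {
            maxTimestampMs = Math.max(maxTimestampMs, timestampMs);
        }

        /** Watermark W asserts that no further events with timestamp <= W are expected. */
        long currentWatermark() {
            return maxTimestampMs - maxOutOfOrdernessMs - 1;
        }

        /** An event is late if the watermark has already reached or passed its timestamp. */
        boolean isLate(long timestampMs) {
            return timestampMs <= currentWatermark();
        }
    }

    public static void main(String[] args) {
        BoundedLagTracker tracker = new BoundedLagTracker(2_000L);
        // Hypothetical event times in milliseconds, deliberately out of order.
        long[] eventTimes = {1_000L, 4_000L, 3_000L, 7_000L, 2_500L};
        for (long ts : eventTimes) {
            boolean late = tracker.isLate(ts);
            tracker.onEvent(ts);
            System.out.println("event=" + ts + " late=" + late
                    + " watermark=" + tracker.currentWatermark());
        }
    }
}
```

With these inputs, the event at 3000 arrives out of order but is still accepted, because the watermark is only at 1999 at that point. The event at 2500 is reported late, because the watermark has already advanced to 4999.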
Example environment
java.version: 1.8.x
flink.version: 1.11.1
Sample data source (project download on Gitee)
TimestampsAndWatermarks.java
import com.flink.examples.DataSource;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.Date;
import java.util.List;

/**
 * @Description Watermarks: assign an event-time timestamp to the records of an input stream
 * and provide a way to handle out-of-order and late data within windows.
 *
 * Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html
 */
public class TimestampsAndWatermarks {

    /**
     * Keys the stream by gender (field f1) and, for every 3-second event-time window,
     * emits the average of the integer field f2 for each key.
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        /*
        TimeCharacteristic offers three notions of time:
        ProcessingTime: the time at which an operator processes the record, i.e. the system clock of the machine;
        IngestionTime: the time at which the record enters the Flink streaming data flow;
        EventTime: the timestamp carried by the record itself. The application must say how to extract it
                   by calling assignTimestampsAndWatermarks, and must also define how watermarks are generated.
        */
        // Use event time, which requires a timestamp for every event.
        // This call is needed on Flink 1.11, where processing time is the default
        // (later releases default to event time and deprecate it).
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        // Interval at which periodic watermarks are emitted. A longer interval avoids the cost of
        // emitting watermarks too frequently on high-volume streams.
        env.getConfig().setAutoWatermarkInterval(1000L);
        DataStream<Tuple3<String, String, Integer>> inStream = env.addSource(new MyRichSourceFunction());
        DataStream<Tuple2<String, Integer>> dataStream = inStream
                // The watermark advances continuously. Once it passes the end time of a window,
                // that window is evaluated: triggering window computation is what watermarks are for.
                // Alternative using the built-in strategy (requires import java.time.Duration):
                // forBoundedOutOfOrderness(Duration.ofSeconds(2)) keeps the watermark 2 seconds behind
                // the largest event timestamp seen, so a window fires only after an event at least
                // 2 seconds past the window's end has arrived.
                // .assignTimestampsAndWatermarks(
                //     WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                //         .withTimestampAssigner((element, timestamp) -> {
                //             long times = System.currentTimeMillis();
                //             System.out.println(element.f1 + "," + element.f0 + " event timestamp: " + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
                //             return times;
                //         })
                // )
                .assignTimestampsAndWatermarks(new MyWatermarkStrategy()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Integer>>() {
                            @Override
                            public long extractTimestamp(Tuple3<String, String, Integer> element, long timestamp) {
                                // The sample records carry no time field, so the current system time
                                // is used as the event timestamp.
                                long times = System.currentTimeMillis();
                                System.out.println(element.f1 + "," + element.f0 + " event timestamp: " + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
                                return times;
                            }
                        }))
                // Partition the stream by gender (f1).
                .keyBy((KeySelector<Tuple3<String, String, Integer>, String>) k -> k.f1)
                // 3-second tumbling event-time window.
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                // Process all records of one key in one window as a batch.
                .apply(new WindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
                    @Override
                    public void apply(String key, TimeWindow window, Iterable<Tuple3<String, String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
                        long times = System.currentTimeMillis();
                        System.out.println();
                        System.out.println("Window processing time: " + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
                        int total = 0;
                        int size = 0;
                        for (Tuple3<String, String, Integer> tuple3 : input) {
                            total += tuple3.f2;
                            size++;
                        }
                        // A window only fires when it holds at least one record, so size > 0.
                        // Integer division: the average is truncated.
                        out.collect(new Tuple2<>(key, total / size));
                    }
                });
        dataStream.print();
        env.execute("flink Filter job");
    }

    /**
     * Periodic watermark generator.
     */
    public static class MyWatermarkStrategy implements WatermarkStrategy<Tuple3<String, String, Integer>> {
        @Override
        public WatermarkGenerator<Tuple3<String, String, Integer>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Tuple3<String, String, Integer>>() {
                // Fixed allowed out-of-orderness: 3.5 seconds.
                private final long maxOutOfOrderness = 3500;
                // Start low enough that the subtraction in onPeriodicEmit cannot underflow.
                private long currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;

                /**
                 * Called for every event.
                 * @param event the record
                 * @param eventTimestamp the timestamp assigned to the record
                 * @param output watermark output
                 */
                @Override
                public void onEvent(Tuple3<String, String, Integer> event, long eventTimestamp, WatermarkOutput output) {
                    // Track the largest event timestamp seen so far.
                    currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    // Watermark = largest timestamp seen - allowed out-of-orderness - 1 ms.
                    // A window fires once the watermark reaches the window's last timestamp (end - 1 ms).
                    // Records that arrive after that are late and, with no allowed lateness
                    // configured as here, are dropped.
                    output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
                }
            };
        }
    }

    /**
     * Simulates a source that keeps emitting data.
     */
    public static class MyRichSourceFunction extends RichSourceFunction<Tuple3<String, String, Integer>> {
        // Cleared by cancel() so that run() can leave its loop.
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
            List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();
            for (int i = 0; i < 100 && running; i++) {
                // Cycle through the first six sample records.
                ctx.collect(tuple3List.get(i % 6));
                // Emit one record per second.
                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
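The interplay between the 3-second tumbling window and the 3.5-second lag in the listing above can be traced by hand. The sketch below is plain Java with no Flink dependency and only models the arithmetic. A timestamp falls into the window starting at the largest multiple of the window size not exceeding it, and a window is complete once the watermark reaches the window's last timestamp (end minus 1 ms). The class and method names are invented for this example. The event times are idealized to exactly one second apart, which is an assumption; a real run uses wall-clock readings.

```java
public class TumblingWindowSketch {

    /** Start of the tumbling window (offset 0) that contains the given timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Watermark derived from the largest timestamp seen, as in the generator above. */
    static long watermarkAfter(long maxTimestampMs, long maxOutOfOrdernessMs) {
        return maxTimestampMs - maxOutOfOrdernessMs - 1;
    }

    /** A window [start, start + size) is complete once the watermark reaches start + size - 1. */
    static boolean isComplete(long windowStartMs, long sizeMs, long watermarkMs) {
        return watermarkMs >= windowStartMs + sizeMs - 1;
    }

    /** Timestamp of the first event whose watermark completes the window, or -1 if none does. */
    static long firingEvent(long windowStartMs, long sizeMs, long lagMs, long[] eventTimes) {
        long maxTs = Long.MIN_VALUE + lagMs + 1;
        for (long ts : eventTimes) {
            maxTs = Math.max(maxTs, ts);
            if (isComplete(windowStartMs, sizeMs, watermarkAfter(maxTs, lagMs))) {
                return ts;
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long sizeMs = 3_000L;   // 3-second tumbling window
        long lagMs = 3_500L;    // allowed out-of-orderness used by the generator
        // Hypothetical event times in ms, one per second.
        long[] eventTimes = {20_000L, 21_000L, 22_000L, 23_000L, 24_000L,
                             25_000L, 26_000L, 27_000L, 28_000L};
        for (long ts : eventTimes) {
            long start = windowStart(ts, sizeMs);
            System.out.println("event=" + ts + " window=[" + start + "," + (start + sizeMs)
                    + ") watermark=" + watermarkAfter(ts, lagMs));
        }
        System.out.println("window starting at 18000 completes at event "
                + firingEvent(18_000L, sizeMs, lagMs, eventTimes));
        System.out.println("window starting at 21000 completes at event "
                + firingEvent(21_000L, sizeMs, lagMs, eventTimes));
    }
}
```

Under these assumptions the window [18000, 21000) completes when the event at 25000 arrives, and [21000, 24000) completes at 28000. Each window is therefore evaluated roughly 3.5 seconds after its end, which is the delay the generator's `maxOutOfOrderness` introduces.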
Output
man,张三 event timestamp: 2020-12-27 10:28:20
girl,李四 event timestamp: 2020-12-27 10:28:21
man,王五 event timestamp: 2020-12-27 10:28:22
girl,刘六 event timestamp: 2020-12-27 10:28:23
girl,伍七 event timestamp: 2020-12-27 10:28:24
Window processing time: 2020-12-27 10:28:25
(man,20)
man,吴八 event timestamp: 2020-12-27 10:28:25
man,张三 event timestamp: 2020-12-27 10:28:26
girl,李四 event timestamp: 2020-12-27 10:28:27
Window processing time: 2020-12-27 10:28:28
(girl,28)
Window processing time: 2020-12-27 10:28:28
(man,29)
References:
Introduction to watermarks: https://blog.csdn.net/hlp4207/article/details/90698296
Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html