Flink Advanced APIs (Part 3)
2022-07-22 22:28:00 [Huawei Cloud]
day03_Flink Advanced APIs
Today's goals
- The four cornerstones of Flink
- Flink window operations
- Time in Flink
- Flink's watermark mechanism
- Flink state management: keyed state and operator state
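The four cornerstones of Flink
- Checkpoint: distributed, consistent snapshots that prevent data loss and let a job recover its data after a failure
- State: divided into Keyed State and Operator State; by data structure there are ValueState, ListState, MapState and BroadcastState
- Time: EventTime (event time), IngestionTime (ingestion time) and ProcessingTime (processing time)
- Window: TimeWindow, CountWindow and SessionWindow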
Window operations
Window types
- time: tumbling windows and sliding windows are used most often
- count
How to use

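Example
Requirements:
Each input line has the form "traffic-light ID,number of cars that passed that light", for example:
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4
Requirement 1: every 5 seconds, count the cars that passed each intersection's traffic light during the last 5 seconds (time-based tumbling window).
Requirement 2: every 5 seconds, count the cars that passed each intersection's traffic light during the last 10 seconds (time-based sliding window).
Analysis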
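To make the two requirements concrete, here is a minimal plain-Java sketch (no Flink dependency) of how one element is mapped to tumbling and sliding time windows. The start formula mirrors the arithmetic in Flink's `TimeWindow.getWindowStartWithOffset` for non-negative timestamps; the class name `WindowAssignment` and the sample timestamp are illustrative assumptions. With processing-time windows, the timestamp is simply the machine clock at the moment the element is processed.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of time-window assignment (illustrative, not Flink API).
// Windows are half-open intervals [start, end) in milliseconds.
class WindowAssignment {

    // Start of the window of length `size` that contains `timestamp`
    // (same arithmetic as Flink's getWindowStartWithOffset for timestamps >= 0).
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    // Tumbling window: every element belongs to exactly one window.
    static long[] tumbling(long timestamp, long size) {
        long start = windowStart(timestamp, 0L, size);
        return new long[] {start, start + size};
    }

    // Sliding window: every element belongs to size / slide overlapping windows.
    static List<long[]> sliding(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = windowStart(timestamp, 0L, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long t = 12_000L; // hypothetical element processed at second 12
        long[] tw = tumbling(t, 5_000L);
        System.out.println("tumbling 5s: [" + tw[0] + ", " + tw[1] + ")");
        for (long[] w : sliding(t, 10_000L, 5_000L)) {
            System.out.println("sliding 10s/5s: [" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

For the element at 12 s, the tumbling window is [10 s, 15 s), while the 10 s window sliding by 5 s yields [10 s, 20 s) and [5 s, 15 s). Because every element lands in two sliding windows, each car is counted in two consecutive results of Requirement 2.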

Code
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Date 2021/5/7 9:13
 * Each input line is "traffic-light ID,number of cars that passed that light":
 * 9,3  9,2  9,7  4,9  2,6  1,5  2,3  5,7  5,4
 * Requirement 1: every 5 s, count the cars per intersection over the last 5 s (time-based tumbling window)
 * Requirement 2: every 5 s, count the cars per intersection over the last 10 s (time-based sliding window)
 */
public class WindowDemo {
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);
        // 2. Source: read lines from a socket
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);
        // 3. Transformation: parse "id,count" into CartInfo, then key by id and window
        DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] split = value.split(",");
                return new CartInfo(split[0], Integer.parseInt(split[1]));
            }
        });
        // Tumbling window on processing time: every 5 s, over the last 5 s
        DataStream<CartInfo> result = cartInfoDS
                .keyBy(t -> t.getSensorId())
                .window(TumblingProcessingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
                .sum("count");
        // Sliding window on processing time: every 5 s (slide), over the last 10 s (size)
        DataStream<CartInfo> result1 = cartInfoDS
                .keyBy(t -> t.getSensorId())
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .sum("count");
        // 4. Print both results
        result.print("tumbling");
        result1.print("sliding");
        // 5. Execute the job
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId; // traffic-light ID
        private Integer count;   // number of cars that passed this light
    }
}
Case 2 - countWindow
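Requirement 1: over the last 5 messages of each intersection, sum the cars that passed it; compute once every time the same key has appeared 5 times (count-based tumbling window).
Requirement 2: over the last 5 messages of each intersection, sum the cars that passed it; compute once every time the same key has appeared 3 times (count-based sliding window).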
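The difference between the two count windows can be modelled in plain Java. This sketch assumes the way Flink builds them internally: `countWindow(size)` uses a purging count trigger (fire and clear every `size` elements of a key), while `countWindow(size, slide)` combines a `CountTrigger` of `slide` with a `CountEvictor` of `size` (fire every `slide` elements, evaluating only the latest `size`). `CountWindowModel` and its methods are hypothetical names, not Flink API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of keyed count windows (illustrative, not Flink API).
// size == slide models the tumbling countWindow(size).
class CountWindowModel {
    private final int size;  // how many recent elements of a key are summed
    private final int slide; // how many new elements of a key trigger a firing
    private final Map<String, Deque<Integer>> buffers = new HashMap<>();
    private final Map<String, Integer> sinceLastFire = new HashMap<>();

    CountWindowModel(int size, int slide) {
        this.size = size;
        this.slide = slide;
    }

    // Feeds one element of `key`; returns the window sum if it fires, otherwise null.
    Integer onElement(String key, int count) {
        Deque<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayDeque<>());
        buffer.addLast(count);
        int seen = sinceLastFire.merge(key, 1, Integer::sum);
        if (seen < slide) {
            return null; // like CountTrigger: not enough new elements yet
        }
        sinceLastFire.put(key, 0);
        while (buffer.size() > size) {
            buffer.removeFirst(); // like CountEvictor: keep only the latest `size` elements
        }
        int sum = 0;
        for (int c : buffer) {
            sum += c;
        }
        if (size == slide) {
            buffer.clear(); // tumbling count window: purged after firing
        }
        return sum;
    }

    // Feeds all counts to a single key ("9") and collects the sum of every firing.
    static List<Integer> run(int size, int slide, int[] counts) {
        CountWindowModel model = new CountWindowModel(size, slide);
        List<Integer> fired = new ArrayList<>();
        for (int c : counts) {
            Integer sum = model.onElement("9", c);
            if (sum != null) {
                fired.add(sum);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        int[] counts = {1, 2, 3, 4, 5, 6}; // hypothetical counts for light 9
        System.out.println("countWindow(5):    " + run(5, 5, counts));
        System.out.println("countWindow(5, 3): " + run(5, 3, counts));
    }
}
```

With the six sample counts, the tumbling window fires once with 15 (the sixth element waits for four more), while the sliding window fires after the 3rd element with 6 and after the 6th with 20, the sum of the latest five counts.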
Code
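import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author itcast
 * Date 2021/5/7 9:13
 * Each input line is "traffic-light ID,number of cars that passed that light":
 * 9,3  9,2  9,7  4,9  2,6  1,5  2,3  5,7  5,4
 * Requirement 1: sum over the last 5 messages of a key, computed every 5 occurrences of that key (count-based tumbling window)
 * Requirement 2: sum over the last 5 messages of a key, computed every 3 occurrences of that key (count-based sliding window)
 */
public class WindowDemo02 {
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);
        // 2. Source: read lines from a socket
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);
        // 3. Transformation: parse "id,count" into CartInfo, then key by id and window
        DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] split = value.split(",");
                return new CartInfo(split[0], Integer.parseInt(split[1]));
            }
        });
        // Tumbling count window: fires each time a key has received 5 elements
        DataStream<CartInfo> result = cartInfoDS
                .keyBy(t -> t.getSensorId())
                .countWindow(5)
                .sum("count");
        // Sliding count window: fires every 3 elements of a key, summing that key's latest 5 elements
        DataStream<CartInfo> result1 = cartInfoDS
                .keyBy(t -> t.getSensorId())
                .countWindow(5, 3)
                .sum("count");
        // 4. Print (uncomment the first line to see the tumbling result as well)
        //result.printToErr();
        result1.print();
        // 5. Execute the job
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId; // traffic-light ID
        private Integer count;   // number of cars that passed this light
    }
}
Session window: collects the data of one session within a given time span. While no data arrives, nothing is computed; with a session timeout (gap) of 10 s, once no data has arrived for 10 s, the computation of the previous window is triggered.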
Example: set the session timeout (gap) to 10 s; if no data arrives within 10 s, the computation of the previous window is triggered.
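How a session forms can be sketched in plain Java for a single key. The assumption, following Flink's merging session assigners, is that every element opens its own window [t, t + gap) and windows that overlap are merged, so a session ends once `gap` milliseconds pass without a new element. `SessionWindowModel` and the sample arrival times are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of session windows for ONE key (illustrative, not Flink API).
class SessionWindowModel {

    static final class Session {
        long start; // timestamp of the first element in the session
        long end;   // last element's timestamp + gap (exclusive)
        int sum;    // sum of the "count" field over the session

        Session(long start, long end, int sum) {
            this.start = start;
            this.end = end;
            this.sum = sum;
        }

        @Override
        public String toString() {
            return "[" + start + "," + end + ")=" + sum;
        }
    }

    // timestamps must be sorted ascending (processing-time arrival order)
    static List<Session> sessions(long[] timestamps, int[] counts, long gap) {
        List<Session> result = new ArrayList<>();
        Session current = null;
        for (int i = 0; i < timestamps.length; i++) {
            long t = timestamps[i];
            if (current != null && t <= current.end) {
                // the element's window [t, t + gap) overlaps the open session: merge
                current.end = Math.max(current.end, t + gap);
                current.sum += counts[i];
            } else {
                // gap exceeded (or first element): a new session starts
                current = new Session(t, t + gap, counts[i]);
                result.add(current);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] arrivals = {0L, 4_000L, 9_000L, 25_000L}; // hypothetical arrival times for light 9
        int[] counts = {3, 2, 7, 9};
        System.out.println(sessions(arrivals, counts, 10_000L));
    }
}
```

The first three elements are less than 10 s apart, so they merge into one session [0 s, 19 s) with a total of 12; the element at 25 s starts a new session. With processing time, the first session would fire when the clock reaches 19 s, 10 s after its last element.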

Code
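import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Author itcast
 * Date 2021/5/7 9:13
 * Each input line is "traffic-light ID,number of cars that passed that light":
 * 9,3  9,2  9,7  4,9  2,6  1,5  2,3  5,7  5,4
 * Session timeout (gap) is 10 s: if no data arrives within 10 s, the previous window is computed
 */
public class WindowDemo03 {
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);
        // 2. Source: read lines from a socket
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);
        // 3. Transformation: parse "id,count" into CartInfo
        DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] split = value.split(",");
                return new CartInfo(split[0], Integer.parseInt(split[1]));
            }
        });
        // Session window on processing time: a key's session closes after 10 s without data
        DataStream<CartInfo> result = cartInfoDS
                .keyBy(t -> t.getSensorId())
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
                .sum("count");
        // 4. Print
        result.print();
        // 5. Execute the job
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId; // traffic-light ID
        private Integer count;   // number of cars that passed this light
    }
}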
Time
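- Why EventTime matters:
- network jitter can deliver data out of order; when windows are based on processing time, records that arrive late are counted in the wrong window, so the statistics lose data
- a window is delimited by a start time and an end time: [start, end)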
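A tiny plain-Java illustration of the jitter problem: the same record falls into different 5 s tumbling windows depending on which clock assigns it. The timestamps (a car passing at 9.5 s, the record reaching the job at 10.5 s) and the class name are hypothetical.

```java
// Plain-Java illustration (not Flink API): one record, two clocks, two windows.
class EventVsProcessingTime {

    // Start of the tumbling window of length `size` containing `timestamp` (timestamps >= 0).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    public static void main(String[] args) {
        long size = 5_000L;
        long eventTime = 9_500L;       // hypothetical: when the car actually passed the light
        long processingTime = 10_500L; // hypothetical: when the record arrived after 1 s of jitter
        System.out.println("event-time window starts at " + windowStart(eventTime, size));
        System.out.println("processing-time window starts at " + windowStart(processingTime, size));
    }
}
```

By event time the car belongs to [5 s, 10 s); by processing time it is counted in [10 s, 15 s), so the [5 s, 10 s) result is missing it.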
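Watermark (watermark time)
The watermark mechanism
- A watermark is simply a timestamp.
- watermark = maximum event time seen so far - maxDelayTime (the maximum allowed delay or out-of-orderness)
A window computation is triggered when watermark >= the window's end time. (Flink's built-in bounded-out-of-orderness watermark subtracts one more millisecond, and the event-time trigger compares it against end - 1, so the condition is the same.)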
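The formula and the trigger condition above can be checked with a small plain-Java model. It mirrors the arithmetic of Flink's bounded-out-of-orderness watermark (max event time - delay - 1) and of the event-time trigger (fire when watermark >= end - 1). For simplicity it recomputes the watermark on every event, whereas Flink emits it periodically (by default every 200 ms). `WatermarkModel` and the event sequence are hypothetical.

```java
// Plain-Java model of a bounded-out-of-orderness watermark (illustrative, not Flink API).
class WatermarkModel {
    private final long maxDelay;
    private long maxEventTime;

    WatermarkModel(long maxDelay) {
        this.maxDelay = maxDelay;
        // start low enough that the first watermark does not underflow
        this.maxEventTime = Long.MIN_VALUE + maxDelay + 1;
    }

    // Called per element; out-of-order elements never move the watermark backwards.
    long onEvent(long eventTime) {
        maxEventTime = Math.max(maxEventTime, eventTime);
        return currentWatermark();
    }

    long currentWatermark() {
        return maxEventTime - maxDelay - 1;
    }

    // A window [start, end) fires once the watermark reaches end - 1.
    static boolean fires(long watermark, long windowEnd) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        WatermarkModel wm = new WatermarkModel(3_000L);
        long[] events = {1_000L, 4_000L, 2_000L, 7_000L, 8_000L}; // hypothetical, out of order
        for (long e : events) {
            long w = wm.onEvent(e);
            System.out.println("event=" + e + " watermark=" + w
                    + " window[0,5000) fires=" + fires(w, 5_000L));
        }
    }
}
```

The window [0 s, 5 s) only fires when an event with time 8 s arrives, i.e. once the maximum event time exceeds the window end by the 3 s delay; the late 2 s event does not lower the watermark.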
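Watermark example
Requirements
Order data has the format (order ID, user ID, timestamp/event time, order amount).
Every 5 s, compute each user's total order amount over those 5 s,
and add a watermark to handle data delay and out-of-order data to a certain extent.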
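Ignoring lateness for a moment, the requirement amounts to "group by user and by 5 s event-time window, then sum". The plain-Java sketch below computes that target result so the job's output can be compared against it; orders are hypothetical `{userId, eventTimeMillis, money}` triples and `OrderTotals` is not part of the original code.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the target result (illustrative, not Flink API):
// per user, per 5 s tumbling event-time window, the total order amount.
class OrderTotals {

    static Map<String, Integer> totals(long[][] orders, long windowSize) {
        Map<String, Integer> result = new TreeMap<>(); // sorted for stable output
        for (long[] o : orders) {
            long userId = o[0];
            long eventTime = o[1];
            long money = o[2];
            long start = eventTime - (eventTime % windowSize); // timestamps >= 0 only
            String key = "user=" + userId + " [" + start + "," + (start + windowSize) + ")";
            result.merge(key, (int) money, Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        long[][] orders = {{1, 1_000, 10}, {1, 4_999, 5}, {1, 5_000, 7}, {2, 2_000, 3}};
        System.out.println(totals(orders, 5_000L));
    }
}
```

The order at 4.999 s still belongs to [0 s, 5 s) while the one at 5 s opens the next window, because windows are half-open.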
Basic version:
package cn.itcast.sz22.day03;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Date 2021/5/7 11:04
 * Desc: per-user order totals in 5 s event-time tumbling windows, with a 3 s out-of-orderness watermark
 */
public class WatermarkDemo01 {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.setParallelism(1);
        // 2. Source: simulated real-time orders (with delay and disorder)
        DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() {
            private volatile boolean flag = true;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(3);
                    int money = random.nextInt(100);
                    // simulate delay and disorder: event time is 0-4 s behind the current time
                    long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
                    ctx.collect(new Order(orderId, userId, money, eventTime));
                    TimeUnit.SECONDS.sleep(1);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        });
        // 3. Assign timestamps and watermarks: the maximum allowed delay is 3 s
        SingleOutputStreamOperator<Order> sum = orderDS
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        // which field is the event time: long extractTimestamp(T element, long recordTimestamp)
                        .withTimestampAssigner((element, recordTimestamp) -> element.getEventTime()))
                // total amount per user
                .keyBy(t -> t.getUserId())
                // every 5 s, compute over the data of those 5 s
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("money");
        sum.print();
        // 4. Execute (without this call the job never runs)
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}
Extended version:
package cn.itcast.sz22.day03;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Date 2021/5/7 11:04
 * Desc: same job as WatermarkDemo01, with a hand-written WatermarkGenerator that logs each watermark
 *       and a WindowFunction that prints each window's bounds and the event times it contains
 */
public class WatermarkDemo02 {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.setParallelism(1);
        // 2. Source: simulated real-time orders (with delay and disorder)
        DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() {
            private volatile boolean flag = true;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(3);
                    int money = random.nextInt(100);
                    // simulate delay and disorder: event time is 0-14 s behind the current time
                    long eventTime = System.currentTimeMillis() - random.nextInt(15) * 1000;
                    ctx.collect(new Order(orderId, userId, money, eventTime));
                    TimeUnit.SECONDS.sleep(1);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        });
        // 3. Custom watermark strategy that logs every periodically emitted watermark
        DataStream<Order> watermarkDS = orderDS.assignTimestampsAndWatermarks(
                new WatermarkStrategy<Order>() {
                    @Override
                    public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new WatermarkGenerator<Order>() {
                            private final FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
                            private int userId = 0;
                            private long eventTime = 0L;
                            private final long outOfOrdernessMillis = 3000;
                            private long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;

                            @Override
                            public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {
                                // remember the latest element for logging and track the max event time
                                userId = event.getUserId();
                                eventTime = event.getEventTime();
                                maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
                            }

                            @Override
                            public void onPeriodicEmit(WatermarkOutput output) {
                                // watermark = max event time so far - max allowed delay (out-of-orderness) - 1 ms
                                Watermark watermark = new Watermark(maxTimestamp - outOfOrdernessMillis - 1);
                                System.out.println("key:" + userId
                                        + ", system time:" + df.format(System.currentTimeMillis())
                                        + ", event time:" + df.format(eventTime)
                                        + ", watermark:" + df.format(watermark.getTimestamp()));
                                output.emitWatermark(watermark);
                            }
                        };
                    }
                }.withTimestampAssigner((event, timestamp) -> event.getEventTime())
        );
        // From here on the stream carries watermarks, so event-time windows can be computed.
        // Requirement: every 5 s, each user's total order amount over those 5 s (time-based tumbling window).
        // In production the following is all you need:
        /* DataStream<Order> result = watermarkDS
                .keyBy(Order::getUserId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("money"); */
        // For learning and testing, the version below prints more detail when a window fires:
        // the window bounds and the event times of the elements that belong to it.
        DataStream<String> result = watermarkDS
                .keyBy(Order::getUserId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // apply the function to the elements of each window
                // WindowFunction<IN, OUT, KEY, W extends Window>
                .apply(new WindowFunction<Order, String, Integer, TimeWindow>() {
                    private final FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");

                    @Override
                    public void apply(Integer key, TimeWindow window, Iterable<Order> input, Collector<String> out) throws Exception {
                        // collect the event times of the elements that belong to this window
                        List<String> eventTimeList = new ArrayList<>();
                        for (Order order : input) {
                            eventTimeList.add(df.format(order.getEventTime()));
                        }
                        String outStr = String.format("key:%s, window [start~end): [%s~%s), event times in window: %s",
                                key.toString(), df.format(window.getStart()), df.format(window.getEnd()), eventTimeList);
                        out.collect(outStr);
                    }
                });
        result.printToErr();
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}
Allowed lateness
Example
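Order data has the format (order ID, user ID, timestamp/event time, order amount).
Every 5 s, compute each user's total order amount over those 5 s.
On top of the watermark (3 s of out-of-orderness), allow each window another 3 s of lateness with allowedLateness, and send data that arrives even later to a side output instead of silently dropping it.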
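What happens to a late element can be sketched in plain Java. The model below assumes the rules of Flink's window operator as configured in the code that follows: a window counts as "late" once `window.maxTimestamp() + allowedLateness <= watermark`, and its elements then go to the side output; if the watermark has passed the window end but not the lateness bound, the element is still added and the window fires again (a late firing). `LatenessModel` and its outcome names are hypothetical.

```java
// Plain-Java model of allowedLateness + sideOutputLateData for a tumbling
// event-time window (illustrative, not Flink API).
class LatenessModel {

    enum Outcome { ON_TIME, LATE_FIRING, SIDE_OUTPUT }

    static Outcome classify(long eventTime, long windowSize, long watermark, long allowedLateness) {
        long start = eventTime - (eventTime % windowSize); // timestamps >= 0 only
        long maxTimestamp = start + windowSize - 1;        // last instant of [start, start + size)
        if (maxTimestamp + allowedLateness <= watermark) {
            return Outcome.SIDE_OUTPUT;  // window already cleaned up
        }
        if (maxTimestamp <= watermark) {
            return Outcome.LATE_FIRING;  // window fired before; it fires again with this element
        }
        return Outcome.ON_TIME;          // counted in the window's regular firing
    }

    public static void main(String[] args) {
        // hypothetical order with event time 4 s, i.e. window [0 s, 5 s); allowedLateness 3 s
        long[] watermarks = {3_000L, 5_500L, 8_000L};
        for (long wm : watermarks) {
            System.out.println("watermark=" + wm + " -> " + classify(4_000L, 5_000L, wm, 3_000L));
        }
    }
}
```

With the 3 s out-of-orderness watermark, a watermark of 8 s means an event of about 11 s has already been seen, so an order from 4 s arriving then is about 7 s behind and ends up in the "severely late" side output.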
package cn.itcast.sz22.day03;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author itcast
 * Date 2021/5/7 14:51
 * Order data has the format (order ID, user ID, timestamp/event time, order amount).
 * Every 5 s, compute each user's total order amount over those 5 s,
 * and add a watermark to handle data delay and out-of-order data to a certain extent.
 */
public class WatermarkDemo03 {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Source: simulated real-time orders (with delay and disorder)
        DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {
            private volatile boolean flag = true;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(3);
                    int money = random.nextInt(100);
                    // simulate delay and disorder: event time is 0-14 s behind the current time
                    long eventTime = System.currentTimeMillis() - random.nextInt(15) * 1000;
                    ctx.collect(new Order(orderId, userId, money, eventTime));
                    TimeUnit.SECONDS.sleep(1);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        });
        // side-output tag for data that arrives too late even for allowedLateness
        OutputTag<Order> oot = new OutputTag<Order>("maxDelayOrder", TypeInformation.of(Order.class));
        // 3. Assign event-time timestamps and watermarks: maximum allowed out-of-orderness 3 s
        SingleOutputStreamOperator<Order> result = orderDS
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((element, recordTimestamp) -> element.getEventTime()))
                .keyBy(t -> t.getUserId())
                // every 5 s, compute over the data of those 5 s
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // keep each window 3 s longer after the watermark passes its end;
                // late elements in that period update the result again
                .allowedLateness(Time.seconds(3))
                // elements arriving even later go to the side output instead of being dropped
                .sideOutputLateData(oot)
                // aggregate
                .sum("money");
        result.print("normal data");
        result.getSideOutput(oot).print("severely late data");
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}