当前位置:网站首页>Flink高级API(三)

Flink高级API(三)

2022-07-22 22:28:00 华为云

day03_Flink高级API

今日目标

  • Flink的四大基石
  • Flink窗口Window操作
  • Flink时间Time
  • Flink水印Watermark机制
  • Flink的state状态管理-keyed state 和 operator state

Flink的四大基石

  • Checkpoint 分布式一致性,解决数据丢失,故障恢复数据
  • State 状态,分为Keyed State ,Operator State; 数据结构的角度来说 ValueState、ListState、MapState,BroadcastState
  • Time , EventTime事件时间、Ingestion摄取时间、Process处理时间
  • Window窗口,TimeWindow 、 countwindow、 sessionwindow

Window操作

Window分类

  • time
    • 用的比较多 滚动窗口和滑动窗口
  • count

如何使用

image-20210507090957187

案例

  • 需求
/** * Author itcast * Date 2021/5/7 9:13 * 有如下数据表示: * 信号灯编号和通过该信号灯的车的数量 * 9,3 * 9,2 * 9,7 * 4,9 * 2,6 * 1,5 * 2,3 * 5,7 * 5,4 * 需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口 * 需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口 */
  • 分析

    image-20210507092308268

  • 代码

    import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import org.apache.flink.api.common.RuntimeExecutionMode;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import java.util.concurrent.TimeUnit;/** * Author itcast * Date 2021/5/7 9:13 * 有如下数据表示: * 信号灯编号和通过该信号灯的车的数量 * 9,3 * 9,2 * 9,7 * 4,9 * 2,6 * 1,5 * 2,3 * 5,7 * 5,4 * 需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口 * 需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口 */public class WindowDemo {    public static void main(String[] args) throws Exception {        //1.创建流环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);        env.setParallelism(1);        //2.获取数据源        DataStreamSource<String> source = env.socketTextStream("node1", 9999);        //3.转换操作 基于key window 统计        DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() {            @Override            public CartInfo map(String value) throws Exception {                String[] split = value.split(",");                return new CartInfo(split[0], Integer.parseInt(split[1]));            }        });        //统计 滚动窗口        DataStream<CartInfo> result = cartInfoDS                .keyBy(t -> t.getSensorId())                //使用的是处理时间                //每5秒钟统计一次,最近5秒钟内                .window(TumblingProcessingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))                .sum("count");        //统计 滑动窗口        DataStream<CartInfo> result1 = cartInfoDS                .keyBy(t -> t.getSensorId())                //使用的是处理时间                //每5秒钟统计一次,最近5秒钟内                .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))                .sum("count");        //4.打印输出        result1.print();        //5.执行流环境        env.execute();    }    @Data    @AllArgsConstructor    @NoArgsConstructor    public static class CartInfo {        private String sensorId;//信号灯id        private Integer count;//通过该信号灯的车的数量    }}
  • 需求2 - countwindow

    需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计–基于数量的滚动窗口

    需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计–基于数量的滑动窗口

  • 代码

    /** * Author itcast * Date 2021/5/7 9:13 * 有如下数据表示: * 信号灯编号和通过该信号灯的车的数量 * 9,3 * 9,2 * 9,7 * 4,9 * 2,6 * 1,5 * 2,3 * 5,7 * 5,4 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口 */public class WindowDemo02 {    public static void main(String[] args) throws Exception {        //1.创建流环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);        env.setParallelism(1);        //2.获取数据源        DataStreamSource<String> source = env.socketTextStream("node1", 9999);        //3.转换操作 基于key window 统计        DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() {            @Override            public CartInfo map(String value) throws Exception {                String[] split = value.split(",");                return new CartInfo(split[0], Integer.parseInt(split[1]));            }        });        //统计 滚动计数窗口        DataStream<CartInfo> result = cartInfoDS                .keyBy(t -> t.getSensorId())                //使用的是处理时间                //每5条数据统计一次                .countWindow(5)                .sum("count");        //统计 滑动计数窗口        DataStream<CartInfo> result1 = cartInfoDS                .keyBy(t -> t.getSensorId())                //使用的是处理时间                //每5秒钟统计一次,最近5秒钟内                .countWindow(10,5)                .sum("count");        //4.打印输出        //result.printToErr();        result1.print();        //5.执行流环境        env.execute();    }    @Data    @AllArgsConstructor    @NoArgsConstructor    public static class CartInfo {        private String sensorId;//信号灯id        private Integer count;//通过该信号灯的车的数量    }}
  • 统计会话指定时间内的数据,如果这个窗口内没有数据,就不在计算,设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算。

  • 案例 - 设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算

image-20210507101009350

  • 代码

    /** * Author itcast * Date 2021/5/7 9:13 * 有如下数据表示: * 信号灯编号和通过该信号灯的车的数量 * 9,3 * 9,2 * 9,7 * 4,9 * 2,6 * 1,5 * 2,3 * 5,7 * 5,4 设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算 */public class WindowDemo03 {    public static void main(String[] args) throws Exception {        //1.创建流环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);        env.setParallelism(1);        //2.获取数据源        DataStreamSource<String> source = env.socketTextStream("node1", 9999);        //3.转换操作 基于key window 统计        DataStream<CartInfo> cartInfoDS = source.map(new MapFunction<String, CartInfo>() {            @Override            public CartInfo map(String value) throws Exception {                String[] split = value.split(",");                return new CartInfo(split[0], Integer.parseInt(split[1]));            }        });        //统计 会话窗口        DataStream<CartInfo> result = cartInfoDS                .keyBy(t -> t.getSensorId())                //使用的是处理时间                //每5条数据统计一次                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))                .sum("count");        //4.打印输出        //result.printToErr();        result.print();        //5.执行流环境        env.execute();    }    @Data    @AllArgsConstructor    @NoArgsConstructor    public static class CartInfo {        private String sensorId;//信号灯id        private Integer count;//通过该信号灯的车的数量    }}

Time 时间

  • EventTime的重要性
    • 防止出现网络抖动,造成数据的乱序,数据统计的丢失
  • 窗口: 开始时间-结束时间

watermark 水印时间

  • watermark 水印机制

    • watermark 就是时间戳
    • watermark = eventTime - maxDelayTime
  • 触发计算 watermak >= 结束时间

watermark 案例

  • 需求

    有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)

    要求每隔5s,计算5秒内,每个用户的订单总金额

    并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。

    基础版:

    package cn.itcast.sz22.day03;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import java.time.Duration;import java.util.Random;import java.util.UUID;import java.util.concurrent.TimeUnit;/** * Author itcast * Date 2021/5/7 11:04 * Desc TODO */public class WatermarkDemo01 {    public static void main(String[] args) throws Exception {        //1.env        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //        env.setParallelism(1);        //2.Source        //模拟实时订单数据(数据有延迟和乱序)        DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() {            private boolean flag = true;            @Override            public void run(SourceContext<Order> ctx) throws Exception {                Random random = new Random();                while (flag) {                    String orderId = UUID.randomUUID().toString();                    int userId = random.nextInt(3);                    int money = random.nextInt(100);                    //模拟数据延迟和乱序!                    long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;                    ctx.collect(new Order(orderId, userId, money, eventTime));                    TimeUnit.SECONDS.sleep(1);                }            }            @Override            public void cancel() {                flag = false;            }        });        // 添加水印机制 最大允许延迟的时间为 3 秒        //orderDS.printToErr();        //分配水印机制        SingleOutputStreamOperator<Order> sum = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy                //指定最大的延迟时间                .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))                //指定 eventTime 是哪个字段 long extractTimestamp(T element, long recordTimestamp);                .withTimestampAssigner((element, recordTimestamp) -> element.getEventTime()))                //统计每个用户对应 购买 金额                .keyBy(t -> t.getUserId())                //指定窗口,每5秒钟统计5秒钟之内的数据                .window(TumblingEventTimeWindows.of(Time.seconds(5)))                .sum("money");        sum.print();        //        env.execute();    }    @Data    @AllArgsConstructor    @NoArgsConstructor    public static class Order {        private String orderId;        private Integer userId;        private Integer money;        private Long eventTime;    }}

    扩展版:

    package cn.itcast.sz22.day03;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import org.apache.commons.lang3.time.FastDateFormat;import org.apache.flink.api.common.eventtime.*;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.streaming.api.functions.windowing.WindowFunction;import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import org.apache.flink.util.Collector;import java.util.ArrayList;import java.util.List;import java.util.Random;import java.util.UUID;import java.util.concurrent.TimeUnit;/** * Author itcast * Date 2021/5/7 11:04 * Desc TODO */public class WatermarkDemo02 {    public static void main(String[] args) throws Exception {        //1.env        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //        env.setParallelism(1);        //2.Source        //模拟实时订单数据(数据有延迟和乱序)        DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() {            private boolean flag = true;            @Override            public void run(SourceContext<Order> ctx) throws Exception {                Random random = new Random();                while (flag) {                    String orderId = UUID.randomUUID().toString();                    int userId = random.nextInt(3);                    int money = random.nextInt(100);                    //模拟数据延迟和乱序!                    long eventTime = System.currentTimeMillis() - random.nextInt(15) * 1000;                    ctx.collect(new Order(orderId, userId, money, eventTime));                    TimeUnit.SECONDS.sleep(1);                }            }            @Override            public void cancel() {                flag = false;            }        });        DataStream<Order> WatermarkDS = orderDS                .assignTimestampsAndWatermarks(                        new WatermarkStrategy<Order>() {                            @Override                            public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {                                return new WatermarkGenerator<Order>() {                                    FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");                                    private int userId = 0;                                    private long eventTime = 0L;                                    private final long outOfOrdernessMillis = 3000;                                    private long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;                                    @Override                                    public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {                                        userId = event.userId;                                        eventTime = event.eventTime;                                        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);                                    }                                    @Override                                    public void onPeriodicEmit(WatermarkOutput output) {                                        //Watermark = 当前最大事件时间 - 最大允许的延迟时间或乱序时间 时间戳                                        Watermark watermark = new Watermark(maxTimestamp - outOfOrdernessMillis - 1);                                        System.out.println("key:" + userId + ",系统时间:" + df.format(System.currentTimeMillis()) + ",事件时间:" + df.format(eventTime) + ",水印时间:" + df.format(watermark.getTimestamp()));                                        output.emitWatermark(watermark);                                    }                                };                            }                        }.withTimestampAssigner((event, timestamp) -> event.getEventTime())                );        //代码走到这里,就已经被添加上Watermark了!接下来就可以进行窗口计算了        //要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额       /* DataStream<Order> result = WatermarkDS .keyBy(Order::getUserId) //.timeWindow(Time.seconds(5), Time.seconds(5)) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .sum("money");*/        //开发中使用上面的代码进行业务计算即可        //学习测试时可以使用下面的代码对数据进行更详细的输出,如输出窗口触发时各个窗口中的数据的事件时间,Watermark时间        DataStream<String> result = WatermarkDS                .keyBy(Order::getUserId)                .window(TumblingEventTimeWindows.of(Time.seconds(5)))                //把apply中的函数应用在窗口中的数据上                //WindowFunction<IN, OUT, KEY, W extends Window>                .apply(new WindowFunction<Order, String, Integer, TimeWindow>() {                    FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");                    @Override                    public void apply(Integer key, TimeWindow window, Iterable<Order> input, Collector<String> out) throws Exception {                        //准备一个集合用来存放属于该窗口的数据的事件时间                        List<String> eventTimeList = new ArrayList<>();                        for (Order order : input) {                            Long eventTime = order.eventTime;                            eventTimeList.add(df.format(eventTime));                        }                        String outStr = String.format("key:%s,窗口开始结束:[%s~%s),属于该窗口的事件时间:%s",                                key.toString(), df.format(window.getStart()), df.format(window.getEnd()), eventTimeList);                        out.collect(outStr);                    }                });        // 添加水印机制 最大允许延迟的时间为 3 秒        //orderDS.printToErr();        result.printToErr();        env.execute();        //分配水印机制    }    @Data    @AllArgsConstructor    @NoArgsConstructor    public static class Order {            private String orderId;            private Integer userId;            private Integer money;            private Long eventTime;        }}

Allowed lateness

  • 案例

    有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)

    要求每隔5s,计算5秒内,每个用户的订单总金额

    并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。

package cn.itcast.sz22.day03;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.util.OutputTag;import java.time.Duration;import java.util.Random;import java.util.UUID;import java.util.concurrent.TimeUnit;/** * Author itcast * Date 2021/5/7 14:51 * 有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额) * 要求每隔5s,计算5秒内,每个用户的订单总金额 * 并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。 */public class WatermarkDemo03 {    public static void main(String[] args) throws Exception {        //1.env        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //2.Source        //模拟实时订单数据(数据有延迟和乱序)        DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {            private boolean flag = true;            @Override            public void run(SourceContext<Order> ctx) throws Exception {                Random random = new Random();                while (flag) {                    String orderId = UUID.randomUUID().toString();                    int userId = random.nextInt(3);                    int money = random.nextInt(100);                    //模拟数据延迟和乱序!                    long eventTime = System.currentTimeMillis() - random.nextInt(15) * 1000;                    ctx.collect(new Order(orderId, userId, money, eventTime));                    TimeUnit.SECONDS.sleep(1);                }            }            @Override            public void cancel() {                flag = false;            }        });        OutputTag<Order> oot = new OutputTag<Order>("maxDelayOrder", TypeInformation.of(Order.class));        //分配水印机制 eventTime 默认使用 maxDelay 3秒        SingleOutputStreamOperator<Order> result = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy                .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))                .withTimestampAssigner((element, recordTimestamp) -> element.getEventTime()))                .keyBy(t -> t.getUserId())                //窗口设置 每隔5s,计算5秒内                .window(TumblingEventTimeWindows.of(Time.seconds(5)))                //实例化侧输出流 主要用于晚于最大延迟 3 秒的数据                .allowedLateness(Time.seconds(3))                .sideOutputLateData(oot)                //统计                .sum("money");        result.print("正常数据");        result.getSideOutput(oot).print("严重迟到的数据");        env.execute();    }    @Data    @AllArgsConstructor    @NoArgsConstructor    public static class Order {        private String orderId;        private Integer userId;        private Integer money;        private Long eventTime;    }}
原网站

版权声明
本文为[华为云]所创,转载请带上原文链接,感谢
https://bbs.huaweicloud.com/blogs/365733