当前位置:网站首页>Flip window join, interval join, window cogroup

Flip window join, interval join, window cogroup

2022-06-11 12:11:00 But don't ask about your future


1. Window connection (Window Join)

  Flink Provides a window join (window join) operator , You can define the time window , And share a common key between the two streams (key) Put the data in the window for pairing processing

1.1 Call of window connection

   Implementation of window connection in code , First you need to call DataStream Of .join() Method to merge two streams , Get one JoinedStreams; Then passed .where() and .equalTo() Method to specify the join in two streams key; And then through .window() Open the window , And call .apply() Pass in the join window function for processing and calculation

stream1.join(stream2)
 .where(<KeySelector>)
 .equalTo(<KeySelector>)
 .window(<WindowAssigner>)
 .apply(<JoinFunction>)

  .where() The argument for is the key selector (KeySelector), Used to specify... In the first stream key; and .equalTo() Incoming KeySelector Then... In the second stream is specified key. The same elements , If in the same window , You can match

1.2 Processing flow of window connection

   First of all, according to key grouping 、 Enter the corresponding window to store ; When the window end time is reached , The operator will first count all combinations of the data of the two streams in the window , That is to do a Cartesian product of the data in the two streams ( Equivalent to cross join of tables ,cross join), And then we're going to iterate , Put each pair of matching data , As a parameter (first,second) Pass in JoinFunction Of .join() Method for calculation and processing , The results obtained are output directly . So every pair of data in the window is successfully connected and matched ,JoinFunction Of .join() Method will be called once , And output a result
 Insert picture description here

1.3 Window join instance

public class WindowJoinExample {
    
    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<Tuple2<String, Long>> stream1 = env.fromElements(Tuple2.of("a", 1000L), Tuple2.of("b", 1000L), Tuple2.of("a", 2000L), Tuple2.of("b", 2000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
    
                                                   @Override
                                                   public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
    
                                                       return stringLongTuple2.f1;
                                                   }
                                               }
                        )
                );
        DataStream<Tuple2<String, Long>> stream2 = env.fromElements(Tuple2.of("a", 3000L), Tuple2.of("b", 3000L), Tuple2.of("a", 4000L), Tuple2.of("b", 4000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
    
                                                   @Override
                                                   public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
    
                                                       return stringLongTuple2.f1;
                                                   }
                                               }
                        )
                );
        stream1.join(stream2)
                .where(new KeySelector<Tuple2<String, Long>, String>() {
    
                    @Override
                    public String getKey(Tuple2<String, Long> value) throws Exception {
    
                        return value.f0;
                    }
                }).equalTo(new KeySelector<Tuple2<String, Long>, String>() {
    
                               @Override
                               public String getKey(Tuple2<String, Long> value) throws Exception {
    
                                   return value.f0;
                               }
                           }
        )
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new FlatJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
    
                    @Override
                    public void join(Tuple2<String, Long> first, Tuple2<String, Long> second, Collector<String> out) throws Exception {
    
                        out.collect(first + "=>" + second);
                    }
                })
                .print();
        env.execute();
    }
}

 Insert picture description here

2. Interval connection (Interval Join)

   On the e-commerce website , User behavior tends to be strongly correlated in a short time . Suppose there are two streams , One is the flow of placing orders , One is the stream of browsing data , A connection can be made for the same user , Make a join query between the user's order placing event and the user's browsing data in the last ten minutes (Interval Join)

2.1 The principle of interval connection

The interval join joins elements of two streams ( A & B ) with a common key and where elements of stream B have timestamps that lie in a relative time interval to timestamps of elements in stream A
 Insert picture description here

   To the yellow stream (A) Any data element in a, Divide a time interval :[a.timestamp + lowerBound, a.timestamp + upperBound], Green flow ( B) Data elements in b, key Same and timestamp here
Within a range of ,a and b Successfully paired

where a and b are elements of A and B that share a common key. Both the lower and upper bound can be either negative or positive as long as the lower bound is always smaller or equal to the upper bound. The interval join currently only performs inner joins.
When a pair of elements are passed to the ProcessJoinFunction, they will be assigned with the larger timestamp (which can be accessed via the ProcessJoinFunction.Context) of the two elements.

2.2 Call of interval connection

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process (new ProcessJoinFunction<Integer, Integer, String(){
    

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
    
            out.collect(left + "," + right);
        }
    });

2.3 Interval connection instance

public class IntervalJoinTest {
    
    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple3<String, String, Long>> orderStream = env.fromElements(
                Tuple3.of("Mary", "order-1", 5000L),
                Tuple3.of("Alice", "order-2", 5000L),
                Tuple3.of("Bob", "order-3", 20000L),
                Tuple3.of("Alice", "order-4", 20000L),
                Tuple3.of("Cary", "order-5", 51000L)
        )
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple3<String, String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
    
                            @Override
                            public long extractTimestamp(Tuple3<String, String, Long>
                                                                 element, long recordTimestamp) {
    
                                return element.f2;
                            }
                        })
                );
        SingleOutputStreamOperator<Event> clickStream = env.fromElements(
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Alice", "./prod?id=200", 3500L),
                new Event("Bob", "./prod?id=2", 2500L),
                new Event("Alice", "./prod?id=300", 36000L),
                new Event("Bob", "./home", 30000L),
                new Event("Bob", "./prod?id=1", 23000L),
                new Event("Bob", "./prod?id=3", 33000L)
        )
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
    
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
    
                                return element.timestamp;
                            }
                        })
                );
        orderStream
                .keyBy(new KeySelector<Tuple3<String, String, Long>, String>() {
    
                    @Override
                    public String getKey(Tuple3<String, String, Long> data) throws Exception {
    
                        return data.f0;
                    }
                })
                .intervalJoin(clickStream.keyBy(new KeySelector<Event, String>() {
    
                    @Override
                    public String getKey(Event data) throws Exception {
    
                        return data.user;
                    }
                }))
                .between(Time.seconds(-5), Time.seconds(10)) // Before 5 Second   after 10 Second   Range 
                .process(new ProcessJoinFunction<Tuple3<String, String, Long>, Event, String>() {
    
                    @Override
                    public void processElement(Tuple3<String, String, Long> left, Event right, Context ctx, Collector<String> out) throws Exception {
    
                        out.collect(right + " => " + left);
                    }
                })
                .print();

        env.execute();
    }
}

result :
 Insert picture description here


3. The window is connected with the group (Window CoGroup)

   And window join Very similar , After merging the two streams, open a window to handle the matching elements , When calling, you only need to .join() Replace with .coGroup()

stream1.coGroup(stream2)
 .where(<KeySelector>)
 .equalTo(<KeySelector>)
 .window(TumblingEventTimeWindows.of(Time.hours(1)))
 .apply(<CoGroupFunction>)

   And window join The difference is that , call .apply() Method to define a specific operation , What's coming in is a CoGroupFunction
 Insert picture description here
   The three parameters passed in , Each represents the data in the two streams and the collector for output (Collector). The difference is , The first two parameters are no longer separate pairs of data , Instead, a traversable data set is passed in . That is, the Cartesian product of two stream data sets in the window will not be calculated , Instead, all the data collected is directly transmitted to , As for how to pair, it is completely customized . such .coGroup() Methods are called only once , And even if the data of one stream does not match the data of another stream , It can also appear in the collection

public class CoGroupExample {
    
    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Tuple2<String, Long>> stream1 = env.fromElements(
                Tuple2.of("a", 1000L),
                Tuple2.of("b", 1000L),
                Tuple2.of("a", 2000L),
                Tuple2.of("b", 2000L)
        )
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
    
                                                   @Override
                                                   public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
    
                                                       return stringLongTuple2.f1;
                                                   }
                                               }
                        )
                );
        DataStream<Tuple2<String, Long>> stream2 = env.fromElements(
                Tuple2.of("a", 3000L),
                Tuple2.of("b", 3000L),
                Tuple2.of("a", 4000L),
                Tuple2.of("b", 4000L)
        )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
    
                                                           @Override
                                                           public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
    
                                                               return stringLongTuple2.f1;
                                                           }
                                                       }
                                )
                );
        stream1
                .coGroup(stream2)
                .where(r -> r.f0)
                .equalTo(r -> r.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
    
                    @Override
                    public void coGroup(Iterable<Tuple2<String, Long>> iter1, Iterable<Tuple2<String, Long>> iter2, Collector<String> collector) throws Exception {
    
                        collector.collect(iter1 + "=>" + iter2);
                    }
                })
                .print();
        env.execute();
    }
}
原网站

版权声明
本文为[But don't ask about your future]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/162/202206111206047775.html