Flink Window Join, Interval Join, Window CoGroup
1. Window Join
Flink provides a window join operator: you define a time window, and elements from two streams that share a common key and fall into the same window are paired up and processed together.
1.1 Calling a window join
To implement a window join in code, first call .join() on a DataStream to combine it with a second stream, which yields a JoinedStreams object; then specify the join key of each stream with .where() and .equalTo(); finally open a window with .window() and call .apply(), passing in a join window function that does the processing:
stream1.join(stream2)
        .where(<KeySelector>)
        .equalTo(<KeySelector>)
        .window(<WindowAssigner>)
        .apply(<JoinFunction>)
The argument to .where() is a key selector (KeySelector) that specifies the key of the first stream, and the KeySelector passed to .equalTo() specifies the key of the second stream. Elements with the same key that land in the same window can be matched with each other.
1.2 How a window join works
The elements are first grouped by key and stored in the corresponding window. When the window's end time is reached, the operator enumerates all combinations of the data from the two streams in that window, i.e. it computes the Cartesian product of the two streams' elements (the equivalent of a cross join on tables). It then iterates over the result and passes each matched pair as the parameters (first, second) to the .join() method of the JoinFunction, whose return value is emitted directly. So for every pair of elements successfully matched in the window, JoinFunction.join() is called once and produces one result.
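Here is a minimal sketch of that contract; the Tuple2<String, Long> element type is borrowed from the example that follows, and the concatenated-string result is only an illustration:

// Sketch of a JoinFunction: one matched pair in, one result out.
JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String> joiner =
        new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
            @Override
            public String join(Tuple2<String, Long> first, Tuple2<String, Long> second) {
                // called once per matched pair; the return value is emitted as-is
                return first + "=>" + second;
            }
        };

Such a function would be passed to .apply(). The example below uses the FlatJoinFunction variant, which emits results through a Collector instead of returning them.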
1.3 Window join example
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowJoinExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Tuple2<String, Long>> stream1 = env
                .fromElements(
                        Tuple2.of("a", 1000L), Tuple2.of("b", 1000L),
                        Tuple2.of("a", 2000L), Tuple2.of("b", 2000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                // the second tuple field is the event timestamp
                                return element.f1;
                            }
                        }));

        DataStream<Tuple2<String, Long>> stream2 = env
                .fromElements(
                        Tuple2.of("a", 3000L), Tuple2.of("b", 3000L),
                        Tuple2.of("a", 4000L), Tuple2.of("b", 4000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                return element.f1;
                            }
                        }));

        stream1.join(stream2)
                // key of the first stream
                .where(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                // key of the second stream
                .equalTo(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // called once for every matched pair in the window
                .apply(new FlatJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public void join(Tuple2<String, Long> first, Tuple2<String, Long> second, Collector<String> out) throws Exception {
                        out.collect(first + "=>" + second);
                    }
                })
                .print();

        env.execute();
    }
}
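All eight elements fall into the same 5-second tumbling window [0, 5000), so for each key the 2 x 2 Cartesian product is emitted, eight lines in total. Derived by hand from the code above (the relative order of the two keys in the actual output is not guaranteed):

(a,1000)=>(a,3000)
(a,1000)=>(a,4000)
(a,2000)=>(a,3000)
(a,2000)=>(a,4000)
(b,1000)=>(b,3000)
(b,1000)=>(b,4000)
(b,2000)=>(b,3000)
(b,2000)=>(b,4000)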

2. Interval Join
On e-commerce sites, user actions tend to be strongly correlated within a short period of time. Suppose we have two streams, one of order events and one of browsing events. For the same user, it makes sense to join each order event with that user's browsing events from, say, the preceding ten minutes; this is exactly what an interval join does.
2.1 How an interval join works
An interval join joins elements of two streams, A and B, that share a common key, where the timestamp of an element from stream B lies in a time interval relative to the timestamp of an element from stream A. Concretely, for any element a of stream A, the interval [a.timestamp + lowerBound, a.timestamp + upperBound] is defined; an element b of stream B with the same key is paired with a if a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound.
Both the lower and the upper bound may be negative or positive, as long as the lower bound is less than or equal to the upper bound. The interval join currently only performs inner joins.
When a pair of elements is passed to the ProcessJoinFunction, it is assigned the larger timestamp of the two elements (which can be accessed via ProcessJoinFunction.Context).
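A minimal sketch of the pairing condition in code; the helper method below is purely illustrative and not part of Flink's API:

// Illustrative only: the predicate an interval join effectively evaluates
// for two elements with the same key. aTs/bTs are the event timestamps of
// elements from stream A and stream B.
static boolean pairs(long aTs, long bTs, long lowerBound, long upperBound) {
    return aTs + lowerBound <= bTs && bTs <= aTs + upperBound;
}
// e.g. with between(Time.milliseconds(-2), Time.milliseconds(1)), an element
// of A at t = 4 pairs with B elements whose timestamps lie in [2, 5].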
2.2 Calling an interval join
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...

DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;

orangeStream
        .keyBy(<KeySelector>)
        .intervalJoin(greenStream.keyBy(<KeySelector>))
        .between(Time.milliseconds(-2), Time.milliseconds(1))
        .process(new ProcessJoinFunction<Integer, Integer, String>() {
            @Override
            public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
                out.collect(left + "," + right);
            }
        });
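By default both bounds passed to .between() are inclusive. According to the Flink DataStream API documentation (not mentioned in the original article), .lowerBoundExclusive() and .upperBoundExclusive() can be chained after .between() if an exclusive bound is needed.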
2.3 Interval join example
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// Event is a POJO used by this example but not defined in this post; judging
// from its usage it has at least the fields user, url and timestamp
// (a minimal sketch of it follows after the example).
public class IntervalJoinTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // order events: (user, orderId, timestamp)
        SingleOutputStreamOperator<Tuple3<String, String, Long>> orderStream = env
                .fromElements(
                        Tuple3.of("Mary", "order-1", 5000L),
                        Tuple3.of("Alice", "order-2", 5000L),
                        Tuple3.of("Bob", "order-3", 20000L),
                        Tuple3.of("Alice", "order-4", 20000L),
                        Tuple3.of("Cary", "order-5", 51000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple3<String, String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
                                return element.f2;
                            }
                        }));

        // browsing (click) events
        SingleOutputStreamOperator<Event> clickStream = env
                .fromElements(
                        new Event("Bob", "./cart", 2000L),
                        new Event("Alice", "./prod?id=100", 3000L),
                        new Event("Alice", "./prod?id=200", 3500L),
                        new Event("Bob", "./prod?id=2", 2500L),
                        new Event("Alice", "./prod?id=300", 36000L),
                        new Event("Bob", "./home", 30000L),
                        new Event("Bob", "./prod?id=1", 23000L),
                        new Event("Bob", "./prod?id=3", 33000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));

        orderStream
                .keyBy(new KeySelector<Tuple3<String, String, Long>, String>() {
                    @Override
                    public String getKey(Tuple3<String, String, Long> data) throws Exception {
                        return data.f0;
                    }
                })
                .intervalJoin(clickStream.keyBy(new KeySelector<Event, String>() {
                    @Override
                    public String getKey(Event data) throws Exception {
                        return data.user;
                    }
                }))
                // match clicks from 5 seconds before to 10 seconds after each order
                .between(Time.seconds(-5), Time.seconds(10))
                .process(new ProcessJoinFunction<Tuple3<String, String, Long>, Event, String>() {
                    @Override
                    public void processElement(Tuple3<String, String, Long> left, Event right, Context ctx, Collector<String> out) throws Exception {
                        out.collect(right + " => " + left);
                    }
                })
                .print();

        env.execute();
    }
}
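The Event class above is not defined in this post. A minimal POJO consistent with its usage here could look like the following; the actual class used by the original author may have additional fields or methods:

// Hypothetical minimal Event POJO, inferred from its usage in the example above.
public class Event {
    public String user;
    public String url;
    public Long timestamp;

    public Event() {
        // Flink treats this class as a POJO only if a no-arg constructor exists
    }

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{user='" + user + "', url='" + url + "', timestamp=" + timestamp + "}";
    }
}

With .between(Time.seconds(-5), Time.seconds(10)), the pairs this job should produce (derived by hand; the output order is not guaranteed) are order-2 joined with Alice's clicks at 3000 and 3500, and order-3 joined with Bob's clicks at 23000 and 30000; order-1, order-4 and order-5 find no click in their intervals.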
3. Window CoGroup
Window coGroup is very similar to window join: the two streams are combined and a window is opened to process the matching elements. The only change in the calling code is to replace .join() with .coGroup():
stream1.coGroup(stream2)
        .where(<KeySelector>)
        .equalTo(<KeySelector>)
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .apply(<CoGroupFunction>)
The difference from window join is that the function passed to .apply() is a CoGroupFunction. Its coGroup() method also takes three parameters, representing the data from the two streams plus a Collector for output, but the first two parameters are no longer a single matched pair: each is an iterable collection of all the elements of that stream in the window. In other words, the Cartesian product of the two streams' data is not computed for you; all collected data is handed over as-is, and how to pair it up is entirely user-defined. As a result, .coGroup() is called only once per key and window, and even elements of one stream that have no match in the other stream still appear in their collection (a sketch of such custom pairing follows the example below).
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class CoGroupExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Tuple2<String, Long>> stream1 = env
                .fromElements(
                        Tuple2.of("a", 1000L),
                        Tuple2.of("b", 1000L),
                        Tuple2.of("a", 2000L),
                        Tuple2.of("b", 2000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                return element.f1;
                            }
                        }));

        DataStream<Tuple2<String, Long>> stream2 = env
                .fromElements(
                        Tuple2.of("a", 3000L),
                        Tuple2.of("b", 3000L),
                        Tuple2.of("a", 4000L),
                        Tuple2.of("b", 4000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                return element.f1;
                            }
                        }));

        stream1
                .coGroup(stream2)
                .where(r -> r.f0)
                .equalTo(r -> r.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // called once per key and window, with ALL elements of both streams
                .apply(new CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<String, Long>> iter1, Iterable<Tuple2<String, Long>> iter2, Collector<String> collector) throws Exception {
                        collector.collect(iter1 + "=>" + iter2);
                    }
                })
                .print();

        env.execute();
    }
}
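Because coGroup hands over both collections even when one of them is empty, it can express matching logic that a window join cannot, for example a left-outer-join style pairing. A minimal sketch; the pairing logic below is illustrative and not taken from the original article:

// Illustrative CoGroupFunction with left-outer-join semantics:
// every element of the first stream is emitted, with or without a partner.
CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String> leftOuter =
        new CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
            @Override
            public void coGroup(Iterable<Tuple2<String, Long>> first,
                                Iterable<Tuple2<String, Long>> second,
                                Collector<String> out) throws Exception {
                for (Tuple2<String, Long> left : first) {
                    boolean matched = false;
                    for (Tuple2<String, Long> right : second) {
                        out.collect(left + "=>" + right);
                        matched = true;
                    }
                    if (!matched) {
                        // unmatched elements are still visible here
                        out.collect(left + "=>null");
                    }
                }
            }
        };

Passing leftOuter to .apply() in place of the function above would emit every stream1 element, whether or not it has a partner in stream2.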