Flink Window Join, Interval Join, Window CoGroup (matching two streams: joining on a specified key and applying window operations)
2022-06-11 12:06:00 【但行益事莫问前程】
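1. Window Join
Flink provides a window join operator: you define a time window, and the elements of the two streams that share a common key and fall into the same window are paired and processed together.
1.1 Calling a window join
In code, a window join is built in four steps: call .join() on one DataStream with the other to combine the two streams into a JoinedStreams; specify the join key of each stream with .where() and .equalTo(); open a window with .window(); finally call .apply() with a join function that processes the matched data: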
stream1.join(stream2)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)
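The argument of .where() is a key selector (KeySelector) that extracts the key from the first stream; the KeySelector passed to .equalTo() extracts the key from the second stream. Elements whose keys are equal are paired if they fall into the same window.
1.2 How a window join is processed
Elements are first partitioned by key and stored in the window they belong to. When the window's end time is reached (for event-time windows: when the watermark passes the end of the window), the operator forms all combinations of the two streams' elements for each key in that window, i.e. their Cartesian product (like a CROSS JOIN between tables). It then iterates over these combinations and passes each matched pair as (first, second) to the .join() method of the JoinFunction, emitting the result directly. In other words, JoinFunction.join() is called once for every successfully matched pair in the window and produces one result; a FlatJoinFunction, as used in the example below, may emit zero or more results per pair through its Collector.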
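The bucketing and cross-product steps above can be modeled in plain Java. This is a minimal, single-threaded sketch under simplifying assumptions, not Flink's operator: there are no watermarks or parallelism, timestamps are assumed non-negative with a window offset of 0, and the names `WindowJoinModel`, `windowStart`, and `join` are hypothetical. Each element is bucketed by key and window start, and every pair of buckets sharing both yields its full cross product.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a tumbling event-time window join (NOT Flink source):
// bucket by (key, window), then emit the cross product of matching buckets.
final class WindowJoinModel {

    // Hypothetical stand-in for Tuple2<String, Long>: (key, timestamp in ms).
    static final class Elem {
        final String key;
        final long ts;

        Elem(String key, long ts) {
            this.key = key;
            this.ts = ts;
        }

        @Override
        public String toString() {
            return "(" + key + "," + ts + ")";
        }
    }

    // Start of the tumbling window containing ts (offset 0, ts >= 0 assumed).
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Groups elements by "key@windowStart", preserving arrival order.
    private static Map<String, List<Elem>> bucket(List<Elem> in, long size) {
        Map<String, List<Elem>> buckets = new LinkedHashMap<>();
        for (Elem e : in) {
            String id = e.key + "@" + windowStart(e.ts, size);
            buckets.computeIfAbsent(id, k -> new ArrayList<>()).add(e);
        }
        return buckets;
    }

    // Inner join: one "first=>second" string per pair with equal key and window.
    static List<String> join(List<Elem> left, List<Elem> right, long size) {
        Map<String, List<Elem>> l = bucket(left, size);
        Map<String, List<Elem>> r = bucket(right, size);
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Elem>> entry : l.entrySet()) {
            List<Elem> partners = r.get(entry.getKey());
            if (partners == null) {
                continue; // no element of the other stream in this key/window
            }
            for (Elem a : entry.getValue()) {
                for (Elem b : partners) {
                    out.add(a + "=>" + b); // one "join call" per pair
                }
            }
        }
        return out;
    }
}
```

Fed the two streams from the example below with a 5000 ms window, the model yields 8 pairs (4 for key a, 4 for key b), because all eight timestamps fall into [0, 5000). An element at 4999 ms and one at 5000 ms never pair, since they land in different windows.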
1.3 Window join example
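import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowJoinExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Stream 1: (key, event time in ms); the event time is taken from field f1
        DataStream<Tuple2<String, Long>> stream1 = env
                .fromElements(Tuple2.of("a", 1000L), Tuple2.of("b", 1000L), Tuple2.of("a", 2000L), Tuple2.of("b", 2000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                return element.f1;
                            }
                        }));

        // Stream 2: same schema; its timestamps also fall into the first 5 s window
        DataStream<Tuple2<String, Long>> stream2 = env
                .fromElements(Tuple2.of("a", 3000L), Tuple2.of("b", 3000L), Tuple2.of("a", 4000L), Tuple2.of("b", 4000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                return element.f1;
                            }
                        }));

        stream1.join(stream2)
                // join key of stream1: field f0
                .where(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                // join key of stream2: field f0
                .equalTo(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                // 5-second tumbling event-time windows: [0, 5000), [5000, 10000), ...
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // invoked once for every matching (first, second) pair in a window
                .apply(new FlatJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public void join(Tuple2<String, Long> first, Tuple2<String, Long> second, Collector<String> out) throws Exception {
                        out.collect(first + "=>" + second);
                    }
                })
                .print();

        env.execute();
    }
}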
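The example does not show *when* its window fires. The sketch below is a simplified model (hypothetical names, not Flink source) of three rules it relies on: `forMonotonousTimestamps()` emits a watermark equal to the largest timestamp seen minus 1; a two-input operator such as the join advances its watermark to the minimum of its inputs' watermarks; and an event-time window [start, end) fires once the watermark reaches end - 1.

```java
// Simplified model of event-time window firing (NOT Flink source).
final class WindowFiringModel {

    // forMonotonousTimestamps() acts like a zero out-of-orderness bound:
    // watermark = largest timestamp seen so far - 1.
    static long monotonousWatermark(long maxTimestampSeen) {
        return maxTimestampSeen - 1;
    }

    // An operator with two inputs only advances to the slower input's watermark.
    static long combinedWatermark(long input1, long input2) {
        return Math.min(input1, input2);
    }

    // Window [start, end) covers timestamps up to end - 1; it fires once the
    // watermark has reached that last timestamp.
    static boolean fires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }
}
```

With the data above, the record-derived watermarks are at most 1999 and 3999, so the combined watermark stays below 4999 and the window cannot fire on data alone. The output only appears once the bounded `fromElements` sources finish and Flink emits its final watermark (`Long.MAX_VALUE`).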

2. Interval Join
On e-commerce sites, user actions that occur within a short time of each other are often strongly related. Suppose there are two streams, one of orders and one of page views. For the same user, we can join each order event with that user's page views from the last ten minutes; this kind of join is an interval join.
2.1 How the interval join works
The interval join joins elements of two streams (A and B) that share a common key, where elements of stream B have timestamps that lie in a time interval relative to the timestamps of elements in stream A.
For any element a of stream A (yellow in the original figure), define the interval [a.timestamp + lowerBound, a.timestamp + upperBound]. An element b of stream B (green) is paired with a if it has the same key and its timestamp lies within this interval, i.e. a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound.
Here a and b are elements of A and B that share a common key. Both the lower and the upper bound can be negative or positive, as long as the lower bound is less than or equal to the upper bound. The interval join currently performs only inner joins.
When a pair of elements is passed to the ProcessJoinFunction, it is assigned the larger of the two elements' timestamps, which can be accessed via the ProcessJoinFunction.Context.
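The matching rule can be checked with a small Java model. This is a sketch, not Flink's `IntervalJoinOperator`: it compares every pair directly and ignores watermarks, state cleanup, and the dropping of late records, so on out-of-order input the real operator may emit fewer results. The names are hypothetical. Bounds are inclusive by default; the two boolean flags mirror Flink's `lowerBoundExclusive()` and `upperBoundExclusive()` options on the interval-join builder, and each result carries the larger of the two timestamps as described above.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of interval-join matching (NOT Flink's operator):
// no watermarks, no state expiry, no late-record handling.
final class IntervalJoinModel {

    // Hypothetical element: (key, payload, timestamp in ms).
    static final class Elem {
        final String key;
        final String value;
        final long ts;

        Elem(String key, String value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    // True if bTs lies in [aTs + lower, aTs + upper]; each bound becomes
    // exclusive when its flag is set.
    static boolean inInterval(long aTs, long bTs, long lower, long upper,
                              boolean lowerExclusive, boolean upperExclusive) {
        long lo = aTs + lower;
        long hi = aTs + upper;
        boolean aboveLower = lowerExclusive ? bTs > lo : bTs >= lo;
        boolean belowUpper = upperExclusive ? bTs < hi : bTs <= hi;
        return aboveLower && belowUpper;
    }

    // Inner join with inclusive bounds: emits "a.value,b.value@resultTs",
    // where resultTs is the larger of the two timestamps.
    static List<String> join(List<Elem> left, List<Elem> right, long lower, long upper) {
        if (lower > upper) {
            throw new IllegalArgumentException("lowerBound must be <= upperBound");
        }
        List<String> out = new ArrayList<>();
        for (Elem a : left) {
            for (Elem b : right) {
                if (a.key.equals(b.key) && inInterval(a.ts, b.ts, lower, upper, false, false)) {
                    out.add(a.value + "," + b.value + "@" + Math.max(a.ts, b.ts));
                }
            }
        }
        return out;
    }
}
```

Applied to the order and click data of the example in 2.3 with bounds of -5 s and +10 s, the model pairs order-2 with Alice's clicks at 3000 and 3500 ms, and order-3 with Bob's clicks at 30000 and 23000 ms. The click at 30000 ms sits exactly on the upper bound of order-3 (20000 + 10000) and matches only because the bound is inclusive.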
2.2 Calling an interval join
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
...
DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    // pair each orange element with green elements whose timestamp lies in
    // [orange.timestamp - 2 ms, orange.timestamp + 1 ms]
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {
        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            // called once per matching (left, right) pair
            out.collect(left + "," + right);
        }
    });
2.3 Interval join example
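import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class IntervalJoinTest {

    // Assumption: hypothetical minimal stand-in for the Event POJO that the original
    // series defines elsewhere (fields user, url, timestamp). Flink's POJO rules
    // require public fields and a public no-arg constructor.
    public static class Event {
        public String user;
        public String url;
        public Long timestamp;

        public Event() {
        }

        public Event(String user, String url, Long timestamp) {
            this.user = user;
            this.url = url;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "Event{user='" + user + "', url='" + url + "', timestamp=" + timestamp + "}";
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Order stream: (user, order id, event time in ms)
        SingleOutputStreamOperator<Tuple3<String, String, Long>> orderStream = env.fromElements(
                        Tuple3.of("Mary", "order-1", 5000L),
                        Tuple3.of("Alice", "order-2", 5000L),
                        Tuple3.of("Bob", "order-3", 20000L),
                        Tuple3.of("Alice", "order-4", 20000L),
                        Tuple3.of("Cary", "order-5", 51000L)
                )
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple3<String, String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
                                return element.f2;
                            }
                        }));

        // Click stream: page views per user
        SingleOutputStreamOperator<Event> clickStream = env.fromElements(
                        new Event("Bob", "./cart", 2000L),
                        new Event("Alice", "./prod?id=100", 3000L),
                        new Event("Alice", "./prod?id=200", 3500L),
                        new Event("Bob", "./prod?id=2", 2500L),
                        new Event("Alice", "./prod?id=300", 36000L),
                        new Event("Bob", "./home", 30000L),
                        new Event("Bob", "./prod?id=1", 23000L),
                        new Event("Bob", "./prod?id=3", 33000L)
                )
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));

        orderStream
                // key the orders by user
                .keyBy(new KeySelector<Tuple3<String, String, Long>, String>() {
                    @Override
                    public String getKey(Tuple3<String, String, Long> data) throws Exception {
                        return data.f0;
                    }
                })
                // key the clicks by user as well, then join the two keyed streams
                .intervalJoin(clickStream.keyBy(new KeySelector<Event, String>() {
                    @Override
                    public String getKey(Event data) throws Exception {
                        return data.user;
                    }
                }))
                // match clicks from 5 s before to 10 s after each order (bounds inclusive)
                .between(Time.seconds(-5), Time.seconds(10))
                .process(new ProcessJoinFunction<Tuple3<String, String, Long>, Event, String>() {
                    @Override
                    public void processElement(Tuple3<String, String, Long> left, Event right, Context ctx, Collector<String> out) throws Exception {
                        // called once per matching (order, click) pair
                        out.collect(right + " => " + left);
                    }
                })
                .print();

        env.execute();
    }
}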
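Result: (the console output screenshot from the original post is not preserved)
3. Window CoGroup
A window coGroup is very similar to a window join: the two streams are combined and windowed, and matching elements are processed together. To use it, simply replace .join() with .coGroup():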
stream1.coGroup(stream2)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.apply(<CoGroupFunction>)
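The difference from a window join is that the function passed to .apply() is a CoGroupFunction.
Its coGroup() method takes three parameters: the data from each of the two streams and a Collector for output. Unlike a JoinFunction, the first two parameters are not a single matched pair but Iterables holding every element of that key in the window. The operator no longer computes the Cartesian product of the two sides; it hands over all collected elements at once, and how to pair them is entirely up to you. As a result, coGroup() is called only once per key and window, and elements of one stream that have no match in the other still appear in their collection, which makes outer joins possible.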
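Because the function sees whole groups, a coGroup can express joins that a window join cannot, such as a left outer join. The sketch below is a plain-Java model for a single window (hypothetical names; not Flink code): `CoGroupFn` has the same parameter shape as `CoGroupFunction.coGroup(first, second, out)`, with a `List` standing in for the `Collector`, and `run` calls it once per key that appears on either side.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model of a window coGroup for ONE window (NOT Flink source):
// group both sides by key, then call the user function once per key.
final class CoGroupModel {

    // Hypothetical stand-in for Tuple2<String, Long>.
    static final class Elem {
        final String key;
        final long ts;

        Elem(String key, long ts) {
            this.key = key;
            this.ts = ts;
        }

        @Override
        public String toString() {
            return "(" + key + "," + ts + ")";
        }
    }

    // Same shape as CoGroupFunction.coGroup(first, second, out).
    interface CoGroupFn {
        void coGroup(Iterable<Elem> first, Iterable<Elem> second, List<String> out);
    }

    // Left outer join as a coGroup body: every left element is emitted,
    // paired with "null" when the right group is empty.
    static final CoGroupFn LEFT_OUTER = (first, second, out) -> {
        for (Elem a : first) {
            boolean matched = false;
            for (Elem b : second) {
                out.add(a + "=>" + b);
                matched = true;
            }
            if (!matched) {
                out.add(a + "=>null");
            }
        }
    };

    private static Map<String, List<Elem>> groupByKey(List<Elem> in) {
        Map<String, List<Elem>> groups = new LinkedHashMap<>();
        for (Elem e : in) {
            groups.computeIfAbsent(e.key, k -> new ArrayList<>()).add(e);
        }
        return groups;
    }

    // Calls fn once per key present on either side; returns the number of calls.
    static int run(List<Elem> left, List<Elem> right, CoGroupFn fn, List<String> out) {
        Map<String, List<Elem>> l = groupByKey(left);
        Map<String, List<Elem>> r = groupByKey(right);
        Set<String> keys = new LinkedHashSet<>(l.keySet());
        keys.addAll(r.keySet());
        List<Elem> empty = Collections.emptyList();
        for (String k : keys) {
            fn.coGroup(l.getOrDefault(k, empty), r.getOrDefault(k, empty), out);
        }
        return keys.size();
    }
}
```

With a left side holding keys a and c and a right side holding a and b, the function is still called three times: c is emitted paired with null, while b produces nothing because this body implements a left, not a full, outer join.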
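import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class CoGroupExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Stream 1: (key, event time in ms); the event time is taken from field f1
        DataStream<Tuple2<String, Long>> stream1 = env.fromElements(
                        Tuple2.of("a", 1000L),
                        Tuple2.of("b", 1000L),
                        Tuple2.of("a", 2000L),
                        Tuple2.of("b", 2000L)
                )
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                return element.f1;
                            }
                        }));

        // Stream 2: same schema
        DataStream<Tuple2<String, Long>> stream2 = env.fromElements(
                        Tuple2.of("a", 3000L),
                        Tuple2.of("b", 3000L),
                        Tuple2.of("a", 4000L),
                        Tuple2.of("b", 4000L)
                )
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                return element.f1;
                            }
                        }));

        stream1
                .coGroup(stream2)
                // keys given as lambdas: field f0 of each stream
                .where(r -> r.f0)
                .equalTo(r -> r.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<String, Long>> iter1, Iterable<Tuple2<String, Long>> iter2, Collector<String> collector) throws Exception {
                        // called once per key and window with ALL elements of both sides;
                        // here the two groups are simply emitted side by side
                        collector.collect(iter1 + "=>" + iter2);
                    }
                })
                .print();

        env.execute();
    }
}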