当前位置:网站首页>Flink实战--多流合并
Flink实战--多流合并
2022-07-01 05:53:00 【一 铭】
Flink实战–多流合并
概述
本文介绍Flink的流合并操作。在Flink中,流的合并操作算子有:Union和Connect等。本文主要介绍这个两个算子的使用方法。
1.Union算子的使用
返回值:DataStream->DataStream
功能:合并两个或多个数据流,创建包含所有流中的所有元素的新流。注意:如果你将一个数据流和它本身联合起来,你将在结果流中得到每个元素两次。也就是说Union操作是不会去重的。
另外,被union的两个流的数据类型必须要一致。
说明:通过union算子可以把两个数据流进行合并,这样当有多个具有相同类型的数据流时,就可以当(合并)成一个流来进行处理。从而让流的处理更加灵活。
public class UionDemo1 {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment senv =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> src1 = senv.fromElements(
new Tuple2<>("shanghai", 15),
new Tuple2<>("beijing", 25));
DataStream<Tuple2<String, Integer>> src2 = senv.fromElements(
new Tuple2<>("sichuan", 35),
new Tuple2<>("chongqing", 45));
DataStream<Tuple2<String, Integer>> src3 = senv.fromElements(
new Tuple2<>("shenzheng", 55),
new Tuple2<>("guanzhou", 65));
// 这个流不能和以上几个流进行union,由于流数据的类型不同
// DataStream<Integer> src4 = senv.fromElements(2, 3);
DataStream<Tuple2<String, Integer>> union = src1.union(src2, src3);
union.filter(t->t.f1>30).print("union");
senv.execute();
}
}
以上代码把3个流合并成一个流,然后对合并的流进行处理(过滤)。最后打印输出。输出的内容如下:
union:5> (sichuan,35)
union:6> (chongqing,45)
union:6> (shenzheng,55)
union:7> (guanzhou,65)
2.connect算子的使用
返回值:DataStream,DataStream → ConnectedStream
功能:“连接” 两个数据流并保留各自的类型。connect 允许在两个流的处理逻辑之间共享状态。
Connect算子和Union算子的区别:
(1)Connect算子可以合并两个不同类型的数据流,而Uion只能合并相同类型的数据流。
(2)Connect算子只支持两个数据流的合并,union可以支持多个数据流的合并。
(3)两个DataStream经过connect之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。
输入输出:ConnectedStream → DataStream
功能:类似于在连接的数据流上进行 map 和 flatMap。
public class ConnectOpDemo1 {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment senv =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> src1 = senv.fromElements(
new Tuple2<>("shanghai", 15),
new Tuple2<>("beijing", 25));
DataStream<Integer> src4 = senv.fromElements(2, 3);
ConnectedStreams<Tuple2<String, Integer>, Integer> connStream = src1.connect(src4);
// 对不同类型的流,进行不同的处理,并统一输出成一个新的数据类型。
// 这里,我把两个流的数据都转成了String类型,这样方便后续的处理。
DataStream<String> res = connStream.flatMap(new CoFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public void flatMap1(Tuple2<String, Integer> value, Collector<String> out) {
out.collect(value.toString());
}
@Override
public void flatMap2(Integer value, Collector<String> out) {
String word = String.valueOf(value);
out.collect(word);
}
});
res.print();
senv.execute();
}
小结
在Flink中可以通过Union和Connect进行流的合并。通过以上例子可以看出两个操作使用起来还是非常方便的。不过在使用时,需要注意它们两者的区别。
边栏推荐
- 数据库问题,如何优化Oracle SQL查询语句更快,效率更高
- SQL必会题之留存率
- OpenGL es: (3) EGL, basic steps of EGL drawing, eglsurface, anativewindow
- In win10 and win11, the scroll direction of Elan touch panel is reversed, and "double finger click to open the right-click menu" and "double finger scroll" are started“
- First defined here occurs during QT compilation. Causes and Solutions
- uniapp树形层级选择器
- Pla ne colle pas sur le lit: 6 solutions simples
- Ssm+mysql second-hand trading website (thesis + source code access link)
- linux 关闭redis 进程 systemd+
- 加密狗资料搜集
猜你喜欢

ONEFLOW source code parsing: automatic inference of operator signature

Continuous breakthrough and steady progress -- Review and Prospect of cross platform development technology of mobile terminal

Geoffrey Hinton: my 50 years of in-depth study and Research on mental skills

Qt编写自定义控件-自绘电池

How to transmit and share 4GB large files remotely in real time?

数据库问题,如何优化Oracle SQL查询语句更快,效率更高

Talking from mlperf: how to lead the next wave of AI accelerator

Send you through the data cloud

这才是大学生必备软件 | 知识管理

C语言初阶——牛客网精选好题
随机推荐
College community management system based on boot+jsp (with source code download link)
【问题思考总结】为什么寄存器清零是在用户态进行的?
基于LabVIEW的计时器
OpenGL ES: (4) EGL API详解 (转)
win10、win11中Elan触摸板滚动方向反转、启动“双指点击打开右键菜单“、“双指滚动“
OpenGL es: (3) EGL, basic steps of EGL drawing, eglsurface, anativewindow
Bat operation FTP upload and download command
MySQL数据迁移遇到的一些错误
Oracle sequence + trigger
HCM 初学 ( 三 ) - 快速输入PA70、PA71 浏览员工信息PA10
Brief description of activation function
[QT] QT after addition, subtraction, multiplication and division, two decimal places are reserved
2022 年面向初学者的 10 大免费 3D 建模软件
[note] e-commerce order data analysis practice
Oracle create user + Role
Build 2022 上开发者最应关注的七大方向主要技术更新
2022.6.30-----leetcode. one thousand one hundred and seventy-five
扩展点系列之SmartInstantiationAwareBeanPostProcessor确定执行哪一个构造方法 - 第432篇
Ucosiii --- engineering transplantation
Data governance: data governance framework (Part I)