当前位置:网站首页>Flink实战--多流合并
Flink实战--多流合并
2022-07-01 05:53:00 【一 铭】
Flink实战–多流合并
概述
本文介绍Flink的流合并操作。在Flink中,流的合并操作算子有:Union和Connect等。本文主要介绍这个两个算子的使用方法。
1.Union算子的使用
返回值:DataStream->DataStream
功能:合并两个或多个数据流,创建包含所有流中的所有元素的新流。注意:如果你将一个数据流和它本身联合起来,你将在结果流中得到每个元素两次。也就是说Union操作是不会去重的。
另外,被union的两个流的数据类型必须要一致。
说明:通过union算子可以把两个数据流进行合并,这样当有多个具有相同类型的数据流时,就可以当(合并)成一个流来进行处理。从而让流的处理更加灵活。
public class UionDemo1 {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment senv =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> src1 = senv.fromElements(
new Tuple2<>("shanghai", 15),
new Tuple2<>("beijing", 25));
DataStream<Tuple2<String, Integer>> src2 = senv.fromElements(
new Tuple2<>("sichuan", 35),
new Tuple2<>("chongqing", 45));
DataStream<Tuple2<String, Integer>> src3 = senv.fromElements(
new Tuple2<>("shenzheng", 55),
new Tuple2<>("guanzhou", 65));
// 这个流不能和以上几个流进行union,由于流数据的类型不同
// DataStream<Integer> src4 = senv.fromElements(2, 3);
DataStream<Tuple2<String, Integer>> union = src1.union(src2, src3);
union.filter(t->t.f1>30).print("union");
senv.execute();
}
}
以上代码把3个流合并成一个流,然后对合并的流进行处理(过滤)。最后打印输出。输出的内容如下:
union:5> (sichuan,35)
union:6> (chongqing,45)
union:6> (shenzheng,55)
union:7> (guanzhou,65)
2.connect算子的使用
返回值:DataStream,DataStream → ConnectedStream
功能:“连接” 两个数据流并保留各自的类型。connect 允许在两个流的处理逻辑之间共享状态。
Connect算子和Union算子的区别:
(1)Connect算子可以合并两个不同类型的数据流,而Uion只能合并相同类型的数据流。
(2)Connect算子只支持两个数据流的合并,union可以支持多个数据流的合并。
(3)两个DataStream经过connect之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。
输入输出:ConnectedStream → DataStream
功能:类似于在连接的数据流上进行 map 和 flatMap。
public class ConnectOpDemo1 {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment senv =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> src1 = senv.fromElements(
new Tuple2<>("shanghai", 15),
new Tuple2<>("beijing", 25));
DataStream<Integer> src4 = senv.fromElements(2, 3);
ConnectedStreams<Tuple2<String, Integer>, Integer> connStream = src1.connect(src4);
// 对不同类型的流,进行不同的处理,并统一输出成一个新的数据类型。
// 这里,我把两个流的数据都转成了String类型,这样方便后续的处理。
DataStream<String> res = connStream.flatMap(new CoFlatMapFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public void flatMap1(Tuple2<String, Integer> value, Collector<String> out) {
out.collect(value.toString());
}
@Override
public void flatMap2(Integer value, Collector<String> out) {
String word = String.valueOf(value);
out.collect(word);
}
});
res.print();
senv.execute();
}
小结
在Flink中可以通过Union和Connect进行流的合并。通过以上例子可以看出两个操作使用起来还是非常方便的。不过在使用时,需要注意它们两者的区别。
边栏推荐
- FPGA - 7系列 FPGA内部结构之Clocking -02- 时钟布线资源
- PLA不粘贴在床上:6个简单的解决方案
- [QT] QT after addition, subtraction, multiplication and division, two decimal places are reserved
- 从MLPerf谈起:如何引领AI加速器的下一波浪潮
- 【问题思考总结】为什么寄存器清零是在用户态进行的?
- SystemVerilog学习-08-随机约束和线程控制
- 穿越派 你的数据云行
- 数据库问题,如何优化Oracle SQL查询语句更快,效率更高
- 基于LabVIEW的计时器
- He struggled day and night to protect his data
猜你喜欢

穿越派·派盘 + 思源笔记 = 私人笔记本

excel动态图表

QT write custom control - self drawn battery

Smartinstantiationawarebeanpostprocessor of the extension point series determines which construction method to execute - Chapter 432

Seven major technical updates that developers should pay most attention to on build 2022

OpenGL ES: (5) OpenGL的基本概念、OpenGL ES 在屏幕产生图片的过程、OpenGL管线(pipeline)

数据库问题,如何优化Oracle SQL查询语句更快,效率更高

穿越派与贸大合作,为大学生增添效率

C语言初阶——实现扫雷游戏

Essay learning record essay multi label Global
随机推荐
excel動態圖錶
What if the data in the cloud disk is harmonious?
数据库问题,如何优化Oracle SQL查询语句更快,效率更高
Talking from mlperf: how to lead the next wave of AI accelerator
Preliminary level of C language -- selected good questions on niuke.com
Educational administration management system of SSM (free source code)
bat操作ftp上传下载命令
穿越派 你的数据云行
【考研高数 武忠祥+880版 自用】高数第二章基础阶段思维导图
3D printer threading: five simple solutions
excel初级应用案例——杜邦分析仪
栈题目:解析布尔表达式
How to add a gourd pie plate
[QT] QT after addition, subtraction, multiplication and division, two decimal places are reserved
分片上传与断点续传
SystemVerilog学习-06-类的封装
OpenGL ES: (3) EGL、EGL绘图的基本步骤、EGLSurface、ANativeWindow
CJC8988带2个立体声耳机驱动器的低功率立体声编解码器
Data governance: data governance framework (Part I)
论文学习记录随笔 多标签之LIFT