2022-07-03 09:00:00 【小胡今天有变强吗】
8. 多流转换
无论是基本的简单转换和聚合,还是基于窗口的计算,我们都是针对一条流上的数据进行处理的。而在实际应用中,可能需要将不同来源的数据连接合并在一起处理,也有可能需要将 一条流拆分开,所以经常会有对多条流进行处理的场景。
8.1 分流
在 Flink 1.13 版本中,已经弃用了.split()方法,取而代之的是直接用处理函数(process function)的侧输出流(side output)。侧输出流的标记和提取,都 离不开一个“输出标签”(OutputTag),它就相当于 split()分流时的“戳”,指定了侧输出流的 id 和类型。
8.2 合流
8.2.1 联合(Union)
最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union),联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素, 数据类型不变。
在代码中,我们只要基于 DataStream 直接调用.union()方法,传入其他 DataStream 作为参 数,就可以实现流的联合了;得到的依然是一个 DataStream:
stream1.union(stream2, stream3, ...)
在事件时间语义下,水位线是时间的进度标志;不同的流中可能 水位线的进展快慢完全不同,如果它们合并在一起,水位线又该以哪个为准呢?考虑水位线的本质含义,是“之前的所有数据已经到齐了”;所以对于合流之后的 水位线,也是要以最小的那个为准,这样才可以保证所有流都不会再传来之前的数据。也就是多流合并时处理的时效性是以最慢的那个流为准的。
8.2.2 连接(Connect)
1. 连接流(ConnectedStreams)
为了处理更加灵活,连接操作允许流的数据类型不同。但一个 DataStream 中的 数据只能有唯一的类型,所以连接得到的并不是 DataStream,而是一个“连接流” (ConnectedStreams)。连接流可以看成是两条流形式上的“统一”,被放在了一个同一个流中; 事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的。要想得到新的 DataStream, 还需要进一步定义一个“同处理”(co-process)转换操作,用来说明对于不同来源、不同类型 的数据,怎样分别进行处理转换、得到统一的输出类型。所以整体上来,两条流的连接就像是 “一国两制”,两条流可以保持各自的数据类型、处理方式也可以不同,不过最终还是会统一到 同一个 DataStream 中。
mport org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
public class ConnectTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> stream1 = env.fromElements(1,2,3);
DataStream<Long> stream2 = env.fromElements(1L,2L,3L);
ConnectedStreams<Integer, Long> connectedStreams = stream1.connect(stream2);
SingleOutputStreamOperator<String> result = connectedStreams.map(new CoMapFunction<Integer, Long, String>() {
public String map1(Integer value) {
return "Integer: " + value;
public String map2(Long value) {
return "Long: " + value;
两条流的连接(connect),与联合(union)操作相比,最大的优势就是可以处理不同类型 的流的合并,使用更灵活、应用更广泛。当然它也有限制,就是合并流的数量只能是 2,而 union 可以同时进行多条流的合并。这也非常容易理解:union 限制了类型不变,所以直接合并没有 问题;而 connect 是“一国两制”,后续处理的接口只定义了两个转换方法,如果扩展需要重 新定义接口,所以不能“一国多制”。
2. CoProcessFunction
对于连接流 ConnectedStreams 的处理操作,需要分别定义对两条流的处理转换,因此接口 中就会有两个相同的方法需要实现,用数字“1”“2”区分,在两条流中的数据到来时分别调 用。我们把这种接口叫作“协同处理函数”(co-process function)。与 CoMapFunction 类似,如 果是调用.flatMap()就需要传入一个 CoFlatMapFunction,需要实现 flatMap1()、flatMap2()两个 方法;而调用.process()时,传入的则是一个 CoProcessFunction。
抽象类 CoProcessFunction 在源码中定义如下:
public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {
public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {
public abstract class Context {
CoProcessFunction 也是“处理函数”家族中的一员,用法非常相 似。它需要实现的就是 processElement1()、processElement2()两个方法,在每个数据到来时, 会根据来源的流调用其中的一个方法进行处理。CoProcessFunction 同样可以通过上下文 ctx 来 访问 timestamp、水位线,并通过 TimerService 注册定时器;另外也提供了.onTimer()方法,用 于定义定时触发的处理操作。
3. 广播连接流(BroadcastConnectedStream)
这种连接方式往往用在需要动态定义某些规则或配置的场景。因为规则是实时变动的,所 以我们可以用一个单独的流来获取规则数据;而这些规则或配置是对整个应用全局有效的,所 以不能只把这数据传递给一个下游并行子任务处理,而是要“广播”(broadcast)给所有的并 行子任务。而下游子任务收到广播出来的规则,会把它保存成一个状态,这就是所谓的“广播 状态”(broadcast state)。
广播状态底层是用一个“映射”(map)结构来保存的。在代码实现上,可以直接调用 DataStream 的.broadcast()方法,传入一个“映射状态描述器”(MapStateDescriptor)说明状态 的名称和类型,就可以得到规则数据的“广播流”(BroadcastStream):
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(...);
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
接下来就可以将要处理的数据流,与这条广播流进行连接(connect),得到的就是所 谓的“广播连接流”(BroadcastConnectedStream)。基于 BroadcastConnectedStream 调用.process() 方法,就可以同时获取规则和数据,进行动态处理了。
DataStream<String> output = stream
.process( new BroadcastProcessFunction<>() {
...} );
BroadcastProcessFunction 与 CoProcessFunction 类似,同样是一个抽象类,需要实现两个 方法叫作.processElement() 和.processBroadcastElement()。源码中定义如下:
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends
BaseBroadcastProcessFunction {
public abstract void processElement(IN1 value, ReadOnlyContext ctx,
Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx,
Collector<OUT> out) throws Exception;
8.3 基于时间的合流
对于两条流的合并,很多情况我们并不是简单地将所有数据放在一起,而是希望根据某个 字段的值将它们联结起来,“配对”去做处理。这种需求与关系型数据库中表的 join 操作非常相近。
8.3.1 窗口联结(Window Join)
窗口联结(window join)算子,可以定义时间窗口,并 将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。
- 窗口联结的调用
窗口联结在代码中的实现,首先需要调用 DataStream 的.join()方法来合并两条流,得到一 个 JoinedStreams;接着通过.where()和.equalTo()方法指定两条流中联结的 key;然后通 过.window()开窗口,并调用.apply()传入联结窗口函数进行处理计算。
传入的 JoinFunction 也是一个函数类接口,使用时需要实现内部的.join()方法。这个方法 有两个参数,分别表示两条流中成对匹配的数据。JoinFunction 在源码中的定义如下:
public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {
OUT join(IN1 first, IN2 second) throws Exception;
- 窗口联结的处理流程
8.3.2 间隔联结(Interval Join)
- 间隔联结的原理
间隔联结的思路就是针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔, 看这期间是否有来自另一条流的数据匹配。
匹配的条件为: a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound ,需要注意,做间隔联结的两条流 A 和 B,也必须基于相同的 key;下界 lowerBound 应该小于等于上界 upperBound,两者都可正可负;间隔联结目前只支持事件时间语义。
- 间隔联结的调用
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction<Integer, Integer, String(){
public void processElement(Integer left, Integer right, Context ctx,
Collector<String> out) {
out.collect(left + "," + right);
8.3.3 窗口同组联结(Window CoGroup)
Window CoGroup的用法跟 window join 非常类似,也是将两条流合并之后开窗处理匹配的元素,调用时 只需要将.join()换为.coGroup()。
多流转换是流处理在实际应用中常见的需求,主要包括分流和合流两大类。最基本的合流方式是联合(union)和连接(connect),两者的主要区别在于 union 可以对 多条流进行合并,数据类型必须一致;而 connect 只能连接两条流,数据类型可以不同。Flink 还提供了内置的几个联结(join)操作,它们都是基于某个时间段的双流 合并,是需求特化之后的高层级 API。主要包括窗口联结(window join)、间隔联结(interval join) 和窗口同组联结(window coGroup)。其中 window join 和 coGroup 都是基于时间窗口的操作。
