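Flink multi-stream transformations (splitting with side outputs, Union, Connect): real-time reconciliation of app payments against third-party payments with a two-stream join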
2022-06-11 12:06:00 【但行益事莫问前程】
Preface
1. Splitting a stream
To split one data stream into two or more fully independent streams, use the side output of a process function (ProcessFunction) directly.
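import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Event and ClickSource are user-defined classes that are not shown in this article
public class SplitStreamByOutputTag {
    // Side output tags; the anonymous subclass ({}) keeps the generic type information
    private static OutputTag<Tuple3<String, String, Long>> MaryTag = new OutputTag<Tuple3<String, String, Long>>("Mary-pv") {
    };
    private static OutputTag<Tuple3<String, String, Long>> BobTag = new OutputTag<Tuple3<String, String, Long>>("Bob-pv") {
    };

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());
        SingleOutputStreamOperator<Event> processedStream = stream.process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
                if (value.user.equals("Mary")) {
                    // Mary's events go to the first side output
                    ctx.output(MaryTag, new Tuple3<>(value.user, value.url, value.timestamp));
                } else if (value.user.equals("Bob")) {
                    // Bob's events go to the second side output
                    ctx.output(BobTag, new Tuple3<>(value.user, value.url, value.timestamp));
                } else {
                    // Everything else stays in the main stream
                    out.collect(value);
                }
            }
        });
        processedStream.getSideOutput(MaryTag).print("Mary pv");
        processedStream.getSideOutput(BobTag).print("Bob pv");
        processedStream.print("else");
        env.execute();
    }
}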
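2. Merging streams
2.1 Union
A union requires that all the streams carry the same data type. The merged stream contains every element from all the input streams, and the data type is unchanged.
stream1.union(stream2, stream3, ...)
Note:
After a union, the watermark is the minimum of the input streams' watermarks; only then is it guaranteed that none of the streams will later deliver data older than the watermark.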
import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Event is a user-defined class that is not shown in this article
public class UnionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // First stream: "user,url,timestamp" lines from port 7777, up to 2 seconds out of order
        SingleOutputStreamOperator<Event> stream1 = env.socketTextStream("192.168.0.23", 7777)
                .map(data -> {
                    String[] field = data.split(",");
                    return new Event(field[0].trim(), field[1].trim(), Long.valueOf(field[2].trim()));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                );
        stream1.print("stream1");

        // Second stream: same format from port 7778, up to 5 seconds out of order
        SingleOutputStreamOperator<Event> stream2 = env.socketTextStream("192.168.0.23", 7778)
                .map(data -> {
                    String[] field = data.split(",");
                    return new Event(field[0].trim(), field[1].trim(), Long.valueOf(field[2].trim()));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                );
        stream2.print("stream2");

        // Merge the two streams and print the current watermark for every element
        stream1.union(stream2)
                .process(new ProcessFunction<Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        out.collect("Watermark: " + ctx.timerService().currentWatermark());
                    }
                })
                .print();
        env.execute();
    }
}
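In the operator task of the ProcessFunction that follows the union, the logical clock starts out as follows:
Flink inserts a watermark of negative infinity (Long.MIN_VALUE) at the start of every stream. The task running the ProcessFunction keeps one partition watermark for each merged stream, each initialized to Long.MIN_VALUE. The task's own watermark is the minimum of all partition watermarks, so it also starts at Long.MIN_VALUE.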
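The minimum rule can be illustrated with a small plain-Java model. This is only a sketch: UnionWatermarkSketch, onWatermark and currentWatermark are hypothetical names made up for illustration, not Flink classes or methods, and the model ignores everything except the per-input bookkeeping described above.

```java
import java.util.Arrays;

// Hypothetical model (not a Flink class) of a task that receives watermarks from several inputs
class UnionWatermarkSketch {
    // One partition watermark per input, all starting at Long.MIN_VALUE
    private final long[] partitionWatermarks;

    UnionWatermarkSketch(int inputs) {
        partitionWatermarks = new long[inputs];
        Arrays.fill(partitionWatermarks, Long.MIN_VALUE);
    }

    // Record a watermark from one input and return the task's resulting watermark
    long onWatermark(int input, long watermark) {
        // A partition watermark never moves backwards
        partitionWatermarks[input] = Math.max(partitionWatermarks[input], watermark);
        return currentWatermark();
    }

    // The task's watermark is the minimum over all partition watermarks
    long currentWatermark() {
        return Arrays.stream(partitionWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        UnionWatermarkSketch task = new UnionWatermarkSketch(2);
        System.out.println(task.currentWatermark());    // -9223372036854775808
        System.out.println(task.onWatermark(0, 2999L)); // still -9223372036854775808
        System.out.println(task.onWatermark(1, 999L));  // 999
    }
}
```

In this model the task's watermark only leaves Long.MIN_VALUE once every input has delivered a watermark, which is why sending data to only one of the two sockets in the union example would not advance the printed watermark.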
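2.2 Connect
2.2.1 Connected streams (ConnectedStreams)
A DataStream can hold only one data type, so connecting two streams yields a ConnectedStreams rather than a DataStream. A ConnectedStreams can be seen as two streams unified in form only: they are placed in the same stream object, but internally each keeps its own data format and the two remain independent of each other. To get a DataStream back, a further co-processing (co-process) transformation has to be defined. The two streams may keep their own data types and be processed differently, but the results end up in a single DataStream.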
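import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class CoMapExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<Integer> stream1 = env.fromElements(1, 2, 3);
        DataStream<Long> stream2 = env.fromElements(1L, 2L, 3L);
        // The two inputs keep their own types (Integer and Long)
        ConnectedStreams<Integer, Long> connectedStreams = stream1.connect(stream2);
        SingleOutputStreamOperator<String> map = connectedStreams.flatMap(new CoFlatMapFunction<Integer, Long, String>() {
            // Called for elements of the first stream
            @Override
            public void flatMap1(Integer value, Collector<String> out) throws Exception {
                out.collect("stream1:" + value);
            }

            // Called for elements of the second stream
            @Override
            public void flatMap2(Long value, Collector<String> out) throws Exception {
                out.collect("stream2:" + value);
            }
        });
        map.print();
        env.execute();
    }
}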
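ConnectedStreams can call .keyBy() directly to partition by key. It takes two arguments, keySelector1 and keySelector2, the key selectors for the two streams, and brings elements with the same key from both streams together (note that the key types defined for the two streams must be the same, otherwise an exception is thrown).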
connectedStreams.keyBy(keySelector1, keySelector2);
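2.2.2 CoProcessFunction
To use CoProcessFunction, implement the two methods processElement1() and processElement2(); for each incoming element, one of them is called depending on which stream the element came from. Through the context ctx, CoProcessFunction can access the timestamp and the watermark and register timers via the TimerService; it also provides an .onTimer() method that defines what happens when a timer fires.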
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Real-time reconciliation: a two-stream join of the app's payment events and the third-party platform's
// payment events. Each side waits 5 seconds for the other; if the matching payment event does not arrive,
// an alert is emitted
public class BillCheckExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Payment log from the app: (order id, source, timestamp)
        SingleOutputStreamOperator<Tuple3<String, String, Long>> appStream = env
                .fromElements(
                        Tuple3.of("order-1", "app", 1000L),
                        Tuple3.of("order-2", "app", 2000L),
                        Tuple3.of("order-3", "app", 9000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
                                return element.f2;
                            }
                        })
                );

        // Payment log from the third-party payment platform: (order id, source, status, timestamp)
        SingleOutputStreamOperator<Tuple4<String, String, String, Long>> thirdpartStream = env
                .fromElements(
                        Tuple4.of("order-1", "third-party", "success", 3000L),
                        Tuple4.of("order-3", "third-party", "success", 4000L),
                        Tuple4.of("order-4", "third-party", "success", 8000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple4<String, String, String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<String, String, String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple4<String, String, String, Long> element, long recordTimestamp) {
                                return element.f3;
                            }
                        })
                );

        // Check whether the same order appears in both streams; raise an alert if it does not
        appStream.connect(thirdpartStream)
                .keyBy(data -> data.f0, data -> data.f0)
                .process(new OrderMatchResult())
                .print();
        env.execute();
    }

    // Custom CoProcessFunction implementation
    public static class OrderMatchResult extends CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String> {
        // State variables that hold the event that has already arrived for the current key
        private ValueState<Tuple3<String, String, Long>> appEventState;
        private ValueState<Tuple4<String, String, String, Long>> thirdPartyEventState;

        @Override
        public void open(Configuration parameters) throws Exception {
            appEventState = getRuntimeContext().getState(new ValueStateDescriptor<Tuple3<String, String, Long>>("app-event", Types.TUPLE(Types.STRING, Types.STRING, Types.LONG)));
            thirdPartyEventState = getRuntimeContext().getState(new ValueStateDescriptor<Tuple4<String, String, String, Long>>("thirdparty-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.LONG)));
        }

        @Override
        public void processElement1(Tuple3<String, String, Long> value, Context ctx, Collector<String> out) throws Exception {
            if (thirdPartyEventState.value() != null) {
                // The third-party event is already here: the order is reconciled
                out.collect("Reconciliation succeeded: " + value + " " + thirdPartyEventState.value());
                // Clear the state
                thirdPartyEventState.clear();
            } else {
                // Store the app event and register a timer 5 seconds later to wait for the other stream
                appEventState.update(value);
                ctx.timerService().registerEventTimeTimer(value.f2 + 5000L);
            }
        }

        @Override
        public void processElement2(Tuple4<String, String, String, Long> value, Context ctx, Collector<String> out) throws Exception {
            if (appEventState.value() != null) {
                // The app event is already here: the order is reconciled
                out.collect("Reconciliation succeeded: " + appEventState.value() + " " + value);
                // Clear the state
                appEventState.clear();
            } else {
                // Update the state
                thirdPartyEventState.update(value);
                // Register a timer 5 seconds later and start waiting for the event from the other stream
                ctx.timerService().registerEventTimeTimer(value.f3 + 5000L);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // The timer fired: if one of the states is not empty, the event from the other stream never arrived
            if (appEventState.value() != null) {
                out.collect("Reconciliation failed: " + appEventState.value() + " " + "no event from the third-party payment platform");
            }
            if (thirdPartyEventState.value() != null) {
                out.collect("Reconciliation failed: " + thirdPartyEventState.value() + " " + "no event from the app");
            }
            appEventState.clear();
            thirdPartyEventState.clear();
        }
    }
}
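To follow the matching logic without a running cluster, here is a plain-Java model of the same idea. It is a simplified sketch, not Flink code: ReconcileSketch and its methods are hypothetical names, the two maps stand in for the keyed ValueState, the TreeMap stands in for the event-time timers, and the arrival order and the single final watermark used in main are assumptions chosen for illustration (a real job may interleave elements and watermarks differently).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical plain-Java model of the reconciliation logic (not a Flink class)
class ReconcileSketch {
    private final Map<String, Long> appState = new HashMap<>();        // order id -> app event time
    private final Map<String, Long> thirdPartyState = new HashMap<>(); // order id -> third-party event time
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>(); // firing time -> order ids
    final List<String> output = new ArrayList<>();

    void onApp(String orderId, long ts) {
        if (thirdPartyState.remove(orderId) != null) {
            output.add("matched " + orderId);
        } else {
            appState.put(orderId, ts);
            timers.computeIfAbsent(ts + 5000L, k -> new TreeSet<>()).add(orderId);
        }
    }

    void onThirdParty(String orderId, long ts) {
        if (appState.remove(orderId) != null) {
            output.add("matched " + orderId);
        } else {
            thirdPartyState.put(orderId, ts);
            timers.computeIfAbsent(ts + 5000L, k -> new TreeSet<>()).add(orderId);
        }
    }

    // Fire every timer whose time is not later than the watermark
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            for (String orderId : timers.pollFirstEntry().getValue()) {
                if (appState.remove(orderId) != null) {
                    output.add("unmatched " + orderId + ": third-party event missing");
                }
                if (thirdPartyState.remove(orderId) != null) {
                    output.add("unmatched " + orderId + ": app event missing");
                }
            }
        }
    }

    public static void main(String[] args) {
        ReconcileSketch job = new ReconcileSketch();
        job.onApp("order-1", 1000L);
        job.onApp("order-2", 2000L);
        job.onThirdParty("order-1", 3000L);
        job.onThirdParty("order-3", 4000L);
        job.onThirdParty("order-4", 8000L);
        job.onApp("order-3", 9000L);
        job.advanceWatermark(Long.MAX_VALUE); // end of input: all remaining timers fire
        job.output.forEach(System.out::println);
    }
}
```

Under these assumptions order-1 and order-3 are matched, while order-2 and order-4 are reported once their timers fire. As in the Flink version, a timer that fires after a successful match finds both states empty and emits nothing.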
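2.2.3 Broadcast connected stream (BroadcastConnectedStream)
When a DataStream calls .connect(), the argument does not have to be a DataStream; it can also be a broadcast stream (BroadcastStream). Connecting the two streams then yields a broadcast connected stream (BroadcastConnectedStream).
This kind of connection is typically used when rules or configuration have to be defined dynamically. Because the rules change in real time, a separate stream can be used to receive the rule data. These rules or configuration values apply to the whole application, so the data cannot be handed to just one downstream parallel subtask; it has to be broadcast to all parallel subtasks. Each downstream subtask stores the rules it receives as state, known as broadcast state.
Under the hood, broadcast state is stored in a map structure. In code, call .broadcast() on a DataStream and pass in a MapStateDescriptor that specifies the name and type of the state; the result is the broadcast stream (BroadcastStream) of the rule data.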
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(...);
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
        .broadcast(ruleStateDescriptor);
DataStream<String> output = stream
        .connect(ruleBroadcastStream)
        .process(new BroadcastProcessFunction<>() {
            ...
        });
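BroadcastProcessFunction is similar to CoProcessFunction: it is also an abstract class with two methods to implement, one for the elements of each of the two connected streams. The difference is that one stream carries the regular data to process, while the other uses new rules to update the broadcast state, so the two methods are called .processElement() and .processBroadcastElement().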
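The division of labour between the two methods can be modelled in plain Java. This is a rough sketch rather than Flink code: BroadcastStateSketch, broadcastRule and the single "max" threshold rule are hypothetical, and each HashMap merely stands in for the copy of the broadcast state that one parallel subtask would hold.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of broadcast state (not a Flink class)
class BroadcastStateSketch {
    // One rule map per parallel subtask
    private final List<Map<String, Integer>> subtaskState = new ArrayList<>();

    BroadcastStateSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            subtaskState.add(new HashMap<>());
        }
    }

    // Plays the role of processBroadcastElement: every subtask receives and stores the rule
    void broadcastRule(String name, int threshold) {
        for (Map<String, Integer> state : subtaskState) {
            state.put(name, threshold);
        }
    }

    // Plays the role of processElement: one subtask handles the element using its local copy of the rules
    String processElement(int subtask, int value) {
        Integer threshold = subtaskState.get(subtask).get("max");
        if (threshold == null) {
            return value + " -> no rule yet";
        }
        return value + (value > threshold ? " -> above max" : " -> ok");
    }

    public static void main(String[] args) {
        BroadcastStateSketch job = new BroadcastStateSketch(2);
        System.out.println(job.processElement(0, 7));  // 7 -> no rule yet
        job.broadcastRule("max", 10);
        System.out.println(job.processElement(0, 5));  // 5 -> ok
        System.out.println(job.processElement(1, 50)); // 50 -> above max
    }
}
```

The point of the model is that a rule update reaches every subtask, whereas a regular element is handled by exactly one subtask that only reads its local copy of the rules.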
2.3 Time-based merging: two-stream Join