Flink multi-stream transformations (side-output splitting, union, connect): real-time reconciliation of app payments against third-party payments
2022-06-11 12:11:00 【But don't ask about your future】
1. Splitting streams
To split one data stream into two or more completely independent streams, use the side output of a process function (ProcessFunction): emit the records of each branch through ctx.output() with an OutputTag, and read that branch back with getSideOutput().
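import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Event and ClickSource are not shown in this article; the code assumes Event has public fields user, url and timestamp.
public class SplitStreamByOutputTag {
    // Each tag is created as an anonymous subclass ({}) so that Flink can capture its generic element type.
    private static OutputTag<Tuple3<String, String, Long>> MaryTag = new OutputTag<Tuple3<String, String, Long>>("Mary-pv") {
    };
    private static OutputTag<Tuple3<String, String, Long>> BobTag = new OutputTag<Tuple3<String, String, Long>>("Bob-pv") {
    };

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());
        SingleOutputStreamOperator<Event> processedStream = stream.process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
                if (value.user.equals("Mary")) {
                    // Mary's events go to the first side output
                    ctx.output(MaryTag, new Tuple3<>(value.user, value.url, value.timestamp));
                } else if (value.user.equals("Bob")) {
                    // Bob's events go to the second side output
                    ctx.output(BobTag, new Tuple3<>(value.user, value.url, value.timestamp));
                } else {
                    // Everything else stays in the main stream
                    out.collect(value);
                }
            }
        });
        processedStream.getSideOutput(MaryTag).print("Mary pv");
        processedStream.getSideOutput(BobTag).print("Bob pv");
        processedStream.print("else");
        env.execute();
    }
}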
2. Merging streams
2.1 Union
Union requires all streams to have the same data type. The merged stream contains every element of every input stream, and the data type is unchanged.
stream1.union(stream2, stream3, ...)
Note:
After a union, the watermark of the merged stream is the smallest watermark among the input streams. This guarantees that none of the inputs will later send data with a timestamp earlier than the watermark.
import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class UnionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // First socket stream: input lines of the form "user,url,timestamp", 2 seconds of allowed out-of-orderness
        SingleOutputStreamOperator<Event> stream1 = env.socketTextStream("192.168.0.23", 7777)
                .map(data -> {
                    String[] field = data.split(",");
                    return new Event(field[0].trim(), field[1].trim(), Long.valueOf(field[2].trim()));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                );
        stream1.print("stream1");
        // Second socket stream: same format, 5 seconds of allowed out-of-orderness
        SingleOutputStreamOperator<Event> stream2 = env.socketTextStream("192.168.0.23", 7778)
                .map(data -> {
                    String[] field = data.split(",");
                    return new Event(field[0].trim(), field[1].trim(), Long.valueOf(field[2].trim()));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                );
        stream2.print("stream2");
        // Merge the two streams and print the operator's current watermark for every element
        stream1.union(stream2)
                .process(new ProcessFunction<Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        out.collect("Watermark: " + ctx.timerService().currentWatermark());
                    }
                })
                .print();
        env.execute();
    }
}
Initial state of the logical clock in the ProcessFunction task after the union:
Flink inserts a watermark of negative infinity (Long.MIN_VALUE) at the start of every stream. The task that runs the ProcessFunction after the union therefore keeps one partition watermark per merged stream, each initialized to Long.MIN_VALUE. The task's own watermark is the minimum of all partition watermarks, so it also starts at Long.MIN_VALUE.
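The rule above can be traced with a small plain-Java model. This is only a sketch and does not use the Flink API: the class UnionWatermarkModel and its methods are hypothetical names chosen for illustration. It keeps one watermark per input, starts each at Long.MIN_VALUE, and reports the minimum as the operator's watermark.

```java
import java.util.Arrays;

/** Plain-Java model (no Flink dependency) of an operator that merges several inputs. */
final class UnionWatermarkModel {
    // One "partition watermark" per input stream (hypothetical name, for illustration only).
    private final long[] inputWatermarks;

    /** Assumes numInputs >= 1. */
    UnionWatermarkModel(int numInputs) {
        inputWatermarks = new long[numInputs];
        // Every input starts at negative infinity.
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark received from one input and returns the operator's watermark. */
    long onWatermark(int input, long watermark) {
        // A watermark never moves backwards on a single input.
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        return currentWatermark();
    }

    /** The operator's watermark is the minimum over all inputs. */
    long currentWatermark() {
        return Arrays.stream(inputWatermarks).min().getAsLong();
    }
}
```

In this model the operator's watermark stays at Long.MIN_VALUE until every input has delivered at least one watermark, which matches the behaviour of the UnionExample job when only one of the two sockets has received data.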
2.2 Connect
2.2.1 Connected streams (ConnectedStreams)
A DataStream can hold only one data type, so connecting two streams does not produce a DataStream but a ConnectedStreams. Connected streams can be seen as two streams unified in form and placed in the same stream; internally each keeps its own data and the two remain independent. To get a new DataStream you have to define a co-process transformation that states how each stream's data is handled. The two streams can keep their own data types and be handled differently, but the results end up in one DataStream.
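import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class CoMapExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Two streams with different element types
        DataStream<Integer> stream1 = env.fromElements(1, 2, 3);
        DataStream<Long> stream2 = env.fromElements(1L, 2L, 3L);
        ConnectedStreams<Integer, Long> connectedStreams = stream1.connect(stream2);
        SingleOutputStreamOperator<String> map = connectedStreams.flatMap(new CoFlatMapFunction<Integer, Long, String>() {
            // Called for elements of the first stream
            @Override
            public void flatMap1(Integer value, Collector<String> out) throws Exception {
                out.collect("stream1:" + value);
            }

            // Called for elements of the second stream
            @Override
            public void flatMap2(Long value, Collector<String> out) throws Exception {
                out.collect("stream2:" + value);
            }
        });
        map.print();
        env.execute();
    }
}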
ConnectedStreams can also call .keyBy() directly to partition by key. It takes two arguments, keySelector1 and keySelector2, the key selectors for the first and the second stream, and data with the same key from both streams is sent to the same place. Note that the key types of the two streams must be identical, otherwise an exception is thrown.
connectedStreams.keyBy(keySelector1, keySelector2);
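To make the idea of two key selectors with one shared key type concrete, here is a minimal plain-Java model. It is a sketch and not Flink code: KeyedConnectModel, element1(), element2() and groups() are hypothetical names. The single type parameter K stands for the shared key type; in this model a mismatch is a compile error, whereas Flink reports it as an exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

/** Plain-Java model (no Flink dependency) of keying two connected streams by a common key. */
final class KeyedConnectModel<A, B, K extends Comparable<K>> {
    private final Function<A, K> keySelector1; // key selector for the first stream
    private final Function<B, K> keySelector2; // key selector for the second stream
    private final Map<K, List<String>> groups = new TreeMap<>();

    KeyedConnectModel(Function<A, K> keySelector1, Function<B, K> keySelector2) {
        this.keySelector1 = keySelector1;
        this.keySelector2 = keySelector2;
    }

    /** An element of the first stream is routed by keySelector1. */
    void element1(A value) {
        groups.computeIfAbsent(keySelector1.apply(value), k -> new ArrayList<>()).add("stream1:" + value);
    }

    /** An element of the second stream is routed by keySelector2. */
    void element2(B value) {
        groups.computeIfAbsent(keySelector2.apply(value), k -> new ArrayList<>()).add("stream2:" + value);
    }

    /** Elements of both streams, grouped by their common key. */
    Map<K, List<String>> groups() {
        return groups;
    }
}
```

For example, with a String stream keyed by length and an Integer stream keyed by its own value, the string "ab" and the integer 2 end up in the same group, just as two payment records with the same order id meet in the same keyed partition.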
2.2.2 CoProcessFunction

To use CoProcessFunction, implement its two methods processElement1() and processElement2(). For each incoming record, one of them is called depending on which stream the record came from. Through the context ctx, CoProcessFunction can access the timestamp and the watermark and register timers via TimerService; it also provides an .onTimer() method that defines what happens when a timer fires.
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Real-time reconciliation: a two-stream join of app payment events and third-party payment events.
// An app payment event and the matching third-party event wait for each other for 5 seconds;
// if the matching event does not arrive in time, an alert is emitted.
public class BillCheckExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Payment log from the app: (order id, source, timestamp)
        SingleOutputStreamOperator<Tuple3<String, String, Long>> appStream = env
                .fromElements(Tuple3.of("order-1", "app", 1000L), Tuple3.of("order-2", "app", 2000L), Tuple3.of("order-3", "app", 9000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
                                return element.f2;
                            }
                        })
                );
        // Payment log from the third-party payment platform: (order id, source, status, timestamp)
        SingleOutputStreamOperator<Tuple4<String, String, String, Long>> thirdpartStream = env
                .fromElements(Tuple4.of("order-1", "third-party", "success", 3000L), Tuple4.of("order-3", "third-party", "success", 4000L), Tuple4.of("order-4", "third-party", "success", 8000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple4<String, String, String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<String, String, String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple4<String, String, String, Long> element, long recordTimestamp) {
                                return element.f3;
                            }
                        })
                );
        // Check whether the same order appears in both streams; if not, emit an alert
        appStream.connect(thirdpartStream)
                .keyBy(data -> data.f0, data -> data.f0)
                .process(new OrderMatchResult())
                .print();
        env.execute();
    }

    // Custom CoProcessFunction implementation
    public static class OrderMatchResult extends CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String> {
        // State variables that hold the event which has already arrived for the current key
        private ValueState<Tuple3<String, String, Long>> appEventState;
        private ValueState<Tuple4<String, String, String, Long>> thirdPartyEventState;

        @Override
        public void open(Configuration parameters) throws Exception {
            appEventState = getRuntimeContext().getState(new ValueStateDescriptor<Tuple3<String, String, Long>>("app-event", Types.TUPLE(Types.STRING, Types.STRING, Types.LONG)));
            thirdPartyEventState = getRuntimeContext().getState(new ValueStateDescriptor<Tuple4<String, String, String, Long>>("thirdparty-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.LONG)));
        }

        @Override
        public void processElement1(Tuple3<String, String, Long> value, Context ctx, Collector<String> out) throws Exception {
            if (thirdPartyEventState.value() != null) {
                // The third-party event is already here: the order is reconciled
                out.collect("Reconciliation successful: " + value + " " + thirdPartyEventState.value());
                // Clear the state
                thirdPartyEventState.clear();
            } else {
                // Store the app event and wait up to 5 seconds for the third-party event
                appEventState.update(value);
                ctx.timerService().registerEventTimeTimer(value.f2 + 5000L);
            }
        }

        @Override
        public void processElement2(Tuple4<String, String, String, Long> value, Context ctx, Collector<String> out) throws Exception {
            if (appEventState.value() != null) {
                // The app event is already here: the order is reconciled
                out.collect("Reconciliation successful: " + appEventState.value() + " " + value);
                // Clear the state
                appEventState.clear();
            } else {
                // Update the state
                thirdPartyEventState.update(value);
                // Register a timer 5 seconds later and start waiting for the event from the other stream
                ctx.timerService().registerEventTimeTimer(value.f3 + 5000L);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // The timer fired: if a state is still non-empty, the event from the other stream never arrived
            if (appEventState.value() != null) {
                out.collect("Reconciliation failed: " + appEventState.value() + " no message arrived from the third-party payment platform");
            }
            if (thirdPartyEventState.value() != null) {
                out.collect("Reconciliation failed: " + thirdPartyEventState.value() + " no message arrived from the app");
            }
            appEventState.clear();
            thirdPartyEventState.clear();
        }
    }
}
2.2.3 Broadcast connected streams (BroadcastConnectedStream)
When a DataStream calls .connect(), the argument does not have to be a DataStream; it can also be a broadcast stream (BroadcastStream). In that case the result of connecting the two streams is a broadcast connected stream (BroadcastConnectedStream).
This kind of connection is often used when rules or configuration have to be defined dynamically. Because the rules change in real time, a separate stream can be used to deliver the rule data. These rules or configurations apply to the whole application, so it is not enough to pass them to a single downstream parallel subtask; they have to be broadcast to all parallel subtasks. Each downstream subtask stores the broadcast rules it receives in a state, the broadcast state.
Under the hood, broadcast state is stored as a map structure. In code, call .broadcast() on a DataStream and pass in a map state descriptor (MapStateDescriptor) that describes the name and type of the state; this returns the broadcast stream (BroadcastStream) of the rule data.
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(...);
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
        .broadcast(ruleStateDescriptor);
DataStream<String> output = stream
        .connect(ruleBroadcastStream)
        .process(new BroadcastProcessFunction<>() {
            ...
        });
BroadcastProcessFunction is similar to CoProcessFunction: it is also an abstract class with two methods to implement, one for each of the connected streams. The difference is that one stream processes data normally while the other updates the broadcast state with new rules, so the two methods are called .processElement() and .processBroadcastElement().
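The division of labour between the two methods can be pictured with a small plain-Java model. This is a sketch and not the Flink API: BroadcastModel, Subtask, broadcast() and send() are hypothetical names, rules are plain strings, and picking the target subtask by hash is only an illustrative assumption. It shows that a broadcast element updates the rule map of every parallel subtask, while a regular element is handled by just one subtask, which only reads the rules.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (no Flink dependency) of broadcast state across parallel subtasks. */
final class BroadcastModel {
    /** One parallel subtask with its own copy of the broadcast state (a map). */
    static final class Subtask {
        final Map<String, String> rules = new HashMap<>();

        // Counterpart of processBroadcastElement(): store the new rule.
        void processBroadcastElement(String name, String rule) {
            rules.put(name, rule);
        }

        // Counterpart of processElement(): look up the rule, never modify it.
        String processElement(String name, String payload) {
            return payload + " -> " + rules.getOrDefault(name, "no-rule");
        }
    }

    private final List<Subtask> subtasks = new ArrayList<>();

    BroadcastModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new Subtask());
        }
    }

    /** A rule is delivered to every subtask. */
    void broadcast(String name, String rule) {
        for (Subtask subtask : subtasks) {
            subtask.processBroadcastElement(name, rule);
        }
    }

    /** A data element goes to exactly one subtask (chosen by hash here, for illustration). */
    String send(String name, String payload) {
        Subtask target = subtasks.get(Math.floorMod(name.hashCode(), subtasks.size()));
        return target.processElement(name, payload);
    }

    List<Subtask> subtasks() {
        return subtasks;
    }
}
```

Because every subtask holds the same rules after a broadcast, it does not matter which subtask a data element lands on; that is the property the broadcast state is meant to provide.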
2.3 Time-based merging: two-stream joins (Join)