当前位置:网站首页>Flink learning notes (VIII) multi stream conversion
Flink learning notes (VIII) multi stream conversion
2022-07-03 09:34:00 【Did Xiao Hu get stronger today】
List of articles
8. Multi stream conversion
Whether it's basic simple transformation or aggregation , Or window based computing , We all deal with the data on one stream . In practice , You may need to combine data connections from different sources for processing , It may also be necessary to One stream is separated , Therefore, there are often scenes of processing multiple streams .
8.1 shunt
So-called “ shunt ”, Is to split a data stream into two completely independent 、 Even multiple streams .
stay Flink 1.13 In the version , It has been abandoned .split() Method , Instead, use the handler function directly (process function) Side output stream (side output). Marking and extraction of side output stream , all You can't do without one “ Output label ”(OutputTag), It's equivalent to split() When shunting “ stamp ”, Specifies the of the side output stream id And type .
8.2 Confluence
8.2.1 union (Union)
The simplest confluence operation , Is to directly combine multiple streams , Called flow “ union ”(union), The union operation requires that the data types in the stream must be the same , The merged new flow will include all the elements in the flow , The data type remains unchanged .
In the code , We just have to be based on DataStream Call directly .union() Method , Introduction to others DataStream As a reference Count , The union of flows can be realized ; What we get is still a DataStream:
stream1.union(stream2, stream3, ...)
In the event time semantics , The water mark is the progress sign of time ; Different streams may The progress of the water line is completely different , If they merge together , Which water level shall prevail ? Consider the essential meaning of the water mark , yes “ All the previous data have arrived ”; So for the confluence Waterline , The smallest one shall prevail , In this way, we can ensure that all streams will not transmit the previous data . That is, the timeliness of processing when merging multiple streams is based on the slowest stream .
8.2.2 Connect (Connect)
1. Connection flow (ConnectedStreams)
In order to be more flexible , The connection operation allows different data types of streams . But a DataStream Medium Data can only have unique types , So the connection is not DataStream, It is a “ Connection flow ” (ConnectedStreams). The connection flow can be regarded as two flows in the form of “ Unified ”, Put in the same stream ; In fact, the internal data form remains unchanged , They are independent of each other . To get new DataStream, A further definition is needed “ Deal with ”(co-process) Conversion operation , Used to illustrate that for different sources 、 Different types of The data of , How to process and convert separately 、 Get a unified output type . So on the whole , The connection of two streams is like “ "One country, two systems" ”, Two streams can maintain their own data types 、 The treatment can also be different , However, it will eventually be unified to The same DataStream in .
mport org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
public class ConnectTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Integer> stream1 = env.fromElements(1,2,3);
DataStream<Long> stream2 = env.fromElements(1L,2L,3L);
ConnectedStreams<Integer, Long> connectedStreams = stream1.connect(stream2);
SingleOutputStreamOperator<String> result = connectedStreams.map(new CoMapFunction<Integer, Long, String>() {
@Override
public String map1(Integer value) {
return "Integer: " + value;
}
@Override
public String map2(Long value) {
return "Long: " + value;
}
});
result.print();
env.execute();
}
}
The connection of two streams (connect), With the United (union) Operation comparison , The biggest advantage is that it can handle different types Merge of flows , More flexible use 、 More widely used . Of course, it also has limitations , That is, the number of merged streams can only be 2, and union Multiple streams can be merged at the same time . It's also very easy to understand :union Restricted type invariance , Therefore, direct merger does not problem ; and connect yes “ "One country, two systems" ”, The interface for subsequent processing only defines two conversion methods , If the extension needs to be repeated Newly defined interface , So we can't “ One country with multiple systems ”.
2. CoProcessFunction
For connection flow ConnectedStreams Processing operation of , You need to define the processing and transformation of the two streams separately , So interface There will be two identical methods to be implemented , Use numbers “1”“2” distinguish , When the data in the two streams arrives, they are adjusted respectively use . We call this interface “ Collaborative processing function ”(co-process function). And CoMapFunction similar , Such as The result is to call .flatMap() You need to pass in a CoFlatMapFunction, Need to achieve flatMap1()、flatMap2() Two Method ; And call .process() when , What comes in is a CoProcessFunction.
abstract class CoProcessFunction In the source code, it is defined as follows :
public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {
...
public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {
}
public abstract class Context {
...}
...
}
CoProcessFunction It's also “ Processing function ” A member of the family , The usage is very similar like . What it needs to achieve is processElement1()、processElement2() Two methods , When every data comes , One of the methods will be called according to the source stream for processing .CoProcessFunction You can also use context ctx Come on visit timestamp、 Waterline , And pass TimerService Register timer ; It also provides .onTimer() Method , use It is used to define the processing operations triggered by timing .
3. Broadcast connection flow (BroadcastConnectedStream)
This connection method is often used in scenarios where some rules or configurations need to be dynamically defined . Because the rules change in real time , the So we can use a separate stream to get rule data ; These rules or configurations are globally valid for the entire application , the So you can't just pass this data to a downstream parallel subtask for processing , But to “ radio broadcast ”(broadcast) Give all and Line subtask . And the downstream task receives the broadcast rules , Will save it in a state , That's what's called “ radio broadcast state ”(broadcast state).
The bottom layer of broadcast state is a “ mapping ”(map) Structure to preserve . In code implementation , Can be called directly DataStream Of .broadcast() Method , Pass in a “ Mapping state descriptor ”(MapStateDescriptor) Description status The name and type of , You can get the rule data “ Broadcast stream ”(BroadcastStream):
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(...);
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
Next, you can process the data flow , Connect to this broadcast stream (connect), What you get is It means “ Broadcast connection flow ”(BroadcastConnectedStream). be based on BroadcastConnectedStream call .process() Method , You can get rules and data at the same time , Dynamic processing .
DataStream<String> output = stream
.connect(ruleBroadcastStream)
.process( new BroadcastProcessFunction<>() {
...} );
BroadcastProcessFunction And CoProcessFunction similar , It is also an abstract class , Need to implement two The method is called .processElement() and .processBroadcastElement(). The source code is defined as follows :
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends
BaseBroadcastProcessFunction {
...
public abstract void processElement(IN1 value, ReadOnlyContext ctx,
Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx,
Collector<OUT> out) throws Exception;
...
}
8.3 Time based confluence
For the merging of two streams , In many cases, we don't simply put all the data together , But hope according to some The values of the fields join them ,“ pairing ” To do the processing . This requirement is related to the of tables in relational databases join The operation is very similar .
8.3.1 Window connection (Window Join)
Window connection (window join) operator , You can define the time window , and Share a common key between the two streams (key) Put the data in the window for pairing processing .
- Call of window connection
Implementation of window connection in code , First you need to call DataStream Of .join() Method to merge two streams , Get one individual JoinedStreams; Then passed .where() and .equalTo() Method to specify the join in two streams key; Then pass too .window() Open the window , And call .apply() Pass in the join window function for processing and calculation .
stream1.join(stream2)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)
Incoming JoinFunction It is also a function class interface , When using, you need to implement internal .join() Method . This method There are two parameters , Respectively represent the paired matching data in the two streams .JoinFunction The definition in the source code is as follows :
public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {
OUT join(IN1 first, IN2 second) throws Exception;
}
- Processing flow of window connection
8.3.2 Interval connection (Interval Join)
- The principle of interval connection
The idea of interval connection is for each data of a stream , Open up a time interval before and after its timestamp , See if there is data matching from another stream during this period .
The matching condition is : a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound , We need to pay attention to , Two streams connected at intervals A and B, Must also be based on the same key; Lower bound lowerBound Should be less than or equal to the upper bound upperBound, Both can be positive or negative ; Interval join currently only supports event time semantics .
- Call of interval connection
stream1
.keyBy(<KeySelector>)
.intervalJoin(stream2.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction<Integer, Integer, String(){
@Override
public void processElement(Integer left, Integer right, Context ctx,
Collector<String> out) {
out.collect(left + "," + right);
}
});
8.3.3 The window is connected with the group (Window CoGroup)
Window CoGroup Usage and follow up window join Very similar , After merging the two streams, the matching elements are processed by windowing , Invocation time Only need to .join() Replace with .coGroup().
stream1.coGroup(stream2)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.apply(<CoGroupFunction>)
summary
Multi stream conversion is a common requirement of stream processing in practical applications , It mainly includes diversion and confluence . The most basic way to merge is to combine (union) And connections (connect), The main difference between the two is union It can be done to Merge multiple streams , Data types must be consistent ; and connect Only two streams can be connected , Data types can be different .Flink Several built-in connections are also provided (join) operation , They are both based on a certain period of time Merge , It is the high-level after demand specialization API. It mainly includes window connection (window join)、 Interval connection (interval join) Join with window group (window coGroup). among window join and coGroup It's all based on the time window .
边栏推荐
- Go language - IO project
- Jenkins learning (III) -- setting scheduled tasks
- Common software open source protocols
- QT qstring:: number apply base conversion
- Hudi 集成 Spark 数据分析示例(含代码流程与测试结果)
- 基于opencv实现桌面图标识别
- [kotlin puzzle] what happens if you overload an arithmetic operator in the kotlin class and declare the operator as an extension function?
- 1922. Count Good Numbers
- Win10 quick screenshot
- IDEA 中使用 Hudi
猜你喜欢
Directory and switching operation in file system
LeetCode每日一题(931. Minimum Falling Path Sum)
Temper cattle ranking problem
[kotlin puzzle] what happens if you overload an arithmetic operator in the kotlin class and declare the operator as an extension function?
Solve editor MD uploads pictures and cannot get the picture address
PolyWorks script development learning notes (I) - script development environment
Using Hudi in idea
Windows安装Redis详细步骤
Spark structured stream writing Hudi practice
IDEA 中使用 Hudi
随机推荐
Idea uses the MVN command to package and report an error, which is not available
Leetcode daily question (968. binary tree cameras)
Install database -linux-5.7
LeetCode每日一题(1362. Closest Divisors)
Failed building wheel for argon2 cffi when installing Jupiter
Flink学习笔记(十一)Table API 和 SQL
Jetson Nano 自定义启动图标kernel Logo cboot logo
Learning C language from scratch -- installation and configuration of 01 MinGW
Go language - Reflection
npm install安装依赖包报错解决方法
Common software open source protocols
[set theory] order relation (chain | anti chain | chain and anti chain example | chain and anti chain theorem | chain and anti chain inference | good order relation)
Hudi integrated spark data analysis example (including code flow and test results)
About the configuration of vs2008+rade CATIA v5r22
What do software test engineers do? Pass the technology to test whether there are loopholes in the software program
Trial of the combination of RDS and crawler
Leetcode daily question (2232. minimize result by addressing parents to expression)
Filter comments to filter out uncommented and default values
Leetcode daily question (745. prefix and suffix search)
Navicat, MySQL export Er graph, er graph