Using CEP in Flink
2022-07-06 15:47:00 【Empty one by one】
Flink usage series: index of related articles
What is CEP
CEP stands for Complex Event Processing. The literal meaning alone does not explain much: what is a "complex event"? When we process a data stream with Flink, we usually care about each incoming element on its own and not about the relationships between elements; at most we keep some state in a stateful operator. Now suppose we need to detect a sequence of events that follows a specific rule, for example a user logs in, makes a transfer, and then logs out (events A, B, C occur in sequence), or a machine room reports a temperature above 50 degrees in 10 or more consecutive measurements (A{10,}). Such requirements are awkward to implement with a conventional Flink program. This is where Flink CEP shines.
Now let's look at how to use Flink CEP.
Adding the dependency
For Java, add:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>1.13.2</version>
</dependency>
Here version must match your Flink version.
For Scala, add:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_2.11</artifactId>
<version>1.13.2</version>
</dependency>
All code below is written in Java.
Pattern API
A Pattern describes a sequence such as the login, transfer, and logout example from the previous section, or the machine room reporting a temperature above 50 degrees in 10 or more consecutive measurements. The concept is very similar to a pattern in regular expressions: the Pattern API is the programming interface through which the user describes the characteristics of a sequence of events.
Let's start with a simple example: a Pattern for a machine room whose temperature is above 50 degrees in 10 or more consecutive measurements.
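import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.types.Row;

// 10 or more consecutive readings with a temperature above 50
Pattern<Row, Row> pattern = Pattern.<Row>begin("high").where(new SimpleCondition<Row>() {
    @Override
    public boolean filter(Row row) throws Exception {
        int temp = (int) row.getField("temperature");
        return temp > 50;
    }
}).timesOrMore(10).consecutive();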
First we call the begin method of Pattern to mark the start of the Pattern, giving that first stage a name. A Pattern can be made up of several stages, each with its own name; these names are needed later to retrieve the elements of the data stream that matched each stage. begin needs a generic type argument: the data type of the data source. Next comes a where condition. where takes a SimpleCondition anonymous inner class that inspects each element and returns true if the element satisfies the condition. Then we specify how many matching elements are required; timesOrMore(n) here means n or more times. Finally, consecutive() requires the occurrences counted by timesOrMore(n) to be contiguous: no other element may appear between them.
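To make the effect of consecutive() concrete without running a Flink job, here is a plain-Java sketch (not Flink code; QuantifierDemo and its methods are hypothetical names) that only answers whether a list of temperature readings would satisfy timesOrMore(n) with and without consecutive(). Flink itself also reports which elements matched.

```java
import java.util.List;
import java.util.function.IntPredicate;

// Plain-Java illustration, not Flink code: does a list of readings contain
// n or more readings that satisfy a condition, with or without consecutive()?
class QuantifierDemo {
    // timesOrMore(n).consecutive(): a run of >= n matching readings with nothing in between
    static boolean consecutive(List<Integer> readings, int n, IntPredicate cond) {
        int run = 0;
        for (int r : readings) {
            run = cond.test(r) ? run + 1 : 0; // a non-matching reading resets the run
            if (run >= n) return true;
        }
        return false;
    }

    // timesOrMore(n) without consecutive(): non-matching readings may be interleaved
    static boolean relaxed(List<Integer> readings, int n, IntPredicate cond) {
        int count = 0;
        for (int r : readings) {
            if (cond.test(r) && ++count >= n) return true;
        }
        return false;
    }
}
```

With readings 55, 60, 40, 70 and n = 3, only the relaxed version finds a match, because the 40 breaks the consecutive run.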
Next, an example with user login, withdrawal, and logout:
// Define the user event types: login, withdrawal, and logout
class UserEvent {
}
class LoginEvent extends UserEvent {
}
class WithdrawEvent extends UserEvent {
}
class LogoutEvent extends UserEvent {
}
// The Pattern: a login, then one or more consecutive withdrawals, then a logout
Pattern<UserEvent, LogoutEvent> pattern = Pattern
        .<UserEvent>begin("login")
        .subtype(LoginEvent.class)
        .next("withdraw")
        .subtype(WithdrawEvent.class)
        .timesOrMore(1)
        .consecutive()
        .next("logout")
        .subtype(LogoutEvent.class);
This defines the pattern AB+C, with three stages named "login", "withdraw", and "logout". subtype specifies the data type an element must have to satisfy a stage. For example, if three consecutive elements arrive that are instances of LoginEvent, WithdrawEvent, and LogoutEvent, these 3 elements are captured.
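The following plain-Java sketch (not Flink code; LoginWithdrawLogoutDemo is a hypothetical name) finds the index ranges that this AB+C pattern would capture in a list of event names. It shows what next() plus consecutive() demand: no other event may sit between the login, the withdrawals, and the logout.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration, not Flink code, of:
// begin("login") -> next("withdraw").timesOrMore(1).consecutive() -> next("logout")
// Events are strings such as "login", "withdraw", "logout", or anything else.
class LoginWithdrawLogoutDemo {
    // Returns inclusive [start, end] index pairs of every match.
    static List<int[]> matches(List<String> events) {
        List<int[]> out = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).equals("login")) continue;
            int j = i + 1;
            // strict contiguity: withdrawals must directly follow the login and each other
            while (j < events.size() && events.get(j).equals("withdraw")) j++;
            boolean atLeastOneWithdraw = j > i + 1;
            // next("logout"): the event right after the last withdrawal must be a logout
            if (atLeastOneWithdraw && j < events.size() && events.get(j).equals("logout")) {
                out.add(new int[] {i, j});
            }
        }
        return out;
    }
}
```

For login, withdraw, withdraw, logout, login, browse, withdraw, logout, only the first four events form a match; the "browse" event breaks the second attempt.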
With these examples you should have a basic understanding of how Pattern is used. Next we go through the more detailed configuration options of the Pattern API one by one.
Combining Pattern stages
The following methods combine the stages of a Pattern:
- begin: starts the pattern.
- next: strict contiguity. A next B means B must immediately follow A.
- followedBy: relaxed contiguity. B need not immediately follow A; other elements may appear in between. For example, A followedBy B matches not only A B but also A C B: C does not match and is simply ignored. Only elements that do not match are ignored.
- followedByAny: non-deterministic relaxed contiguity, even looser than followedBy: it may also skip elements that would match. For example, on the input A C B1 B2, A followedByAny B matches both A B1 (C is ignored) and A B2 (B1 is skipped even though it matches; this is the difference between followedByAny and followedBy). A followedBy B matches only A B1.
- notNext: the negation of next.
- notFollowedBy: the negation of followedBy. Note that notFollowedBy cannot be used at the end of a Pattern.
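The three contiguity modes can be compared on the A C B1 B2 example above with a small plain-Java simulation. This is not Flink's NFA implementation; ContiguityDemo and Mode are hypothetical names, and only a two-stage "A then B" pattern over a finite list is modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java illustration, not Flink's implementation, of next / followedBy /
// followedByAny for a two-stage pattern "A then B" over a finite list of events.
class ContiguityDemo {
    enum Mode { NEXT, FOLLOWED_BY, FOLLOWED_BY_ANY }

    static List<String> matches(List<String> events, Predicate<String> isA,
                                Predicate<String> isB, Mode mode) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!isA.test(events.get(i))) continue;
            for (int j = i + 1; j < events.size(); j++) {
                String e = events.get(j);
                if (isB.test(e)) {
                    result.add(events.get(i) + " " + e);
                    // next and followedBy stop at the first B; followedByAny keeps looking
                    if (mode != Mode.FOLLOWED_BY_ANY) break;
                } else if (mode == Mode.NEXT) {
                    break; // strict contiguity: any non-matching event in between ends the attempt
                }
            }
        }
        return result;
    }
}
```

On A C B1 B2, NEXT yields no match, FOLLOWED_BY yields A B1, and FOLLOWED_BY_ANY yields A B1 and A B2.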
Specifying the number of repetitions
A stage can be required to occur 0 times, once, n times, n to m times, n or more times, and so on. You can also control whether other events may appear between these n occurrences.
The methods are:
- times(n): exactly n times
- times(n, m): n to m times
- optional(): additionally allows the stage to occur 0 times
- oneOrMore(): one or more times
- timesOrMore(n): n or more times
- greedy(): repeat as many times as possible. For example, suppose the input is A X X B, where each X satisfies both the A condition and the B condition. The pattern A+ B can match A X, A X X, X X B, X B, and A X X B. With A+ made greedy, Flink matches the A condition as many times as possible, so it only matches A X X B.
- consecutive(): used with oneOrMore() and times(); requires strict contiguity between the repeated occurrences.
- allowCombinations(): used with oneOrMore() and times(); applies non-deterministic relaxed contiguity (as in followedByAny) between the repeated occurrences. Without either method, the repeated occurrences use relaxed contiguity (as in followedBy).
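The difference between the default loop contiguity, consecutive(), and allowCombinations() is easiest to see on a concrete input. The plain-Java sketch below (not Flink code; LoopContiguityDemo is a hypothetical name) models begin("start") = C, followedBy("middle") = A with oneOrMore(), followedBy("end") = B on the input C D A1 A2 A3 D A4 B. It follows the contiguity rules described in Flink's CEP documentation and assumes the NO_SKIP strategy; verify against your Flink version before relying on it.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration, not Flink's NFA: C, then a looping stage of A events
// (oneOrMore), then B, with relaxed contiguity (followedBy) between stages.
class LoopContiguityDemo {
    enum Mode { RELAXED, CONSECUTIVE, ALLOW_COMBINATIONS } // RELAXED is the default

    static List<String> matches(List<String> events, Mode mode) {
        List<String> out = new ArrayList<>();
        int c = events.indexOf("C");
        if (c < 0) return out;
        // followedBy("middle"): the loop starts at the first A after C
        int first = -1;
        for (int i = c + 1; i < events.size(); i++) {
            if (events.get(i).startsWith("A")) { first = i; break; }
        }
        if (first < 0) return out;
        // indices of the A events that may extend the loop
        List<Integer> as = new ArrayList<>();
        for (int i = first; i < events.size(); i++) {
            if (events.get(i).startsWith("A")) as.add(i);
            else if (mode == Mode.CONSECUTIVE) break; // consecutive(): another event ends the loop
        }
        List<List<Integer>> candidates = new ArrayList<>();
        if (mode == Mode.ALLOW_COMBINATIONS) {
            // any ordered subset of the later A events may follow the first A
            int rest = as.size() - 1;
            for (int mask = 0; mask < (1 << rest); mask++) {
                List<Integer> pick = new ArrayList<>();
                pick.add(as.get(0));
                for (int k = 0; k < rest; k++) {
                    if ((mask & (1 << k)) != 0) pick.add(as.get(k + 1));
                }
                candidates.add(pick);
            }
        } else {
            // relaxed or consecutive: only prefixes, a matching A cannot be skipped
            for (int n = 1; n <= as.size(); n++) candidates.add(as.subList(0, n));
        }
        for (List<Integer> pick : candidates) {
            int last = pick.get(pick.size() - 1);
            // followedBy("end"): a B must appear somewhere after the last A taken
            if (events.subList(last + 1, events.size()).contains("B")) {
                StringBuilder sb = new StringBuilder("C");
                for (int idx : pick) sb.append(' ').append(events.get(idx));
                out.add(sb.append(" B").toString());
            }
        }
        return out;
    }
}
```

Under these assumptions the default mode yields 4 matches (C A1 B up to C A1 A2 A3 A4 B), consecutive() yields 3 (the D before A4 ends the loop), and allowCombinations() yields 8, including ones such as C A1 A3 B that skip a matching A.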
Specifying conditions
- where(): takes a condition object (for example an anonymous SimpleCondition class), much like a filter operator, in which you write your own condition logic.
- subtype(): restricts the type of the matching element.
After-match skip strategy
The same element can often be part of several successful matches. The after-match skip strategy controls how such overlapping matches are handled.
There are 5 after-match skip strategies:
- NO_SKIP: no skipping; every possible match is emitted.
- SKIP_TO_NEXT: the next match starts from the event following the first event of the successful match.
- SKIP_PAST_LAST_EVENT: the next match starts from the event following the last event of the successful match.
- SKIP_TO_FIRST(patternName): the next match starts from the first event in the successful match that matched the stage patternName.
- SKIP_TO_LAST(patternName): the next match starts from the last event in the successful match that matched the stage patternName.
The skip strategy is passed as the second argument of begin when the pattern is defined:
Pattern.begin("patternName", skipStrategy);
skipStrategy is created with one of the following:
AfterMatchSkipStrategy.noSkip();
AfterMatchSkipStrategy.skipToNext();
AfterMatchSkipStrategy.skipPastLastEvent();
AfterMatchSkipStrategy.skipToFirst(patternName);
AfterMatchSkipStrategy.skipToLast(patternName);
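To see how a skip strategy changes the output, here is a plain-Java sketch (not Flink code; SkipDemo is a hypothetical name) for a fixed-length pattern of k consecutive matching events, similar to Pattern.begin("a").where(...).times(k).consecutive(). Only NO_SKIP and SKIP_PAST_LAST_EVENT are modeled; for a fixed-length pattern like this, each start position yields at most one match, so SKIP_TO_NEXT would behave like NO_SKIP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java illustration, not Flink's implementation: matches of
// "k consecutive events satisfying cond" under two after-match skip strategies.
class SkipDemo {
    enum Strategy { NO_SKIP, SKIP_PAST_LAST_EVENT }

    static List<String> matches(List<String> events, int k, Predicate<String> cond, Strategy s) {
        List<String> out = new ArrayList<>();
        int start = 0;
        while (start + k <= events.size()) {
            boolean ok = true;
            for (int i = start; i < start + k; i++) {
                if (!cond.test(events.get(i))) { ok = false; break; }
            }
            if (ok) {
                out.add(String.join(" ", events.subList(start, start + k)));
                // SKIP_PAST_LAST_EVENT: the next match may only begin after this match's last event
                start = (s == Strategy.SKIP_PAST_LAST_EVENT) ? start + k : start + 1;
            } else {
                start++;
            }
        }
        return out;
    }
}
```

On a1 a2 a3 a4 with k = 3, NO_SKIP produces the overlapping matches a1 a2 a3 and a2 a3 a4, while SKIP_PAST_LAST_EVENT keeps only a1 a2 a3.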
Time constraints
Besides constraints on the data, Flink CEP can also constrain a Pattern in the time dimension.
- within(time): all elements of a match must occur within the given time span.
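The plain-Java sketch below (not Flink code; WithinDemo and Ev are hypothetical) illustrates what within() does for a two-stage pattern begin("login").followedBy("logout").within(window): a login that is not followed by a logout inside the window becomes a timed-out partial match, which is what TimedOutPartialMatchHandler receives later in this article. It assumes events are ordered by timestamp, that a span equal to the window already counts as a timeout, and that stream time has advanced past every window.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration, not Flink code, of within() for login -> logout.
// Assumptions: events are sorted by timestamp, and a logout whose distance from
// the login reaches windowMillis is too late.
class WithinDemo {
    static final class Ev {
        final String name;
        final long ts; // timestamp in milliseconds
        Ev(String name, long ts) { this.name = name; this.ts = ts; }
    }

    // Returns "match@<loginTs>" or "timeout@<loginTs>" for every login event.
    static List<String> classify(List<Ev> events, long windowMillis) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            Ev login = events.get(i);
            if (!login.name.equals("login")) continue;
            String result = "timeout@" + login.ts;
            for (int j = i + 1; j < events.size(); j++) {
                Ev e = events.get(j);
                if (e.ts - login.ts >= windowMillis) break; // window exceeded: partial match times out
                if (e.name.equals("logout")) { result = "match@" + login.ts; break; }
            }
            out.add(result);
        }
        return out;
    }
}
```

With a 10-second window, a login at 0 ms followed by a logout at 3000 ms matches, while a login at 10000 ms followed by a logout at 25000 ms times out.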
Creating a PatternStream
Call CEP.pattern to apply the Pattern created in the previous section to the target DataStream:
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
// Alternatively (less common), also pass a comparator
PatternStream<Event> sortedPatternStream = CEP.pattern(input, pattern, comparator);
In the second form, the Comparator defines a custom ordering for elements that have the same event time. For example, a custom comparator for the Row type:
EventComparator<Row> comparator = new EventComparator<Row>() {
    @Override
    public int compare(Row o1, Row o2) {
        // Custom comparison logic goes here
        return 0;
    }
};
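Conceptually, the comparator acts as a tie-breaker after ordering by event time. The plain-Java sketch below (not Flink code; TieBreakDemo and Ev are hypothetical) sorts events by timestamp and falls back to a custom comparator only when timestamps are equal.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java illustration, not Flink code: order by timestamp first, and use the
// custom comparator only for events that share the same timestamp.
class TieBreakDemo {
    static final class Ev {
        final long ts;
        final String name;
        Ev(long ts, String name) { this.ts = ts; this.name = name; }
    }

    static List<String> order(List<Ev> events, Comparator<Ev> tieBreak) {
        List<Ev> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.<Ev>comparingLong(e -> e.ts).thenComparing(tieBreak));
        List<String> names = new ArrayList<>();
        for (Ev e : sorted) names.add(e.name);
        return names;
    }
}
```

For example, if a withdrawal and a login share timestamp 1 and the tie-breaker puts logins first, the resulting order is login, withdraw, and then a logout at timestamp 2.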
Note that since Flink 1.12, a CEP PatternStream uses event time by default. If your job relies on processing time, you must configure it explicitly:
PatternStream<Event> patternStream = CEP.pattern(input, pattern).inProcessingTime();
Selecting matched elements from a PatternStream
The select method takes a PatternSelectFunction parameter; you implement this interface with your own processing logic. The interface is:
public interface PatternSelectFunction<IN, OUT> extends Function, Serializable {
    OUT select(Map<String, List<IN>> pattern) throws Exception;
}
The parameter has type Map<String, List<IN>>: each key is a stage name given when the Pattern was combined (in begin, next, and so on), and each value is the list of elements matched by that stage. The processed result is returned with return.
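The plain-Java sketch below shows how such a map is typically consumed for the login/withdraw/logout pattern. SelectFn is a hypothetical stand-in that only mirrors the shape of Flink's PatternSelectFunction so the example runs without Flink; WithdrawSummary is likewise a made-up name.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in with the same shape as Flink's PatternSelectFunction.
interface SelectFn<IN, OUT> {
    OUT select(Map<String, List<IN>> match) throws Exception;
}

// Summarizes one match; the keys are the stage names used in begin()/next().
class WithdrawSummary implements SelectFn<String, String> {
    @Override
    public String select(Map<String, List<String>> match) {
        String user = match.get("login").get(0);         // the single login event
        int withdrawals = match.get("withdraw").size();  // one or more withdrawals
        return user + " made " + withdrawals + " withdrawal(s)";
    }
}
```

A match whose "login" list holds "alice" and whose "withdraw" list holds two elements is summarized as "alice made 2 withdrawal(s)".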
You can also use the process method:
CEP.pattern(...).process(new PatternProcessFunction<IN, OUT>() {
@Override
public void processMatch(Map<String, List<IN>> map, Context context, Collector<OUT> collector) throws Exception {
// ...
}
});
The match map has the same form as in PatternSelectFunction, but results are not returned with return; instead they are emitted with collector.collect(...), so one match can produce zero, one, or several output records.
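A plain-Java sketch of the collector style: here java.util.function.Consumer stands in for Flink's Collector (an assumption made so the code runs without Flink), and the hypothetical PerWithdrawalAlerts emits one record per withdrawal instead of one value per match.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java sketch of the processMatch style: results are pushed to a collector
// instead of being returned. Consumer<String> stands in for Collector<String>.
class PerWithdrawalAlerts {
    static void processMatch(Map<String, List<String>> match, Consumer<String> collector) {
        String user = match.get("login").get(0);
        for (String w : match.get("withdraw")) {
            collector.accept(user + ":" + w); // one output record per withdrawal
        }
    }
}
```

Passing a list's add method as the collector (out::add) and a match with two withdrawals leaves two records in the list.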
Partial matches that time out, that is, that do not complete within the time span configured with within in the previous section, are discarded by default. To capture these timed-out partial matches, write a custom PatternProcessFunction that also implements TimedOutPartialMatchHandler, as shown below:
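class CustomPatternProcessFunction extends PatternProcessFunction<Object, Object> implements TimedOutPartialMatchHandler<Object> {
    // Side-output tag for timed-out partial matches, supplied by the caller
    private final OutputTag<Object> outputTag;
    CustomPatternProcessFunction(OutputTag<Object> outputTag) {
        this.outputTag = outputTag;
    }
    @Override
    public void processMatch(Map<String, List<Object>> map, Context context, Collector<Object> collector) throws Exception {
        // ...
    }
    @Override
    public void processTimedOutMatch(Map<String, List<Object>> map, Context context) throws Exception {
        // "key" stands for one of the stage names defined in your Pattern
        Object element = map.get("key").get(0);
        context.output(outputTag, element);
    }
}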
The example above sends a timed-out element to a side output bound to outputTag. An OutputTag identifies one group of side-output elements. Here is how to create the OutputTag and read the side-output elements:
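// Object is the element type of the side output; the trailing {} creates an anonymous
// subclass so that Flink can extract the generic type information
OutputTag<Object> outputTag = new OutputTag<Object>("late-element") {};
// CEP operations ...; patternProcessFunction stands for a PatternProcessFunction such as the one above
SingleOutputStreamOperator<Object> result = patternStream.process(patternProcessFunction);
// getSideOutput is called on the stream returned by process/select, not on PatternStream
DataStream<Object> sideOutput = result.getSideOutput(outputTag);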
With event time, some elements will inevitably arrive late. To capture them, use a side output in the same way:
patternStream.sideOutputLateData(lateDataOutputTag)
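The plain-Java sketch below (not Flink code; LateDataDemo is a hypothetical name) shows which elements would end up in such a late-data side output. It assumes a simplified watermark equal to the largest timestamp seen so far minus maxDelay, updated after every element, and treats an element whose timestamp is less than or equal to the current watermark as late.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of routing late elements to a side output.
// Assumptions: watermark = (max timestamp seen so far - maxDelay), updated per element,
// and an element with timestamp <= current watermark counts as late.
class LateDataDemo {
    static List<Long> lateElements(List<Long> arrivals, long maxDelay) {
        List<Long> sideOutput = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        long maxTs = Long.MIN_VALUE;
        for (long ts : arrivals) {
            if (ts <= watermark) {
                sideOutput.add(ts); // late: this is what sideOutputLateData(...) would capture
                continue;
            }
            maxTs = Math.max(maxTs, ts);
            watermark = maxTs - maxDelay;
        }
        return sideOutput;
    }
}
```

For arrivals at 1000, 5000, 2000, 4500, and 6000 ms with maxDelay = 1000 ms, only the element at 2000 ms is late, because the watermark has already reached 4000 ms when it arrives.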
Changes in newer versions
Starting with Flink 1.12, the default time semantics of CEP changed from processing time to event time, so be careful when upgrading. For details, see the article on pitfalls of upgrading Flink to 1.12.
Writing CEP in SQL
Besides the Pattern API, Flink also supports writing CEP in SQL. This approach is more flexible, but you need to learn the syntax of the SQL MATCH_RECOGNIZE clause. For writing CEP in SQL, see "Using CEP in Flink (SQL)".
Link: http://events.jianshu.io/p/a3931e203324