Using Flink CEP
2022-07-06 15:47:00 【Empty one by one】
Index of related Flink usage articles
What is CEP?
CEP stands for Complex Event Processing. The name alone does not explain much, so what is a "complex event"? When we process a data stream with Flink, we usually care only about each element as it arrives, not about the relationships between elements; when we do, we typically rely on stateful operators. Now suppose we need to detect a series of events that follows a specific rule: for example, a user logs in, makes a transfer, and then logs out (events A, B, C occurring in succession), or a machine room reports a temperature above 50 degrees 10 or more times in a row (A{10,}). Writing this with ordinary Flink operators is awkward. This is where Flink CEP comes in.
The sections below explain how to use Flink CEP.
Adding dependencies
To write code in Java, add:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>1.13.2</version>
</dependency>
where version is the Flink version you are using.
For Scala, add:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_2.11</artifactId>
<version>1.13.2</version>
</dependency>
The code in the rest of this article is written in Java.
Pattern API
A Pattern describes a rule such as the ones mentioned in the first section: a user logs in, makes a transfer and logs out, or a machine room reports a temperature above 50 degrees 10 times in a row. The concept is very similar to a pattern in regular expressions. Pattern is the programming interface with which the user defines the characteristics of a series of events.
Let's start with a simple example: a Pattern for a machine room whose temperature reading is above 50 degrees 10 or more times in a row.
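import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.types.Row;

Pattern<Row, Row> pattern = Pattern.<Row>begin("high").where(new SimpleCondition<Row>() {
    @Override
    public boolean filter(Row row) throws Exception {
        // "temperature" is expected to be an Integer field of the Row
        int temp = (int) row.getField("temperature");
        return temp > 50; // strictly higher than 50 degrees
    }
}).timesOrMore(10).consecutive();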
First we call Pattern's begin method, which marks the start of the Pattern and requires a name. A Pattern can consist of several stages, each with its own name; the names are needed later to retrieve the matched elements from the stream. begin takes a generic type parameter, which is the element type of the data source. It is followed by a where condition. where accepts a condition object, here an anonymous SimpleCondition, whose filter method inspects the element and returns true if it qualifies. Next we specify how many qualifying elements are required: timesOrMore(n) means n or more times. Finally, consecutive means that the timesOrMore(n) occurrences must be contiguous, with no other elements in between.
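To make the effect of consecutive concrete, the following plain-Java sketch mimics the rule on a list of readings. It uses only the JDK and is not how Flink CEP is implemented. The names RunSketch, hasConsecutiveRun and hasRelaxedRun are made up for illustration, and the relaxed variant assumes that, without consecutive, non-qualifying elements between occurrences are simply ignored.

```java
import java.util.Arrays;
import java.util.List;

// Illustration only: simulates the matching rule, not Flink's implementation.
class RunSketch {

    // With consecutive(): n qualifying readings in a row; any other reading resets the run.
    static boolean hasConsecutiveRun(List<Integer> readings, int threshold, int n) {
        int run = 0;
        for (int t : readings) {
            run = (t > threshold) ? run + 1 : 0;
            if (run >= n) {
                return true;
            }
        }
        return false;
    }

    // Without consecutive() (assumed relaxed behaviour): non-qualifying readings are skipped.
    static boolean hasRelaxedRun(List<Integer> readings, int threshold, int n) {
        int count = 0;
        for (int t : readings) {
            if (t > threshold) {
                count++;
            }
        }
        return count >= n;
    }

    public static void main(String[] args) {
        List<Integer> readings = Arrays.asList(51, 49, 52, 53);
        System.out.println(hasConsecutiveRun(readings, 50, 3));
        System.out.println(hasRelaxedRun(readings, 50, 3));
    }
}
```

For the readings 51, 49, 52, 53 with n = 3, the consecutive check fails because 49 interrupts the run, while the relaxed check succeeds because three readings are above the threshold.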
Now consider another example: a user logs in, makes withdrawals, and then logs out:
// Define the user login, withdrawal and logout events
class UserEvent {
}
class LoginEvent extends UserEvent {
}
class WithdrawEvent extends UserEvent {
}
class LogoutEvent extends UserEvent {
}
// The Pattern is defined below
Pattern
.<UserEvent>begin("login")
.subtype(LoginEvent.class)
.next("withdraw")
.subtype(WithdrawEvent.class)
.timesOrMore(1)
.consecutive()
.next("logout")
.subtype(LogoutEvent.class);
The Pattern defined here is A B+ C, with three stages named "login", "withdraw" and "logout". subtype restricts each stage to elements of the given type. For example, if three consecutive elements are instances of LoginEvent, WithdrawEvent and LogoutEvent, those three elements are captured.
With these examples you should have a basic idea of how a Pattern is used. The following sections go through the Pattern API options in more detail.
Combining patterns
Patterns are combined mainly with the following methods:
- begin: marks the start of the pattern.
- next: strict contiguity. A next B means that B must immediately follow A.
- followedBy: relaxed contiguity. B need not follow immediately; other elements may appear in between. For example, A followedBy B matches not only A B but also A C B, where the non-matching C is simply ignored.
- followedByAny: relaxed contiguity that is looser than followedBy: even elements that could match may be skipped. For example, given A C B1 B2, A followedByAny B matches A B1 (C is ignored) and A B2 (B1 qualifies but may still be skipped, which is the difference between followedByAny and followedBy). A followedBy B would match only A B1.
- notNext: the negated form of next.
- notFollowedBy: the negated form of followedBy. Note that notFollowedBy cannot be used as the last stage of a Pattern.
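The difference between next, followedBy and followedByAny can be reproduced with a small plain-Java simulation of the A C B1 B2 example above. This is only a sketch of the rules as described in the list, not Flink's matching engine; the class ContiguitySketch and the convention that an event "is an A" or "is a B" when its name starts with that letter are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: simulates the contiguity rules for a two-stage pattern "A then B".
class ContiguitySketch {

    static boolean isA(String e) { return e.startsWith("A"); }
    static boolean isB(String e) { return e.startsWith("B"); }

    // A next B: B must be the element directly after A.
    static List<String> next(List<String> events) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i + 1 < events.size(); i++) {
            if (isA(events.get(i)) && isB(events.get(i + 1))) {
                matches.add(events.get(i) + " " + events.get(i + 1));
            }
        }
        return matches;
    }

    // A followedBy B: non-matching elements are skipped, and the first B ends the match.
    static List<String> followedBy(List<String> events) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!isA(events.get(i))) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                if (isB(events.get(j))) {
                    matches.add(events.get(i) + " " + events.get(j));
                    break; // later Bs are not considered for this A
                }
            }
        }
        return matches;
    }

    // A followedByAny B: matching elements may be skipped too, so every later B yields a match.
    static List<String> followedByAny(List<String> events) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!isA(events.get(i))) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                if (isB(events.get(j))) {
                    matches.add(events.get(i) + " " + events.get(j));
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("A", "C", "B1", "B2");
        System.out.println("next:          " + next(events));
        System.out.println("followedBy:    " + followedBy(events));
        System.out.println("followedByAny: " + followedByAny(events));
    }
}
```

On the input A C B1 B2 the sketch finds no match for next, the single match A B1 for followedBy, and the two matches A B1 and A B2 for followedByAny.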
Specifying the number of repetitions
A stage can be required to occur 0 times, once, n times, n to m times, n or more times, and so on. You can also control whether other events may appear between the n occurrences.
The available methods are:
- times(n): exactly n times.
- times(n, m): between n and m times.
- optional(): makes the stage optional, so that it may also occur 0 times.
- oneOrMore(): one or more times.
- timesOrMore(n): n or more times.
- greedy(): match as many times as possible. For example, suppose the input is A X X B, where X satisfies both condition A and condition B. The Pattern A+ B can match A X, A X X, X X B, X B and A X X B. With A (greedy)+ B, Flink assigns as many elements as possible to A, so only A X X B is matched.
- consecutive(): requires the repeated occurrences to be strictly contiguous.
- allowCombinations(): goes in the opposite direction from consecutive(): the occurrences need not be contiguous, and elements that could match may also be skipped (as with followedByAny).
Specifying conditions
- where(): takes a condition object, typically an anonymous inner class. It works much like the filter operator and lets you write custom predicates.
- subtype(): restricts the stage to elements of the given type.
After-match skip strategy
An element can often take part in several successful matches. In an application, how an element may be reused across matches is controlled by the after-match skip strategy.
There are 5 after-match skip strategies:
- NO_SKIP: skip nothing; every possible match is emitted.
- SKIP_TO_NEXT: the next match starts at the event after the first event of the successful match.
- SKIP_PAST_LAST_EVENT: the next match starts at the event after the last event of the successful match.
- SKIP_TO_FIRST(patternName): the next match starts at the first event of the successful match that belongs to patternName.
- SKIP_TO_LAST(patternName): the next match starts at the last event of the successful match that belongs to patternName.
The skip strategy is passed in when the pattern is started:
Pattern.begin("patternName", skipStrategy);
skipStrategy is created with one of the following calls:
AfterMatchSkipStrategy.noSkip();
AfterMatchSkipStrategy.skipToNext();
AfterMatchSkipStrategy.skipPastLastEvent();
AfterMatchSkipStrategy.skipToFirst(patternName);
AfterMatchSkipStrategy.skipToLast(patternName);
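As a rough illustration of how the strategy changes the result, the plain-Java sketch below compares NO_SKIP with SKIP_PAST_LAST_EVENT for a deliberately simple, fixed-length pattern: k readings above a threshold in a row. It is a simulation under that assumption only, not Flink's implementation, and the names SkipSketch and matchStarts are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: fixed-length pattern "k readings above threshold in a row".
class SkipSketch {

    // Returns the start index of every emitted match.
    // skipPastLastEvent = false simulates NO_SKIP, true simulates SKIP_PAST_LAST_EVENT.
    static List<Integer> matchStarts(List<Integer> readings, int threshold, int k,
                                     boolean skipPastLastEvent) {
        List<Integer> starts = new ArrayList<>();
        int i = 0;
        while (i + k <= readings.size()) {
            boolean allAbove = true;
            for (int j = i; j < i + k; j++) {
                if (readings.get(j) <= threshold) {
                    allAbove = false;
                    break;
                }
            }
            if (allAbove) {
                starts.add(i);
                // NO_SKIP retries from the very next event; SKIP_PAST_LAST_EVENT
                // resumes after the last event of the match that was just emitted.
                i = skipPastLastEvent ? i + k : i + 1;
            } else {
                i++;
            }
        }
        return starts;
    }

    public static void main(String[] args) {
        List<Integer> readings = Arrays.asList(51, 52, 53, 54);
        System.out.println("NO_SKIP:              " + matchStarts(readings, 50, 2, false));
        System.out.println("SKIP_PAST_LAST_EVENT: " + matchStarts(readings, 50, 2, true));
    }
}
```

With four qualifying readings and k = 2, the NO_SKIP simulation reports overlapping matches starting at indexes 0, 1 and 2, while the SKIP_PAST_LAST_EVENT simulation reports only the non-overlapping matches starting at 0 and 2.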
Time constraints
Besides constraints on the data, Flink CEP also lets you constrain a Pattern in time.
- within(time span): the whole series of matching elements must occur within the given time span.
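The idea behind within can be sketched in plain Java as a check on the timestamps of a candidate match: the span between its first and last element must fit inside the window. This is an illustration of the rule, not Flink code. The names WithinSketch and fitsWindow are made up, the timestamps are assumed to be sorted and non-empty, and treating the limit as exclusive is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.List;

// Illustration only: checks whether a candidate match fits into a time window.
class WithinSketch {

    // timestamps: event times (ms) of the matched elements, in ascending order, non-empty.
    static boolean fitsWindow(List<Long> timestamps, long windowMillis) {
        long first = timestamps.get(0);
        long last = timestamps.get(timestamps.size() - 1);
        // Assumption of this sketch: the window limit itself is exclusive.
        return last - first < windowMillis;
    }

    public static void main(String[] args) {
        List<Long> loginWithdrawLogout = Arrays.asList(1_000L, 4_000L, 9_000L);
        System.out.println(fitsWindow(loginWithdrawLogout, 10_000L)); // span of 8 s, window of 10 s
        System.out.println(fitsWindow(loginWithdrawLogout, 5_000L));  // span of 8 s, window of 5 s
    }
}
```

A candidate match that does not fit the window is what the later section on timed-out matches refers to.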
Creating a PatternStream
The CEP.pattern method binds the target DataStream to the Pattern created in the previous sections.
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
// Alternatively, pass a comparator (rarely needed)
PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);
The second form takes a comparator that customizes how elements are compared; it is used to order elements that have the same event time. For example, a custom comparator for the Row type:
new EventComparator<Row>() {
@Override
public int compare(Row o1, Row o2) {
// Custom comparison logic is here
return 0;
}
};
Note that since Flink 1.12, a PatternStream uses event time by default. If your job relies on processing time, you must configure it explicitly:
PatternStream<Event> patternStream = CEP.pattern(input, pattern).inProcessingTime();
Selecting the captured elements from a PatternStream
The select method takes a PatternSelectFunction. You implement this interface and write your own processing logic. The interface looks like this:
public interface PatternSelectFunction<IN, OUT> extends Function, Serializable {
OUT select(Map<String, List<IN>> var1) throws Exception;
}
The parameter has type Map<String, List<IN>>. Each key of the map is a pattern name given in the combination methods (begin, next and so on), and the value is the list of elements matched for that name. The processed result is returned with return.
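The body of a select implementation typically just reads the lists out of this map by stage name. The plain-Java sketch below shows that logic in isolation for the login/withdraw/logout pattern from earlier. It does not use the Flink interface; the class SelectSketch, the method summarize, and the use of String as the element type are assumptions made so that the example runs with the JDK alone.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: the kind of logic that would sit inside PatternSelectFunction.select.
class SelectSketch {

    // match: stage name -> elements matched for that stage (element type simplified to String).
    static String summarize(Map<String, List<String>> match) {
        String login = match.get("login").get(0);      // single-element stage
        int withdrawals = match.get("withdraw").size(); // repeated stage, one or more elements
        String logout = match.get("logout").get(0);    // single-element stage
        return login + " -> " + withdrawals + " withdrawal(s) -> " + logout;
    }

    public static void main(String[] args) {
        Map<String, List<String>> match = new HashMap<>();
        match.put("login", Arrays.asList("login#1"));
        match.put("withdraw", Arrays.asList("withdraw#1", "withdraw#2"));
        match.put("logout", Arrays.asList("logout#1"));
        System.out.println(summarize(match));
    }
}
```

Stages with a quantifier such as timesOrMore contribute several elements to their list, which is why the value type is a List rather than a single element.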
You can also use the process function:
CEP.pattern(...).process(new PatternProcessFunction<IN, OUT>() {
@Override
public void processMatch(Map<String, List<IN>> map, Context context, Collector<OUT> collector) throws Exception {
// ...
}
});
The parameter types here are similar to those of PatternSelectFunction, but the processed data cannot be returned with return; it must be emitted through the collector.
Elements that match the Pattern only partially before the time limit expires (the time span configured with within in the previous section) are discarded by default. To capture these timed-out matches, write a custom PatternProcessFunction that also implements TimedOutPartialMatchHandler, as shown below:
class CustomPatternProcessFunction extends PatternProcessFunction<Object, Object> implements TimedOutPartialMatchHandler<Object> {
@Override
public void processMatch(Map<String, List<Object>> map, Context context, Collector<Object> collector) throws Exception {
// ...
}
@Override
public void processTimedOutMatch(Map<String, List<Object>> map, Context context) throws Exception {
Object element = map.get("key").get(0);
context.output(outputTag, element);
}
}
The example above sends the timed-out element to a side output bound to outputTag. An OutputTag identifies the elements of one side output. The tag is created and the side-output elements are retrieved as follows:
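// Object is the element type of the side output; the trailing {} preserves its type information
OutputTag<Object> outputTag = new OutputTag<Object>("late-element") {};
// CEP operation (patternStream is assumed to be a PatternStream<Object> here)
SingleOutputStreamOperator<Object> result = patternStream.process(new CustomPatternProcessFunction());
// Side outputs are read from the stream returned by process, not from the PatternStream
DataStream<Object> sideOutput = result.getSideOutput(outputTag);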
When a Pattern runs on event time, some elements will inevitably arrive late. To capture them, use a side output in the same way as above:
patternStream.sideOutputLateData(lateDataOutputTag)
Changes in newer versions
Starting with Flink 1.12, the default time characteristic of CEP changed from processing time to event time, so take care when upgrading. For details, see the article on pitfalls of upgrading to Flink 1.12.
Writing CEP with SQL
Besides the Pattern API, Flink also supports writing CEP in SQL. SQL is comparatively more flexible, but you need to learn the syntax of the SQL MATCH_RECOGNIZE clause. For how to write CEP in SQL, see the article on using Flink CEP (SQL approach).
link :http://events.jianshu.io/p/a3931e203324