
Using CEP in Flink

2022-07-06 15:47:00 Empty one by one

Flink usage series: index of related articles


What is CEP?

CEP stands for Complex Event Processing. The literal name alone is hard to understand: what is a "complex event"? When we usually process data streams with Flink, we care about each incoming element on its own and pay no attention to the relationships between elements; at most we use stateful operators. Now suppose we need to detect and capture a series of events that follow a specific rule, for example a user logging in, making a transfer and then logging out (events A, B and C occurring in sequence), or a machine room reporting 10 or more consecutive temperature readings above 50 degrees (A{10,}). Such requirements are hard to implement with a traditional Flink program, and this is where Flink CEP shows its strength.

Now let's explain how to use Flink CEP.

Adding dependencies

For Java code, add the following dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.11</artifactId>
    <version>1.13.2</version>
</dependency>

Here version corresponds to your Flink version, and the _2.11 suffix in the artifactId is the Scala binary version.

For Scala, add:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_2.11</artifactId>
    <version>1.13.2</version>
</dependency>

The code below is written in Java.

Pattern API

A Pattern describes something like the examples in the first section: a user logging in, making a transfer and then logging out, or a machine room reporting 10 consecutive temperature readings above 50 degrees. The concept is very similar to a pattern in regular expressions: a Pattern is the programming interface with which the user describes the characteristics of a continuous series of events.

Let's start with a simple example: a Pattern for a machine room whose temperature readings are above 50 degrees 10 or more times in a row.

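import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.types.Row;

// 10 or more consecutive readings strictly above 50 degrees
Pattern<Row, Row> pattern = Pattern.<Row>begin("high").where(new SimpleCondition<Row>() {
    @Override
    public boolean filter(Row row) throws Exception {
        // Assumes the Row carries a named field "temperature" of type Integer
        int temp = (int) row.getField("temperature");
        return temp > 50;
    }
}).timesOrMore(10).consecutive();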

First we call Pattern's begin method, which marks the start of the Pattern and requires a name. A Pattern can consist of several stages, each with its own name; these names are used later to retrieve the elements that matched each stage. begin needs a generic type argument, which is the data type of the source stream. Next comes a where condition: where takes a SimpleCondition inner class that inspects each element and returns true if the element satisfies the condition. Then we specify how many elements must satisfy the condition; timesOrMore(n) means n or more times. Finally, consecutive() requires these repetitions to be strictly contiguous, with no other elements in between.
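To make the semantics of where + timesOrMore(10) + consecutive() concrete, here is a plain-Java sketch (not Flink code, and not how Flink evaluates patterns) that scans an array of readings for runs of at least 10 consecutive values strictly above 50. The class and method names are made up for illustration. Flink itself emits individual matches (with the default skip strategy, a single long run can produce several overlapping matches); this sketch only reports where each qualifying run starts.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration (hypothetical, not Flink API) of "N or more consecutive readings above a threshold". */
class ConsecutiveHighTemperature {

    /** Returns the start index of every maximal run of length >= minRun whose readings are all > threshold. */
    static List<Integer> findRuns(int[] readings, int threshold, int minRun) {
        List<Integer> starts = new ArrayList<>();
        int runStart = -1; // start of the current run of high readings, -1 when not in a run
        // Iterate one step past the end so a run that reaches the end of the array is closed too
        for (int i = 0; i <= readings.length; i++) {
            boolean high = i < readings.length && readings[i] > threshold;
            if (high && runStart < 0) {
                runStart = i; // a new run begins
            } else if (!high && runStart >= 0) {
                if (i - runStart >= minRun) {
                    starts.add(runStart); // the run was long enough
                }
                runStart = -1; // run broken by a low reading (consecutive() forbids gaps) or by the end
            }
        }
        return starts;
    }
}
```

A reading of exactly 50 breaks a run, because the condition is "higher than 50".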

Next, an example of a user logging in, withdrawing money and logging out:

import org.apache.flink.cep.pattern.Pattern;

// Define the login, withdrawal and logout events
class UserEvent {}

class LoginEvent extends UserEvent {}

class WithdrawEvent extends UserEvent {}

class LogoutEvent extends UserEvent {}

// The Pattern: a login, then one or more withdrawals, then a logout, all strictly contiguous
Pattern<UserEvent, ?> pattern = Pattern
        .<UserEvent>begin("login")
        .subtype(LoginEvent.class)
        .next("withdraw")
        .subtype(WithdrawEvent.class)
        .timesOrMore(1)
        .consecutive()
        .next("logout")
        .subtype(LogoutEvent.class);

The Pattern defined here is AB+C, with three stages named "login", "withdraw" and "logout". subtype specifies the data type an element must have to match a stage. In other words, when a LoginEvent, one or more WithdrawEvent instances and a LogoutEvent arrive back to back, these elements are captured; in the simplest case that is exactly 3 elements.
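Because a Pattern is close to a regular expression, the AB+C idea can be illustrated with java.util.regex: map each event type to a letter and search the resulting string for LW+O. This is only an analogy under that assumption; Flink CEP does not work on strings, and the letter mapping and the class name below are hypothetical.

```java
import java.util.List;

/** Regex analogy (not Flink CEP) for login, then one or more withdrawals, then logout, strictly contiguous. */
class LoginWithdrawLogout {

    // L = login, W = withdraw, O = logout; any other event type becomes X
    private static final java.util.regex.Pattern SEQUENCE = java.util.regex.Pattern.compile("LW+O");

    static char letterOf(String eventType) {
        switch (eventType) {
            case "login": return 'L';
            case "withdraw": return 'W';
            case "logout": return 'O';
            default: return 'X';
        }
    }

    /** True if the three stages appear back to back somewhere in the event sequence. */
    static boolean containsSequence(List<String> eventTypes) {
        StringBuilder letters = new StringBuilder();
        for (String type : eventTypes) {
            letters.append(letterOf(type));
        }
        // Adjacent letters in the regex correspond to strict contiguity (next + consecutive)
        return SEQUENCE.matcher(letters).find();
    }
}
```

With this analogy, login, withdraw, browse, logout does not match: the unrelated "browse" event breaks strict contiguity, just as it would for next.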

With these examples you should now have a basic understanding of how to use a Pattern. Next, let's go through the more detailed configuration options of the Pattern API one by one.

Combining patterns

Patterns are combined mainly with the following methods:

  • begin: marks the start of the Pattern.
  • next: strict contiguity. A next B means B must immediately follow A.
  • followedBy: relaxed contiguity. B does not have to follow A immediately; other elements may appear in between. For example, A followedBy B matches not only A B but also A C B: C, which does not match, is simply ignored. Only non-matching elements are skipped.
  • followedByAny: non-deterministic relaxed contiguity, even looser than followedBy, because it can also skip elements that would match. For example, on the input A C B1 B2, A followedByAny B matches both A B1 (C is skipped) and A B2 (B1 is skipped even though it matches; this is the difference between followedByAny and followedBy). A followedBy B matches only A B1.
  • notNext: the negation of next.
  • notFollowedBy: the negation of followedBy. Note that notFollowedBy cannot be used at the end of a Pattern.
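The followedByAny example above (input A C B1 B2) can be reproduced with a small plain-Java sketch that enumerates the pairs a two-stage pattern "A then B" would produce under each contiguity mode. It is a simplified model, not Flink's NFA: it ignores quantifiers, time windows and skip strategies, and it assumes (hypothetically) that an event matches a stage when its name starts with that stage's letter.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model (not Flink code) of next / followedBy / followedByAny for a pattern "A ? B". */
class ContiguityDemo {

    enum Mode { NEXT, FOLLOWED_BY, FOLLOWED_BY_ANY }

    /** Returns every match as "A B1"-style strings, in input order. */
    static List<String> match(List<String> events, Mode mode) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).startsWith("A")) continue; // each A starts a candidate
            for (int j = i + 1; j < events.size(); j++) {
                boolean isB = events.get(j).startsWith("B");
                if (mode == Mode.NEXT) {
                    // Strict contiguity: only the element right after A is considered
                    if (isB) matches.add(events.get(i) + " " + events.get(j));
                    break;
                }
                if (isB) {
                    matches.add(events.get(i) + " " + events.get(j));
                    // followedBy stops at the first B; followedByAny keeps looking for later Bs
                    if (mode == Mode.FOLLOWED_BY) break;
                }
                // Non-matching elements such as C are skipped in both relaxed modes
            }
        }
        return matches;
    }
}
```

On A C B1 B2 this yields no match for NEXT, A B1 for FOLLOWED_BY, and A B1 plus A B2 for FOLLOWED_BY_ANY, in line with the bullets above.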

Specifying the number of repetitions

A stage can be required to occur 0 times, once, n times, n to m times, n or more times, and so on. You can also control whether other events may appear between these repetitions.

The configuration methods are:

  • times(n): exactly n times.
  • times(n, m): n to m times.
  • optional(): additionally allows the stage to occur 0 times.
  • oneOrMore(): one or more times.
  • timesOrMore(n): n or more times.
  • greedy(): match as many repetitions as possible. For example, suppose the incoming elements are A X X B, where each X satisfies both the A condition and the B condition. For the Pattern A+ B, many overlapping matches are possible, such as A X, A X X, X X B, X B and A X X B. With A+ made greedy, Flink assigns as many elements as possible to A+, so the match starting from A is only A X X B.
  • consecutive(): requires the repetitions to be strictly contiguous (like next), with no other element in between. Without it, relaxed contiguity (like followedBy) applies between repetitions by default.
  • allowCombinations(): the opposite of consecutive. It applies non-deterministic relaxed contiguity (like followedByAny) between repetitions, so even elements that could match may be skipped.
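The three inner-contiguity options can be illustrated for a looping stage b.times(2) on the input b1 d b2 b3, where d does not match. The sketch below is a simplified plain-Java model, not Flink's implementation; the event names and the "starts with b" rule are assumptions for illustration. Under these assumptions, consecutive() yields only b2 b3, the default relaxed contiguity yields b1 b2 and b2 b3, and allowCombinations() additionally yields b1 b3.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model (not Flink code) of which elements a looping stage b.times(n) may pick. */
class LoopContiguityDemo {

    enum Inner { CONSECUTIVE, RELAXED_DEFAULT, ALLOW_COMBINATIONS }

    static List<List<String>> matches(List<String> events, int n, Inner inner) {
        List<List<String>> out = new ArrayList<>();
        for (int start = 0; start < events.size(); start++) {
            if (events.get(start).startsWith("b")) { // every matching element starts a new attempt
                List<String> picked = new ArrayList<>();
                picked.add(events.get(start));
                extend(events, start, n, inner, picked, out);
            }
        }
        return out;
    }

    private static void extend(List<String> events, int last, int n, Inner inner,
                               List<String> picked, List<List<String>> out) {
        if (picked.size() == n) {
            out.add(new ArrayList<>(picked)); // enough repetitions collected
            return;
        }
        for (int j = last + 1; j < events.size(); j++) {
            if (!events.get(j).startsWith("b")) {
                if (inner == Inner.CONSECUTIVE) return; // a non-matching element breaks strict contiguity
                continue;                               // relaxed modes skip non-matching elements
            }
            picked.add(events.get(j));
            extend(events, j, n, inner, picked, out);
            picked.remove(picked.size() - 1);
            // Only allowCombinations may also skip this matching element and try a later one
            if (inner != Inner.ALLOW_COMBINATIONS) return;
        }
    }
}
```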

Specifying conditions

  • where(): takes an inner class (such as SimpleCondition) in which you write a custom condition, much like the filter operator.
  • subtype(): specifies the type the matched element must have.

After-match skip strategy

An element can often take part in several successful matches. In practice, the after-match skip strategy controls where matching resumes after a successful match, and therefore how often an element can be reused.

There are 5 after-match skip strategies:

  • NO_SKIP: no skipping; every possible match is emitted.
  • SKIP_TO_NEXT: the next match starts from the event after the first event of the successful match.
  • SKIP_PAST_LAST_EVENT: the next match starts from the event after the last event of the successful match.
  • SKIP_TO_FIRST(patternName): the next match starts from the first event in the successful match that belongs to the stage patternName.
  • SKIP_TO_LAST(patternName): the next match starts from the last event in the successful match that belongs to the stage patternName.
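A simplified plain-Java model of the first three strategies: it takes the candidate matches that NO_SKIP would report, as lists of event indices in emission order, and drops those that start before the point where the strategy says matching resumes. This is an approximation of the idea, not Flink's algorithm, and it leaves out SKIP_TO_FIRST and SKIP_TO_LAST. For the pattern a b+ on the input a b1 b2 b3 (indices 0 to 3), NO_SKIP reports a b1, a b1 b2 and a b1 b2 b3, while SKIP_TO_NEXT keeps only a b1.

```java
import java.util.ArrayList;
import java.util.List;

/** Approximate model (not Flink code) of NO_SKIP, SKIP_TO_NEXT and SKIP_PAST_LAST_EVENT. */
class SkipStrategyDemo {

    enum Strategy { NO_SKIP, SKIP_TO_NEXT, SKIP_PAST_LAST_EVENT }

    /** candidates: matches as event-index lists, ordered as NO_SKIP would emit them. */
    static List<List<Integer>> apply(List<List<Integer>> candidates, Strategy strategy) {
        List<List<Integer>> emitted = new ArrayList<>();
        int minStart = 0; // candidates whose first event lies before this index are discarded
        for (List<Integer> match : candidates) {
            int first = match.get(0);
            int last = match.get(match.size() - 1);
            if (first < minStart) continue;
            emitted.add(match);
            if (strategy == Strategy.SKIP_TO_NEXT) {
                minStart = first + 1; // resume after the first event of the emitted match
            } else if (strategy == Strategy.SKIP_PAST_LAST_EVENT) {
                minStart = last + 1;  // resume after the last event of the emitted match
            }
            // NO_SKIP leaves minStart at 0, so every candidate is emitted
        }
        return emitted;
    }
}
```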

The skip strategy is passed as the second argument of begin when the Pattern is defined:

Pattern.begin("patternName", skipStrategy);

skipStrategy is created with one of the following methods:

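import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;

AfterMatchSkipStrategy.noSkip();
AfterMatchSkipStrategy.skipToNext();
AfterMatchSkipStrategy.skipPastLastEvent();
// patternName is the name of a stage in the Pattern
AfterMatchSkipStrategy.skipToFirst(patternName);
AfterMatchSkipStrategy.skipToLast(patternName);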

Time constraints

Besides constraints on the data, Flink CEP also lets you constrain a Pattern in the time dimension.

  • within(time span): all elements of a match must occur within the given time span, for example within(Time.seconds(10)).
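A plain-Java sketch of the check within() implies: every event of a (partial) match must arrive less than the window after the match's first event. Timestamps are assumed to be epoch milliseconds in event-time order, and the class is hypothetical. Treating the boundary as exclusive is an assumption of this sketch; check your Flink version if the exact boundary matters.

```java
/** Hypothetical helper (not Flink API) illustrating the time constraint of within(). */
class WithinDemo {

    /** True if all events fall strictly inside the window measured from the first event. */
    static boolean withinWindow(long[] eventTimestamps, long windowMillis) {
        long first = eventTimestamps[0]; // the match's time span starts at its first event
        for (long ts : eventTimestamps) {
            if (ts - first >= windowMillis) {
                return false; // too late: the partial match would time out
            }
        }
        return true;
    }
}
```

A partial match that fails this check is what the timeout handling further below deals with.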

Creating a PatternStream

Use the CEP.pattern method to combine the target DataStream with the Pattern created in the previous sections.

PatternStream<Event> patternStream = CEP.pattern(input, pattern);
// Or, less commonly, also pass a comparator
PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);

The Comparator passed in the second form customizes how elements are compared; it decides the order of elements that have the same event time. For example, custom comparison logic for the Row type:

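import org.apache.flink.cep.EventComparator;
import org.apache.flink.types.Row;

EventComparator<Row> comparator = new EventComparator<Row>() {
    @Override
    public int compare(Row o1, Row o2) {
        // Custom comparison logic for Rows with the same event time goes here
        return 0;
    }
};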
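Since EventComparator is a Comparator, the ordering logic itself is ordinary Java. The sketch below uses a hypothetical SensorEvent type with a hypothetical per-source sequence number seq as the tie-breaker; in Flink, the same compare() body would go into an EventComparator for your own event type.

```java
import java.util.Comparator;

/** Hypothetical event type: an event-time timestamp plus a sequence number used for tie-breaking. */
class SensorEvent {
    final long timestamp;
    final long seq; // hypothetical per-source sequence number

    SensorEvent(long timestamp, long seq) {
        this.timestamp = timestamp;
        this.seq = seq;
    }
}

/** Orders by timestamp, then by seq; for events with equal event time only the seq part matters. */
class SeqComparator implements Comparator<SensorEvent> {
    @Override
    public int compare(SensorEvent o1, SensorEvent o2) {
        int byTime = Long.compare(o1.timestamp, o2.timestamp);
        return byTime != 0 ? byTime : Long.compare(o1.seq, o2.seq); // lower seq first
    }
}
```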

Note that since Flink 1.12, a CEP PatternStream uses event time by default. If your job relies on processing time, you must configure it explicitly:

PatternStream<Event> patternStream = CEP.pattern(input, pattern).inProcessingTime();

Selecting the captured elements from a PatternStream

The select method takes a PatternSelectFunction argument; users implement this interface with their own processing logic. The interface is shown below:

public interface PatternSelectFunction<IN, OUT> extends Function, Serializable {
    OUT select(Map<String, List<IN>> var1) throws Exception;
}

The argument is of type Map<String, List<IN>>: each map key is the name of a stage given in the Pattern (for example in begin or next), and the value is the list of elements matched by that stage. The processed result is returned with return.

You can also use a PatternProcessFunction with process:

CEP.pattern(...).process(new PatternProcessFunction<IN, OUT>() {
    @Override
    public void processMatch(Map<String, List<IN>> map, Context context, Collector<OUT> collector) throws Exception {
        // ...
    }
});

The parameter types here are similar to PatternSelectFunction, but results are not returned with return; they are emitted through the collector with collect.
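The difference between the two styles can be shown with a plain-Java sketch that uses the same argument shape, Map<String, List<IN>> keyed by stage name. The class and methods are hypothetical, and java.util.function.Consumer stands in for Flink's Collector; the stage names follow the login/withdraw/logout example. The select-style method returns exactly one result per match, while the process-style method may emit any number of results, here one per withdrawal.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical handlers mirroring select (return one value) vs process (emit via a collector). */
class MatchHandlers {

    // select-style: exactly one result per match, returned with return
    static String selectStyle(Map<String, List<String>> match) {
        return "user withdrew " + match.get("withdraw").size() + " time(s)";
    }

    // process-style: zero or more results per match, emitted through the collector stand-in
    static void processStyle(Map<String, List<String>> match, Consumer<String> out) {
        for (String withdraw : match.get("withdraw")) {
            out.accept("withdraw: " + withdraw);
        }
    }
}
```

For a match with two withdrawals, selectStyle returns "user withdrew 2 time(s)", while processStyle emits two separate records.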

Partial matches that time out (that is, exceed the time span configured with within in the previous section) are discarded by default. To capture these timed-out partial matches, write a custom PatternProcessFunction that also implements TimedOutPartialMatchHandler, as shown below:

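import java.util.List;
import java.util.Map;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

class CustomPatternProcessFunction extends PatternProcessFunction<Object, Object> implements TimedOutPartialMatchHandler<Object> {
    // Side output for timed-out partial matches, supplied by the caller
    private final OutputTag<Object> outputTag;
    CustomPatternProcessFunction(OutputTag<Object> outputTag) { this.outputTag = outputTag; }

    @Override
    public void processMatch(Map<String, List<Object>> map, Context context, Collector<Object> collector) throws Exception {
        // Complete matches: emit results with collector.collect(...)
    }

    @Override
    public void processTimedOutMatch(Map<String, List<Object>> map, Context context) throws Exception {
        // "key" is a placeholder: use the name of a stage present in the partial match
        Object element = map.get("key").get(0);
        context.output(outputTag, element);
    }
}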

The example above sends the timed-out element to a side output bound to outputTag. An OutputTag identifies one group of side-output elements. Below is how to create an OutputTag and read the side-output elements:

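// Object is the element type of the side output; the trailing {} makes an anonymous subclass so Flink can extract the type
OutputTag<Object> outputTag = new OutputTag<Object>("late-element") {};

// CEP operation: processFunction is your PatternProcessFunction, e.g. an instance of the class above
SingleOutputStreamOperator<Object> result = patternStream.process(processFunction);

// getSideOutput is called on the stream returned by process(), not on the PatternStream itself
DataStream<Object> sideOutput = result.getSideOutput(outputTag);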

With event time, late elements can occur. To capture them, use a side output in the same way:

// Returns a new PatternStream: call process()/select() on it, then getSideOutput(lateDataOutputTag) on the result
patternStream = patternStream.sideOutputLateData(lateDataOutputTag);

Changes in newer versions

Starting with Flink 1.12, CEP uses event time by default instead of processing time, so be sure to keep this in mind. For details, see the article "Pitfalls when upgrading Flink to 1.12".

Writing CEP with SQL

Besides the Pattern API, Flink also supports writing CEP in SQL. The SQL approach is more flexible by comparison, but you need to learn the syntax of the MATCH_RECOGNIZE clause. For writing CEP in SQL, see "Using CEP in Flink (SQL)".



Link: http://events.jianshu.io/p/a3931e203324

Original site

Copyright notice
This article was written by [Empty one by one]. Please include the original link when reposting. Thanks.
https://yzsam.com/2022/187/202207060919524074.html