
Flink Multi-Stream Transformations (Side-Output Splitting, Union, Connect): Real-Time Reconciliation of App Payments and Third-Party Payments

2022-06-11 12:11:00 But don't ask about your future

Preface


1. Splitting a Stream

To split one data stream into two or even more completely independent streams, simply use a process function (ProcessFunction) together with side outputs (side output).

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SplitStreamByOutputTag {

    // Side-output tags; the anonymous subclasses preserve the generic type information
    private static OutputTag<Tuple3<String, String, Long>> MaryTag = new OutputTag<Tuple3<String, String, Long>>("Mary-pv") {
    };
    private static OutputTag<Tuple3<String, String, Long>> BobTag = new OutputTag<Tuple3<String, String, Long>>("Bob-pv") {
    };

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());

        SingleOutputStreamOperator<Event> processedStream = stream.process(new ProcessFunction<Event, Event>() {

            @Override
            public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
                if (value.user.equals("Mary")) {
                    ctx.output(MaryTag, new Tuple3<>(value.user, value.url, value.timestamp));
                } else if (value.user.equals("Bob")) {
                    ctx.output(BobTag, new Tuple3<>(value.user, value.url, value.timestamp));
                } else {
                    out.collect(value);
                }
            }
        });

        processedStream.getSideOutput(MaryTag).print("Mary pv");
        processedStream.getSideOutput(BobTag).print("Bob pv");
        processedStream.print("else");

        env.execute();
    }
}

2. Merging Streams

2.1 Union

The union operation requires that the data types in all input streams be the same. The merged stream contains every element of the input streams, and the data type stays unchanged.

stream1.union(stream2, stream3, ...)

Note:
For the watermark after a union, the smallest of the input watermarks prevails; this guarantees that none of the merged streams will later deliver data with timestamps earlier than that watermark.

import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class UnionExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream1 = env.socketTextStream("192.168.0.23", 7777)
                .map(data -> {
                    String[] field = data.split(",");
                    return new Event(field[0].trim(), field[1].trim(), Long.valueOf(field[2].trim()));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                );
        stream1.print("stream1");

        SingleOutputStreamOperator<Event> stream2 = env.socketTextStream("192.168.0.23", 7778)
                .map(data -> {
                    String[] field = data.split(",");
                    return new Event(field[0].trim(), field[1].trim(), Long.valueOf(field[2].trim()));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                );
        stream2.print("stream2");

        // Merge the two streams
        stream1.union(stream2)
                .process(new ProcessFunction<Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        out.collect("watermark: " + ctx.timerService().currentWatermark());
                    }
                })
                .print();
        env.execute();
    }
}

In the operator task of the ProcessFunction after the union, the logical clock starts out as follows:
Because Flink inserts a watermark of negative infinity (Long.MIN_VALUE) at the beginning of every stream, the processing task of the ProcessFunction after the union keeps one partition watermark for each merged stream, each initialized to Long.MIN_VALUE. The task's own watermark is the minimum over all its partition watermarks, so it also starts at Long.MIN_VALUE.
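The minimum-watermark rule can be illustrated without Flink at all. The sketch below is plain Java, not Flink API (the class name MinWatermarkSketch is invented for this example): it keeps one partition watermark per merged input, initialized to Long.MIN_VALUE, and combines them by taking the minimum.

```java
import java.util.Arrays;

// Simplified sketch of how an operator after a union tracks its watermark:
// one partition watermark per merged input, combined by taking the minimum.
public class MinWatermarkSketch {

    private final long[] partitionWatermarks;

    public MinWatermarkSketch(int numInputs) {
        partitionWatermarks = new long[numInputs];
        // Every stream starts with a watermark of negative infinity
        Arrays.fill(partitionWatermarks, Long.MIN_VALUE);
    }

    // Called when input `partition` advances its watermark;
    // returns the resulting operator watermark (the minimum over all partitions).
    public long advance(int partition, long watermark) {
        partitionWatermarks[partition] = Math.max(partitionWatermarks[partition], watermark);
        return Arrays.stream(partitionWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        MinWatermarkSketch op = new MinWatermarkSketch(2);
        System.out.println(op.advance(0, 1000L)); // still Long.MIN_VALUE: input 1 has not advanced yet
        System.out.println(op.advance(1, 5000L)); // min(1000, 5000) = 1000
    }
}
```

This mirrors what the union example above prints: the operator watermark only moves once every merged input has moved past Long.MIN_VALUE, and it always trails the slowest input.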


2.2 Connect

2.2.1 Connected streams (ConnectedStreams)

A DataStream can only hold data of a single type, so connecting two streams does not yield a DataStream but a ConnectedStreams. A connected stream can be viewed as the two streams placed "inside" one stream; internally, however, the data keep their original forms and the two streams remain independent. To actually merge them, you further define a co-process transformation: the two streams may keep their own data types and be processed differently, but the results are ultimately unified into a single DataStream.

import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class CoMapExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<Integer> stream1 = env.fromElements(1, 2, 3);
        DataStream<Long> stream2 = env.fromElements(1L, 2L, 3L);

        ConnectedStreams<Integer, Long> connectedStreams = stream1.connect(stream2);
        SingleOutputStreamOperator<String> map = connectedStreams.flatMap(new CoFlatMapFunction<Integer, Long, String>() {

            @Override
            public void flatMap1(Integer value, Collector<String> out) throws Exception {
                out.collect("stream1:" + value);
            }

            @Override
            public void flatMap2(Long value, Collector<String> out) throws Exception {
                out.collect("stream2:" + value);
            }
        });

        map.print();
        env.execute();
    }
}

ConnectedStreams can call .keyBy() directly for key partitioning. It takes two parameters, keySelector1 and keySelector2, the key selectors for the two streams respectively, so that records with the same key from both streams are routed together. Note that the key types defined by the two streams must be identical, otherwise an exception is thrown:

connectedStreams.keyBy(keySelector1, keySelector2);

2.2.2 CoProcessFunction

To use CoProcessFunction, implement its two methods, processElement1() and processElement2(); whenever a record arrives, one of the two is invoked depending on which stream the record came from. CoProcessFunction can access timestamps and the watermark through the context ctx and register timers through the TimerService; it also provides an onTimer() method for defining what happens when a timer fires.

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Real-time reconciliation: a two-stream join of app payment events and third-party
// payment events. Each payment event waits up to 5 seconds for its counterpart in the
// other stream; if the matching event does not arrive in time, an alert is emitted.
public class BillCheckExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Payment log from the app
        SingleOutputStreamOperator<Tuple3<String, String, Long>> appStream = env
                .fromElements(Tuple3.of("order-1", "app", 1000L), Tuple3.of("order-2", "app", 2000L), Tuple3.of("order-3", "app", 9000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
                                return element.f2;
                            }
                        })
                );

        // Payment log from the third-party payment platform
        SingleOutputStreamOperator<Tuple4<String, String, String, Long>> thirdpartStream = env
                .fromElements(Tuple4.of("order-1", "third-party", "success", 3000L), Tuple4.of("order-3", "third-party", "success", 4000L), Tuple4.of("order-4", "third-party", "success", 8000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple4<String, String, String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<String, String, String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple4<String, String, String, Long> element, long recordTimestamp) {
                                return element.f3;
                            }
                        })
                );

        // Check whether each order appears in both streams; alert if the match fails
        appStream.connect(thirdpartStream)
                .keyBy(data -> data.f0, data -> data.f0)
                .process(new OrderMatchResult())
                .print();
        env.execute();
    }

    // Custom CoProcessFunction implementation
    public static class OrderMatchResult extends CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String> {

        // State variables holding the event that arrived first
        private ValueState<Tuple3<String, String, Long>> appEventState;
        private ValueState<Tuple4<String, String, String, Long>> thirdPartyEventState;

        @Override
        public void open(Configuration parameters) throws Exception {
            appEventState = getRuntimeContext().getState(new ValueStateDescriptor<Tuple3<String, String, Long>>("app-event", Types.TUPLE(Types.STRING, Types.STRING, Types.LONG)));
            thirdPartyEventState = getRuntimeContext().getState(new ValueStateDescriptor<Tuple4<String, String, String, Long>>("thirdparty-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.LONG)));
        }

        @Override
        public void processElement1(Tuple3<String, String, Long> value, Context ctx, Collector<String> out) throws Exception {
            if (thirdPartyEventState.value() != null) {
                out.collect("Reconciliation successful: " + value + " " + thirdPartyEventState.value());
                // Clear the state
                thirdPartyEventState.clear();
            } else {
                appEventState.update(value);
                // Register a 5-second event-time timer and wait for the event from the other stream
                ctx.timerService().registerEventTimeTimer(value.f2 + 5000L);
            }
        }

        @Override
        public void processElement2(Tuple4<String, String, String, Long> value, Context ctx, Collector<String> out) throws Exception {
            if (appEventState.value() != null) {
                out.collect("Reconciliation successful: " + appEventState.value() + " " + value);
                // Clear the state
                appEventState.clear();
            } else {
                // Update the state
                thirdPartyEventState.update(value);
                // Register a 5-second event-time timer and wait for the event from the other stream
                ctx.timerService().registerEventTimeTimer(value.f3 + 5000L);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // The timer fired: if a state is still non-empty, the matching event
            // from the other stream never arrived
            if (appEventState.value() != null) {
                out.collect("Reconciliation failed: " + appEventState.value() + " (the third-party payment event did not arrive)");
            }
            if (thirdPartyEventState.value() != null) {
                out.collect("Reconciliation failed: " + thirdPartyEventState.value() + " (the app payment event did not arrive)");
            }
            appEventState.clear();
            thirdPartyEventState.clear();
        }
    }
}

2.2.3 Broadcast connected streams (BroadcastConnectedStream)

When DataStream.connect() is called, the argument may be not a DataStream but a broadcast stream (BroadcastStream); the result of connecting the two is then a broadcast connected stream (BroadcastConnectedStream).
This kind of connection is commonly used when rules or configuration must be defined dynamically. Because the rules change in real time, a separate stream is used to carry the rule data; and because the rules apply globally to the whole application, the rule data cannot be sent to just one downstream parallel subtask but must be broadcast to all of them. When a downstream task receives a broadcast rule, it saves it in state, the so-called broadcast state.
Broadcast state is kept internally as a map structure. In code, call DataStream's .broadcast() method with a map state descriptor (MapStateDescriptor) describing the state's name and type to obtain the broadcast stream of rule data (BroadcastStream):

MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(...);
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
        .broadcast(ruleStateDescriptor);
DataStream<String> output = stream
        .connect(ruleBroadcastStream)
        .process(new BroadcastProcessFunction<>() {
            ...
        });

BroadcastProcessFunction is similar to CoProcessFunction: it is an abstract class with two methods to implement, defining the processing of elements from each of the two connected streams. The difference is that here one stream processes data normally while the other updates the broadcast state with new rules, so the two methods are called .processElement() and .processBroadcastElement().
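The map-structure nature of broadcast state and the division of labor between the two callbacks can be sketched in plain Java. This is an illustrative simulation, not the Flink API: the class BroadcastStateSketch, its method names, and the rule key "filter" are all invented for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the two-callback shape of BroadcastProcessFunction:
// processBroadcastElement updates the locally kept rule map (the "broadcast state"),
// while processElement reads it to decide how to handle a normal data record.
public class BroadcastStateSketch {

    // Broadcast state is a map structure: rule name -> rule definition
    private final Map<String, String> ruleState = new HashMap<>();

    // Corresponds to processBroadcastElement(): a new rule arrives on the broadcast side
    public void processBroadcastElement(String ruleName, String ruleValue) {
        ruleState.put(ruleName, ruleValue);
    }

    // Corresponds to processElement(): a data record is evaluated against the current rules
    public String processElement(String record) {
        String rule = ruleState.getOrDefault("filter", "");
        return record.contains(rule) ? "match: " + record : "no-match: " + record;
    }

    public static void main(String[] args) {
        BroadcastStateSketch f = new BroadcastStateSketch();
        f.processBroadcastElement("filter", "pay"); // the rule stream updates the state
        System.out.println(f.processElement("pay-order-1"));
        System.out.println(f.processElement("refund-1"));
    }
}
```

In real Flink, every parallel subtask receives its own copy of the rule updates, which is exactly why the state can be read in processElement without any cross-task communication.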


2.3 Time-based merging: two-stream joins (Join)


Copyright notice
This article was written by [But don't ask about your future]; when reposting, please include a link to the original. Thanks.
https://yzsam.com/2022/162/202206111206047846.html