Flink: updating a window's result with late data, and routing data that arrives after the window is destroyed to a side output stream
The main program:
//TODO: update the window's calculation result when late data arrives
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "hadoop106:9092");

    // Declare the tag once so the same instance is used both to route and to retrieve late data
    OutputTag<Tuple2<String, Long>> lateTag = new OutputTag<Tuple2<String, Long>>("lateStream") {};

    SingleOutputStreamOperator<String> result = env
        .addSource(new FlinkKafkaConsumer<String>("Tuple2", new SimpleStringSchema(), properties))
        .map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] fields = value.split(" ");
                return Tuple2.of(fields[0], Long.parseLong(fields[1]) * 1000L);
            }
        })
        .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
            .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                @Override
                public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                    return element.f1;
                }
            }))
        .keyBy(r -> r.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        // Allow 5 seconds of lateness: when the watermark passes the window end, the window
        // fires its first calculation but is NOT destroyed. The trade-off: you see an early
        // (possibly incomplete) result at the window end, and each late element arriving
        // within the allowed lateness re-fires the window and updates the result.
        .allowedLateness(Time.seconds(5))
        // Data arriving after the allowed lateness has expired goes to the side output stream
        .sideOutputLateData(lateTag)
        .process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
            @Override
            public void process(String s, Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
                // out.collect("key: " + s + ", window range: " + new Timestamp(context.window().getStart())
                //     + " - " + new Timestamp(context.window().getEnd())
                //     + ", element count: " + elements.spliterator().getExactSizeIfKnown());

                // Per-window state: remembers whether this window has already fired once
                ValueState<Boolean> firstCalculate = context.windowState().getState(
                    new ValueStateDescriptor<Boolean>("firstCalculate", Types.BOOLEAN));
                if (firstCalculate.value() == null) {
                    // First firing, triggered when the watermark passes the window end
                    out.collect("First firing of the window; element count: "
                        + elements.spliterator().getExactSizeIfKnown()
                        + "; window range: " + new Timestamp(context.window().getStart())
                        + " - " + new Timestamp(context.window().getEnd()));
                    firstCalculate.update(true);
                } else {
                    // Subsequent firing caused by a late element within the allowed lateness
                    out.collect("Late element triggered recalculation; updated element count: "
                        + elements.spliterator().getExactSizeIfKnown());
                }
            }
        });

    result.printToErr("main stream");
    result.getSideOutput(lateTag).print("late side output");
    env.execute();
}
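For context, a tumbling event-time window assigns each record to the window whose start is the record's timestamp rounded down to a multiple of the window size (this mirrors Flink's window-start arithmetic with a zero offset; the helper below is a dependency-free sketch, not Flink's actual class):

```java
public class TumblingWindowMath {
    // Start of the tumbling window (size in ms, zero offset, nonnegative timestamps)
    // that contains `timestamp`
    static long windowStart(long timestamp, long sizeMs) {
        return timestamp - (timestamp % sizeMs);
    }

    public static void main(String[] args) {
        long size = 5000L; // Time.seconds(5)
        // "a 1" -> timestamp 1000 ms falls in the [0, 5000) window
        System.out.println(windowStart(1000L, size)); // 0
        // "a 5" -> timestamp 5000 ms opens the next window, [5000, 10000)
        System.out.println(windowStart(5000L, size)); // 5000
    }
}
```

So the input "a 5" never belongs to the 0-5s window itself; it only advances the watermark past that window's end.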
Program analysis:
Produce data to the Kafka topic Tuple2 with a Kafka producer. When "a 1" is entered, a 0-5s window is opened. When "a 5" is entered, the watermark passes the end of the 0-5s window, which fires the window's first calculation: the process function runs, and because this is the window's first firing, the state variable is set to true. The window is not destroyed yet, so 0-5s data arriving within the allowed lateness triggers the window's second, third, fourth... calculation, updating the result each time. When "a 10" is entered, the watermark reaches 9999, which is exactly the 0-5s window's destruction time: window end plus allowed lateness, i.e. the window is destroyed at the 10s mark. Any 0-5s data arriving after that is routed by sideOutputLateData to the side output stream.
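The destruction point described above can be checked with plain arithmetic. Assuming forMonotonousTimestamps emits a watermark equal to the max timestamp seen minus 1 ms, and that a window is cleaned up once the watermark reaches its max timestamp (end - 1 ms) plus the allowed lateness, a minimal sketch:

```java
public class LatenessMath {
    // Watermark emitted by a monotonous-timestamps strategy after seeing `maxTs`
    static long watermark(long maxTs) {
        return maxTs - 1L;
    }

    // Time at which a window [start, end) with allowed lateness is cleaned up
    static long cleanupTime(long windowEndMs, long allowedLatenessMs) {
        return (windowEndMs - 1L) + allowedLatenessMs; // window maxTimestamp + lateness
    }

    public static void main(String[] args) {
        long cleanup = cleanupTime(5000L, 5000L); // 9999 for the 0-5s window
        long wm = watermark(10_000L);             // "a 10" -> watermark 9999
        // The watermark has reached the cleanup time: the window is destroyed
        System.out.println(wm >= cleanup);
        // "a 5" (timestamp 5000) only fires the window, it does not destroy it
        System.out.println(watermark(5000L) >= cleanup);
    }
}
```

This is why "a 10" both destroys the 0-5s window and explains the 9999 watermark value in the walkthrough.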