Window and various windowfunctions in Flink
2022-06-13 03:35:00 【TRX1024】
Stream computing engines are designed to process unbounded data sets, i.e. data that keeps growing and is essentially infinite, and a window is a way of cutting that infinite data into finite chunks.
Windows are the core of unbounded stream processing: a window splits the infinite stream into finite-size "buckets" on which we can run computations.
I. Windows fall into two categories:
- CountWindow: generates windows based on a specified number of elements, independent of time (see the sketch after this list).
- TimeWindow: generates windows based on time.

A TimeWindow can further be one of:
- Tumbling window (Tumbling Window)
  - Splits the data into segments of a fixed window length
  - Characteristics: time-aligned, fixed window length, no overlap
  - Typical scenario: BI statistics (an aggregation for each time period)
- Sliding window (Sliding Window)
  - Characteristics: fixed window length, windows may overlap
  - When the slide length equals the window length it behaves exactly like a tumbling window; in other words, a tumbling window is a special case of a sliding window
  - Typical scenario: statistics over the most recent period (e.g. look at the last 5 min of an interface to decide whether to raise an alarm)
- Session window (Session Window)
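Since CountWindow is not demonstrated later in this article, here is a minimal runnable sketch, assuming a made-up keyed stream of (key, value) pairs; time windows are declared through the assigners described in the next section.

```scala
import org.apache.flink.streaming.api.scala._

object CountWindowSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // hypothetical keyed stream of (key, value) pairs
    val keyed = env.fromElements(("a", 1L), ("b", 2L), ("a", 3L)).keyBy(_._1)

    // tumbling count window: fires every 100 elements per key, independent of time
    keyed.countWindow(100).sum(1).print()

    // sliding count window: 100 elements wide, evaluated every 10 new elements
    keyed.countWindow(100, 10).sum(1).print()

    env.execute("count-window-sketch")
  }
}
```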
II. Window assigners (Window Assigners):
After deciding whether the data stream is keyed or non-keyed, the next step is to define a window assigner. The window assigner defines how elements are assigned to windows. On a keyed stream the window is defined with .window(); the method takes a window assigner as its parameter, and the assigner is responsible for routing every incoming element to the correct window.
Flink provides the following built-in window assigners:
- Tumbling window assigner
- Sliding window assigner
- Session window assigner
- Global window assigner
The concrete classes: (figure omitted; the classes marked with red boxes in the original figure are the commonly used assigners)
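A minimal sketch of how each built-in assigner is passed to .window(), assuming event-time processing and an already existing KeyedStream named `keyed` (the name and the window sizes are made up for the example):

```scala
import org.apache.flink.streaming.api.windowing.assigners.{
  EventTimeSessionWindows, GlobalWindows, SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time

// tumbling windows of 5 minutes
keyed.window(TumblingEventTimeWindows.of(Time.minutes(5)))

// sliding windows of 5 minutes, evaluated every 1 minute
keyed.window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))

// session windows that close after a 30-second gap without events
keyed.window(EventTimeSessionWindows.withGap(Time.seconds(30)))

// global window: all elements end up in one window; it only fires with a custom trigger
keyed.window(GlobalWindows.create())
```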
III. Window functions (Window Functions) (very important)
After defining the window assigner we still have to specify the computation to perform on each window; this is the job of the window function.
The window function can be a ReduceFunction, a FoldFunction or a WindowFunction.
The first two are more efficient, because the aggregation is performed incrementally on each element as it arrives in the window.
A WindowFunction, in contrast, receives an Iterable over all elements of a window plus additional meta information about the window the elements belong to.
Window functions are therefore classified as follows:
- Incremental aggregation functions
  - Available functions:
    - ReduceFunction
      - Usage: .reduce(ReduceFunction, [...])
    - AggregateFunction
      - Usage: .aggregate(AggregateFunction, [...])
      - A more general form of ReduceFunction
    - FoldFunction
      - Usage: .fold(FoldFunction, [...])
      - Already marked as deprecated and may no longer be available in the future
  - Characteristics:
    - High computing performance and a small storage footprint
    - Each element is aggregated as soon as it arrives; the logic stays simple and the raw data does not need to be cached
    - Put simply: every record is aggregated once into an intermediate result kept in state, and the result of the last computation is emitted when the window closes
- Full window functions
  - Available functions:
    - WindowFunction
      - Usage: .apply(WindowFunction)
    - ProcessWindowFunction
      - Usage: .process(ProcessWindowFunction)
      - Can be used wherever a WindowFunction can be used
      - Similar to WindowFunction, but additionally provides a context with information about the window in which the evaluation takes place
  - Characteristics:
    - All elements of the window are cached first; only when the window fires are they processed together
    - If the data volume is large or the window is long, this is more likely to degrade performance
    - (Author's question: where is the window data stored, and can it lead to OOM?)
Windowing with a WindowFunction is less efficient than the other operations, because Flink internally caches all elements of the window before calling the function.
So what should we do when we need the extra metadata a WindowFunction provides but also want to keep most of the performance?
We can combine a WindowFunction with a ReduceFunction or a FoldFunction: the elements of the window are aggregated incrementally, and the WindowFunction still receives the additional window metadata.
Concretely, using .reduce(ReduceFunction, WindowFunction):
- The first parameter is a ReduceFunction. While the window is open it is called once per incoming record to aggregate the data incrementally; the aggregated value is kept in state, and just before the window closes that state is handed to the function passed as the second parameter.
- The second parameter is a WindowFunction, called when the window closes. At this point the "all elements" seen by the WindowFunction are just the result of the incremental aggregation done by the first parameter, normally a single record. Inside the WindowFunction we take that incremental result, fill in the metadata of the current window, and emit it; that output is the final result of the window computation.
Let's now look at each function in turn:
1. ReduceFunction
A ReduceFunction specifies how two input elements are combined into a single output element of the same type, i.e. an incremental aggregation.
For example:
```scala
windowedDS.reduce(
  new ReduceFunction[ProductStatsV2]() {
    override def reduce(stats1: ProductStatsV2, stats2: ProductStatsV2): ProductStatsV2 = {
      stats1.setDisplay_ct(stats1.getDisplay_ct() + stats2.getDisplay_ct())
      // other aggregation operations
      stats1
    }
  })
```
2. FoldFunction
FoldFunction is not commonly used, so no demonstration is given here.
3. Window function (WindowFunction), the general usage
A WindowFunction receives an Iterable containing all elements of the window, which gives it the greatest flexibility of all window functions, at the cost of performance and resource consumption.
The reason is that the elements of the window cannot be aggregated incrementally; instead they are cached until the window is considered ready to be processed (i.e. until it closes).
The WindowFunction trait is declared as follows:
```scala
trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {

  /**
   * Evaluates the window and outputs none or several elements.
   *
   * @param key    the key of this window
   * @param window window metadata (start time, end time and other meta information)
   * @param input  the elements in the window (all of the data)
   * @param out    a collector for emitting elements
   * @throws Exception the function may throw exceptions to fail the program and trigger recovery
   */
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT])
}
```
Example of use:
```scala
val input: DataStream[(String, Long)] = ...

input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .apply(new MyWindowFunction())

/* ... */

class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] {

  def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
    // window computation logic
    var count = 0L
    for (in <- input) {
      count = count + 1
    }
    // window meta information is available directly from the window parameter,
    // e.g. window.getStart and window.getEnd
    out.collect(s"Window $window count: $count")
  }
}
```
The example above shows a WindowFunction that counts the elements of a window and also adds the window information to the output.
Note: using a WindowFunction for simple aggregations such as counting performs rather poorly.
Later we will show how to combine a ReduceFunction with a WindowFunction to get incremental aggregation together with the extra information a WindowFunction provides.
4. ProcessWindowFunction
Wherever a WindowFunction can be used, a ProcessWindowFunction can be used as well. It is very similar to WindowFunction, but additionally lets you query more context information; the context describes the window in which the evaluation takes place.
Here is the ProcessWindowFunction interface:
```scala
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {

  /**
   * Evaluates the window and outputs none or several elements.
   */
  @throws[Exception]
  def process(
      key: KEY,
      context: Context,
      elements: Iterable[IN],
      out: Collector[OUT])

  /**
   * The context holding window metadata.
   */
  abstract class Context {
    /**
     * @return The window that is being evaluated.
     */
    def window: W
  }
}
```
You can call it like this:
```scala
val input: DataStream[(String, Long)] = ...

input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .process(new MyProcessWindowFunction())
```
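For completeness, here is a minimal, hypothetical MyProcessWindowFunction that mirrors the counting MyWindowFunction above; note that the window metadata is now obtained from the context parameter instead of being passed in directly.

```scala
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// counts the elements of each window, like the MyWindowFunction example above
class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {

  override def process(key: String,
                       context: Context,
                       elements: Iterable[(String, Long)],
                       out: Collector[String]): Unit = {
    val count = elements.size
    // the window metadata comes from the context rather than from a direct parameter
    val window = context.window
    out.collect(s"Window ${window.getStart}-${window.getEnd} key: $key count: $count")
  }
}
```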
5. WindowFunction with an incremental aggregation function (combining ReduceFunction and WindowFunction)
A WindowFunction can be combined with a ReduceFunction or a FoldFunction so that the elements are aggregated incrementally as they arrive in the window; when the window closes, the WindowFunction is handed the aggregated result. This way you get the additional window metadata of a WindowFunction while the window is still computed incrementally.
Note: a ProcessWindowFunction can be used instead of the WindowFunction for the same kind of incremental window aggregation.
An example of incremental window aggregation with ReduceFunction combined with WindowFunction (the FoldFunction variant is not demonstrated):
```scala
// To attach the window information we call the reduce operator with two parameters: a ReduceFunction and a WindowFunction.
// Each incoming record is aggregated once by the ReduceFunction; when the window is about to close and emit its result,
// the value computed by the ReduceFunction is handed to the WindowFunction for further processing.
// At that point the WindowFunction sees only one record: the result produced by the ReduceFunction.
val reduceDS = windowedDS.reduce(
  // incremental aggregation: each record is aggregated on arrival; the result is handed to the WindowFunction just before the window closes
  new ReduceFunction[ProductStatsV2]() {
    override def reduce(stats1: ProductStatsV2, stats2: ProductStatsV2): ProductStatsV2 = {
      stats1.setDisplay_ct(stats1.getDisplay_ct() + stats2.getDisplay_ct())
      // other incremental aggregation logic
      stats1
    }
  },
  // just before the window closes, the ReduceFunction hands over its result and the WindowFunction fills in the window information
  new WindowFunction[ProductStatsV2, ProductStatsV2, Long, TimeWindow]() {
    override def apply(key: Long, window: TimeWindow, input: Iterable[ProductStatsV2], out: Collector[ProductStatsV2]): Unit = {
      // take out the data: the result of the incremental aggregation, only one record at this point
      val productStats = input.iterator.next()
      // set the window times (taken from the window metadata)
      productStats.setStt(DateTimeUtil.toYMDhms(new Date(window.getStart)))
      productStats.setEdt(DateTimeUtil.toYMDhms(new Date(window.getEnd)))
      // set the order counts
      productStats.setOrder_ct(productStats.getOrderIdSet().size())
      productStats.setPaid_order_ct(productStats.getPaidOrderIdSet().size())
      productStats.setRefund_order_ct(productStats.getRefundOrderIdSet().size())
      // emit the result
      out.collect(productStats)
    }
  })
```
Combining AggregateFunction or FoldFunction with a WindowFunction/ProcessWindowFunction for incremental aggregation works in a similar way (an AggregateFunction sketch follows below).
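Since the AggregateFunction variant is only mentioned and not shown, here is a minimal, hypothetical sketch of the .aggregate() style of incremental aggregation. The AverageAggregate class, the (String, Long) record type and the windowedDS2 stream name are made up for the example; .aggregate() can also take a second WindowFunction/ProcessWindowFunction parameter when the window metadata is needed.

```scala
import org.apache.flink.api.common.functions.AggregateFunction

// hypothetical: computes the average of the Long values of (String, Long) records;
// the accumulator is a (sum, count) pair
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {

  override def createAccumulator(): (Long, Long) = (0L, 0L)

  // called once per incoming record: fold the record into the accumulator
  override def add(value: (String, Long), acc: (Long, Long)): (Long, Long) =
    (acc._1 + value._2, acc._2 + 1L)

  // called when the window fires: turn the accumulator into the output value
  override def getResult(acc: (Long, Long)): Double =
    acc._1.toDouble / acc._2

  // called when the accumulators of merged windows (e.g. session windows) are combined
  override def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) =
    (a._1 + b._1, a._2 + b._2)
}

// assuming windowedDS2 is a WindowedStream of (String, Long) records
val avgDS = windowedDS2.aggregate(new AverageAggregate)
```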
IV. Other optional APIs that can follow .window()
- .trigger(), the trigger
  - Defines when the window closes, fires the computation and emits the result
- .evictor(), the evictor
  - Defines the logic for removing certain elements
  - Similar to a filter: it evicts unwanted elements from the window
- .allowedLateness(), allow late data to be processed
  - In a distributed setting data may arrive out of order: an event that actually happened earlier can arrive only after the window has already closed. What should be done with such data?
  - .allowedLateness() takes a duration (e.g. 1 s), meaning that data arriving up to 1 s late is still processed
- .sideOutputLateData(), send late data to a side output stream
  - Usually used together with the call above
  - allowedLateness() lets us process late data, but how long should the tolerance be? If it is set too long the memory pressure becomes significant, and we cannot wait forever, so a small value is usually chosen.
  - That raises the question: what about data that arrives even later than that? It is sent to a side output stream. (What is a side output stream?)
  - How do we read the data from the side output stream? With the function below.
- .getSideOutput(), get the side output stream
  - The window operation above returns a DataStream; calling .getSideOutput() on it returns the data in the side output stream.
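A minimal sketch of how these optional calls chain together, assuming an event-time DataStream of (String, Long) records named input; the tag name, the window size and the lateness values are made up for the example (.trigger() and .evictor() are omitted because they require custom implementations).

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// tag identifying the side output that will carry the data that arrives too late
val lateTag = new OutputTag[(String, Long)]("late-data")

val result = input
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .allowedLateness(Time.seconds(1))   // keep the window state 1 s longer so slightly late events are still processed
  .sideOutputLateData(lateTag)        // anything later than that goes to the side output instead of being dropped
  .reduce((a, b) => (a._1, a._2 + b._2))

// the late records can then be fetched from the result stream
val lateStream: DataStream[(String, Long)] = result.getSideOutput(lateTag)
```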