Flink: Custom Functions
2022-07-03 10:48:00 【Samooyou】
This article describes how to write business code for custom scalar functions (UDF), custom aggregate functions (UDAF), and custom table-valued functions (UDTF) in Realtime Compute for Apache Flink, and how to put them into production.
Custom scalar functions (UDF)
Definition
A custom scalar function (UDF) maps zero, one, or more scalar values to a new scalar value.
To define a scalar function, extend the base class ScalarFunction in org.apache.flink.table.functions and implement one or more evaluation (eval) methods. The behavior of a scalar function is determined by its evaluation method. The evaluation method must be declared public and named eval (it is declared directly, not as an override). The parameter types and return type of the evaluation method determine the parameter types and return type of the scalar function.
Business code
A UDF must implement the eval method in a class that extends ScalarFunction. The open and close methods are optional.
Note: By default, a UDF is assumed to return the same output for the same input. If a UDF cannot guarantee this, for example because it calls an external service that may return different results for the same input value, override the isDeterministic() method and return false. Otherwise the output may not be as expected under certain conditions, for example when the UDF operator is moved forward in the execution plan.
Taking Java as an example, the sample code is as follows.
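As an illustration of the eval contract described above, here is a minimal sketch (not the article's original sample). So that it runs without Flink on the classpath, it uses a hypothetical base class, ScalarFunctionSketch, in place of org.apache.flink.table.functions.ScalarFunction; StringLengthUdf and CurrentMillisUdf are likewise illustrative names.

```java
// Stand-in for org.apache.flink.table.functions.ScalarFunction (assumption:
// simplified so the sketch compiles without Flink on the classpath).
abstract class ScalarFunctionSketch {
    // Optional lifecycle hooks; the defaults do nothing.
    public void open() {}

    public void close() {}

    // Mirrors the default described above: same input, same output.
    public boolean isDeterministic() {
        return true;
    }
}

// Hypothetical UDF: returns the length of a string, or 0 for null.
class StringLengthUdf extends ScalarFunctionSketch {
    // The evaluation method must be public and named eval.
    public long eval(String s) {
        return s == null ? 0 : s.length();
    }

    // A second eval overload: the argument list selects the implementation.
    public long eval(String a, String b) {
        return eval(a) + eval(b);
    }
}

// Hypothetical UDF whose result depends on the clock, so it opts out of
// the determinism assumption.
class CurrentMillisUdf extends ScalarFunctionSketch {
    public long eval() {
        return System.currentTimeMillis();
    }

    @Override
    public boolean isDeterministic() {
        return false;
    }
}
```

In a real job the classes would extend ScalarFunction directly, and open would receive a FunctionContext argument.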
Register and use
For registration details, see Register and use a UDF.
Write SQL statements
After the UDF is registered, you can use it. The function must be referenced as database.udf (database name.udf).
An example SQL statement that uses the custom function is as follows.
Custom aggregate functions (UDAF)
Definition
A user-defined aggregate function (User-Defined Aggregate Function, UDAGG) aggregates the data in a table into a scalar value. User-defined aggregate functions are implemented by extending the AggregateFunction abstract class.
AggregateFunction works as follows.
- First, it needs an accumulator, the data structure that holds the intermediate result (state) of the aggregation. An empty accumulator is created by calling the createAccumulator() method of AggregateFunction.
- Then, the accumulate() method of the function is called for each input row to update the accumulator.
- After all rows have been processed, the getValue() method is called to compute and return the final result.
AggregateFunction requires the following methods to be implemented:
- createAccumulator()
- accumulate()
- getValue()
In addition to the methods above, there are some optional methods. Some of them let the system execute queries more efficiently, while others are required in certain scenarios. For example, if the aggregate function is applied in the context of a session group window, the merge() method is required.
- retract()
- merge()
- resetAccumulator()
The core interface methods of AggregateFunction are as follows:
createAccumulator and getValue methods
Note
- createAccumulator and getValue are defined in the AggregateFunction abstract class.
- A UDAF must contain an accumulate method.
accumulate method
Note
- You need to implement an accumulate method that describes how to process the input data and update the accumulator with it.
- The first parameter of the accumulate method must be an accumulator of the AggregateFunction's ACC type. While the system is running, the runtime code passes the accumulator's historical state, together with the upstream data you specify (any number of values, of any type), as arguments to the accumulate method.
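The three required methods can be sketched as follows. This is a minimal plain-Java illustration, assuming a weighted average as the aggregate; WeightedAvgAccum, WeightedAvgSketch, and WeightedAvgDemo are hypothetical names, and the class does not extend Flink's AggregateFunction so that it can run on its own.

```java
import java.util.List;

// Accumulator: the intermediate aggregation state.
class WeightedAvgAccum {
    long sum = 0;
    long count = 0;
}

// Plain-Java sketch; a real UDAF would extend
// org.apache.flink.table.functions.AggregateFunction<Long, WeightedAvgAccum>.
class WeightedAvgSketch {
    // Step 1: create an empty accumulator.
    public WeightedAvgAccum createAccumulator() {
        return new WeightedAvgAccum();
    }

    // Step 2: called once per input row; the accumulator is the first argument,
    // followed by the upstream values.
    public void accumulate(WeightedAvgAccum acc, long value, long weight) {
        acc.sum += value * weight;
        acc.count += weight;
    }

    // Step 3: compute the final result from the accumulator
    // (integer division; null when no weight was accumulated).
    public Long getValue(WeightedAvgAccum acc) {
        return acc.count == 0 ? null : acc.sum / acc.count;
    }
}

class WeightedAvgDemo {
    // Plays the role of the runtime: feeds each {value, weight} row to accumulate.
    static Long aggregate(List<long[]> rows) {
        WeightedAvgSketch fn = new WeightedAvgSketch();
        WeightedAvgAccum acc = fn.createAccumulator();
        for (long[] row : rows) {
            fn.accumulate(acc, row[0], row[1]);
        }
        return fn.getValue(acc);
    }
}
```

WeightedAvgDemo.aggregate stands in for the runtime here; in a real job Flink makes these calls itself.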
retract and merge methods
createAccumulator, getValue, and accumulate together are enough to design a basic UDAF. However, some special scenarios in Realtime Compute for Apache Flink require you to provide the retract and merge methods as well.
Usually, the computation is an early observation of an unbounded stream (early firing). Because results are fired early, a result that has already been emitted may later be revised; this operation is called retraction (retract). The SQL optimizer automatically determines which cases produce retracted data and which operators need to process data that carries retraction marks. However, you need to implement a retract method to process the retracted data.
Note
- The retract method is the inverse of the accumulate method. For example, in a UDAF that implements a count function, the accumulate method adds 1 for every row, and the retract method subtracts 1.
- Like the accumulate method, the first parameter of the retract method must be an accumulator of the AggregateFunction's ACC type. While the system is running, the runtime code passes the accumulator's historical state, together with the upstream data you specify (any number of values, of any type), to the retract method.
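The count example from the note can be sketched as follows. This is a minimal plain-Java illustration with hypothetical names (CountAccum, CountSketch); it does not extend Flink's AggregateFunction, so it runs without Flink.

```java
// Accumulator for a simple row count.
class CountAccum {
    long count = 0;
}

// Plain-Java sketch of a count UDAF that supports retraction.
class CountSketch {
    public CountAccum createAccumulator() {
        return new CountAccum();
    }

    // Each incoming row adds 1.
    public void accumulate(CountAccum acc, Object value) {
        acc.count += 1;
    }

    // Each retracted row subtracts 1, undoing an earlier accumulate.
    public void retract(CountAccum acc, Object value) {
        acc.count -= 1;
    }

    public Long getValue(CountAccum acc) {
        return acc.count;
    }
}
```

Because retract exactly reverses accumulate, the accumulator ends in the same state as if the retracted row had never arrived.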
Some scenarios in Realtime Compute for Apache Flink require the merge method, for example session windows. Because data can arrive out of order, a late row may fall between two separate sessions, which then combine into one session. In this case, the merge method is needed to combine multiple accumulators into one accumulator.
Note
- The first parameter of the merge method must be an accumulator of the AggregateFunction's ACC type. This first accumulator is where the state is stored after the merge method completes.
- The second parameter of the merge method is an iterable over ACC-type accumulators, which may contain one or more accumulators.
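A merge method with this parameter layout might look like the following. This is a minimal plain-Java sketch for a count aggregate; SessionCountAccum and MergeableCountSketch are hypothetical names, and the class does not extend Flink's AggregateFunction.

```java
// Accumulator for a per-session row count.
class SessionCountAccum {
    long count = 0;
}

// Plain-Java sketch of a count UDAF that can merge session accumulators.
class MergeableCountSketch {
    public SessionCountAccum createAccumulator() {
        return new SessionCountAccum();
    }

    public void accumulate(SessionCountAccum acc, Object value) {
        acc.count += 1;
    }

    // The first argument receives the merged state; the second iterates
    // over the accumulators that are folded into it.
    public void merge(SessionCountAccum acc, Iterable<SessionCountAccum> others) {
        for (SessionCountAccum other : others) {
            acc.count += other.count;
        }
    }

    public Long getValue(SessionCountAccum acc) {
        return acc.count;
    }
}
```

When two sessions collapse into one, their accumulators would be passed as the second argument and the combined count ends up in the first.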
Write business logic code
Taking Java as an example, the sample code is as follows.
Register and use
For registration details, see Register and use a UDF.
Write SQL statements
After the UDF is registered, you can use it. The function must be referenced as database.udf (database name.udf).
An example SQL statement that uses the custom aggregate function is as follows.
Custom table valued functions (UDTF)
Definition
A user-defined table aggregate function (User-Defined Table Aggregate Function, UDTAGG) aggregates the data in a table into a result table with multiple rows and columns. It is very similar to AggregateFunction, except that the aggregation result, previously a scalar value, is now a table. Like a custom scalar function, a custom table-valued function (UDTF) takes zero, one, or more scalar values as input parameters (variable-length parameters are allowed). Unlike a scalar function, a table-valued function can return any number of rows as output, not just a single value. A returned row can consist of one or more columns.
User-defined table aggregate functions are implemented by extending the TableAggregateFunction abstract class.
TableAggregateFunction works as follows.
- First, it also needs an accumulator, the data structure that holds the intermediate result of the aggregation. An empty accumulator is created by calling the createAccumulator() method of TableAggregateFunction.
- Then, the accumulate() method of the function is called for each input row to update the accumulator.
- After all rows have been processed, the emitValue() method is called to compute and return the final result.
TableAggregateFunction requires the following methods to be implemented:
- createAccumulator()
- accumulate()
In addition to the methods above, there are some optional methods.
- retract()
- merge()
- resetAccumulator()
- emitValue()
- emitUpdateWithRetract()
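The accumulator lifecycle for a table aggregate can be sketched with a "top 2" function that emits up to two rows per group. This is a minimal plain-Java illustration: RowCollector is a hypothetical stand-in for Flink's Collector, Top2Accum, Top2Sketch, and Top2Demo are illustrative names, and the class does not extend TableAggregateFunction so that it runs without Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for Flink's Collector<T> (assumption: reduced to one method).
interface RowCollector<T> {
    void collect(T row);
}

// Accumulator holding the two largest values seen so far.
// Integer.MIN_VALUE marks an empty slot (a simplification of this sketch).
class Top2Accum {
    int first = Integer.MIN_VALUE;
    int second = Integer.MIN_VALUE;
}

// Plain-Java sketch; a real implementation would extend
// org.apache.flink.table.functions.TableAggregateFunction.
class Top2Sketch {
    public Top2Accum createAccumulator() {
        return new Top2Accum();
    }

    // Called once per input row to update the accumulator.
    public void accumulate(Top2Accum acc, int value) {
        if (value > acc.first) {
            acc.second = acc.first;
            acc.first = value;
        } else if (value > acc.second) {
            acc.second = value;
        }
    }

    // Emits up to two rows, each a {value, rank} pair.
    public void emitValue(Top2Accum acc, RowCollector<int[]> out) {
        if (acc.first != Integer.MIN_VALUE) {
            out.collect(new int[] {acc.first, 1});
        }
        if (acc.second != Integer.MIN_VALUE) {
            out.collect(new int[] {acc.second, 2});
        }
    }
}

class Top2Demo {
    // Plays the role of the runtime for one group of values.
    static List<int[]> top2(int... values) {
        Top2Sketch fn = new Top2Sketch();
        Top2Accum acc = fn.createAccumulator();
        for (int v : values) {
            fn.accumulate(acc, v);
        }
        List<int[]> rows = new ArrayList<>();
        fn.emitValue(acc, rows::add);
        return rows;
    }
}
```

Unlike getValue in an AggregateFunction, emitValue returns nothing and instead pushes any number of rows to the collector, which is what makes the result a table.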
Business code
A UDTF must implement the eval method in a class that extends TableFunction. The open and close methods are optional. Taking Java as an example, the sample code is as follows.
Multi-row return
A UDTF can convert one row of data into multiple rows by calling collect() once for each output row.
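To illustrate, here is a minimal plain-Java sketch of a UDTF that splits one input string into several output rows. TableFunctionSketch is a hypothetical stand-in for org.apache.flink.table.functions.TableFunction, and its drain() helper exists only so the emitted rows can be inspected; neither is part of Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Stand-in for Flink's TableFunction<T> (assumption: simplified so the
// sketch compiles without Flink on the classpath).
abstract class TableFunctionSketch<T> {
    private final List<T> output = new ArrayList<>();

    // Each call emits one output row.
    protected void collect(T row) {
        output.add(row);
    }

    // Helper for this sketch only: returns and clears the emitted rows.
    public List<T> drain() {
        List<T> rows = new ArrayList<>(output);
        output.clear();
        return rows;
    }
}

// Hypothetical UDTF: one input row becomes one output row per token.
class SplitUdtf extends TableFunctionSketch<String> {
    public void eval(String line, String separator) {
        if (line == null || line.isEmpty()) {
            return; // emit nothing for empty input
        }
        // Pattern.quote treats the separator literally rather than as a regex.
        for (String token : line.split(Pattern.quote(separator))) {
            collect(token);
        }
    }
}
```

Calling eval("a,b,c", ",") collects three rows, while an empty input collects none.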
Multi-column return
A UDTF can turn not only one row into multiple rows but also one column into multiple columns. If you need a UDTF to return multiple columns, declare the return value as Tuple or Row, as described below:
- The return value is Tuple
Realtime Compute for Apache Flink supports Tuple1 to Tuple25, which define 1 to 25 fields. An example of a UDTF that uses Tuple3 to return three fields is as follows.
Note: When you use Tuple, field values cannot be null, and at most 25 fields are allowed.
- The return value is Row
An example of a UDTF that uses Row to return three fields is as follows.
Note: Row field values can be null, but if you use Row, you must override the getResultType method.
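The idea of a three-column result can be sketched in plain Java as follows. UserRow is a hypothetical stand-in for a three-field Tuple3 or Row, and ParseUserUdtf with its "name|age|city" input format is an assumption made for illustration; the nullable age field mirrors what Row permits and Tuple does not.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical three-column row; stands in for Flink's Tuple3 or Row.
final class UserRow {
    final String name;
    final Integer age; // may be null, which Row allows and Tuple does not
    final String city;

    UserRow(String name, Integer age, String city) {
        this.name = name;
        this.age = age;
        this.city = city;
    }
}

// Plain-Java sketch of a UDTF that turns one column into three.
class ParseUserUdtf {
    private final List<UserRow> out = new ArrayList<>();

    // Stand-in for TableFunction.collect: emits one output row.
    private void collect(UserRow row) {
        out.add(row);
    }

    // Emits one three-column row per well-formed "name|age|city" input.
    // Assumes age is numeric when present; an empty age becomes null.
    public void eval(String line) {
        if (line == null) {
            return;
        }
        String[] parts = line.split("\\|", -1);
        if (parts.length != 3) {
            return; // malformed input produces no row
        }
        Integer age = parts[1].isEmpty() ? null : Integer.valueOf(parts[1]);
        collect(new UserRow(parts[0], age, parts[2]));
    }

    public List<UserRow> rows() {
        return out;
    }
}
```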
SQL syntax
UDTFs support cross join and left join. When you use a UDTF, you need to add the lateral and table keywords.
- cross join
Each row of the left table is joined with every row that the UDTF outputs for it. If the UDTF produces no rows for a left-table row, that row is not output.
- left join
Each row of the left table is joined with every row that the UDTF outputs for it. If the UDTF produces no rows for a left-table row, the UDTF fields of that row are filled with null values.
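The difference between the two join types can be simulated in plain Java. This sketch models only the row semantics, not Flink's execution; LateralJoinSketch and its split function are hypothetical, and the SQL shown in the comments uses made-up table and function names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

class LateralJoinSketch {
    // Cross join, as in: SELECT ... FROM t, LATERAL TABLE(split(t.line)) AS s(token)
    // A left row with no UDTF output disappears from the result.
    static List<String> crossJoin(List<String> left, Function<String, List<String>> udtf) {
        List<String> result = new ArrayList<>();
        for (String row : left) {
            for (String out : udtf.apply(row)) {
                result.add(row + " -> " + out);
            }
        }
        return result;
    }

    // Left join, as in: ... FROM t LEFT JOIN LATERAL TABLE(split(t.line)) AS s(token) ON TRUE
    // A left row with no UDTF output is kept and padded with null.
    static List<String> leftJoin(List<String> left, Function<String, List<String>> udtf) {
        List<String> result = new ArrayList<>();
        for (String row : left) {
            List<String> outs = udtf.apply(row);
            if (outs.isEmpty()) {
                result.add(row + " -> null");
            }
            for (String out : outs) {
                result.add(row + " -> " + out);
            }
        }
        return result;
    }

    // Hypothetical UDTF: splits on commas; an empty string yields no rows.
    static List<String> split(String s) {
        return s.isEmpty() ? Collections.<String>emptyList() : Arrays.asList(s.split(","));
    }
}
```

For the left rows "a,b", "" and "c", the cross join yields three result rows and drops the empty one, while the left join yields four, with the empty row padded by null.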
Register and use
For registration details, see Register and use a UDF.
Write SQL statements
After the UDF is registered, you can use it. The function must be referenced as database.udf (database name.udf).
An example SQL statement that uses the custom table-valued function is as follows.