Flink: custom functions
2022-07-03 10:48:00 【Samooyou】
This article describes how to write and deploy business code for custom scalar functions (UDF), custom aggregate functions (UDAF), and custom table-valued functions (UDTF) in Realtime Compute for Apache Flink.
Custom scalar functions (UDF)
Definition
A custom scalar function (UDF) maps zero, one, or more scalar values to a new scalar value.
To define a scalar function, extend the base class ScalarFunction in org.apache.flink.table.functions and implement one or more evaluation (eval) methods. The behavior of a scalar function is determined by its evaluation methods, which must be declared public and named eval (they are declared directly, not as overrides of a base-class method). The parameter types and return type of the evaluation methods determine the parameter types and return type of the scalar function.
Business code
A UDF implements the eval method in a class that extends ScalarFunction. The open and close methods are optional.
Note: By default, a UDF is assumed to return the same output for the same input. If the UDF cannot guarantee this, for example because it calls an external service that may return different results for the same input, override the isDeterministic() method to return false. Otherwise the output may not be as expected under certain conditions, for example when the UDF operator is moved forward in the execution plan.
Taking Java as an example, the sample code is as follows.
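A minimal sketch of what such a function could look like. To keep the block compilable without Flink on the classpath, `ScalarFunctionStandIn` is a hypothetical stand-in for `org.apache.flink.table.functions.ScalarFunction`; in a real job the classes would extend `ScalarFunction` directly and `open` would receive a `FunctionContext`. The class names are illustrative only.

```java
// Hypothetical stand-in for org.apache.flink.table.functions.ScalarFunction,
// used only so this sketch compiles without Flink on the classpath.
abstract class ScalarFunctionStandIn {
    public void open() {}                              // optional setup hook
    public void close() {}                             // optional teardown hook
    public boolean isDeterministic() { return true; }  // default: same input, same output
}

// Scalar UDF: maps one or two strings to a single length value.
class StringLengthUdf extends ScalarFunctionStandIn {
    // Evaluation methods are public, named eval, and may be overloaded.
    public long eval(String a) {
        return a == null ? 0 : a.length();
    }

    public long eval(String a, String b) {
        return eval(a) + eval(b);
    }
}

// A UDF whose result depends on more than its input declares itself non-deterministic.
class CurrentMillisSuffixUdf extends ScalarFunctionStandIn {
    public String eval(String a) {
        return a + "_" + System.currentTimeMillis();
    }

    @Override
    public boolean isDeterministic() {
        return false;
    }
}

class ScalarUdfDemo {
    public static void main(String[] args) {
        StringLengthUdf len = new StringLengthUdf();
        System.out.println(len.eval("flink"));
        System.out.println(len.eval("flink", "sql"));
        System.out.println(new CurrentMillisSuffixUdf().isDeterministic());
    }
}
```

The two eval overloads show how one function can accept different argument lists; the second class shows where the isDeterministic() override belongs.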
Registration and use
For registration details, see Register and use a UDF.
Writing SQL statements
After the UDF is registered, call it in SQL as database.udf, that is, the database name followed by the function name.
Custom aggregate functions (UDAF)
Definition
A user-defined aggregate function (UDAF, also abbreviated UDAGG) aggregates the rows of a table into a single scalar value. It is implemented by extending the AggregateFunction abstract class.
AggregateFunction works as follows.
- First, it needs an accumulator, the data structure that holds the intermediate result (state) of the aggregation. An empty accumulator is created by calling the createAccumulator() method of AggregateFunction.
- Then, the accumulate() method is called for each input row to update the accumulator.
- After all rows have been processed, the getValue() method is called to compute and return the final result.
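The three steps above can be sketched in plain Java. This is an illustration of the call order only, not Flink code: in a real job `AvgUdafSketch` would extend `AggregateFunction<Double, AvgAccumulator>`, and the `aggregate` driver is a hypothetical stand-in for what the runtime does.

```java
import java.util.Arrays;
import java.util.List;

// Accumulator: holds the intermediate state of the aggregation.
class AvgAccumulator {
    long sum = 0;
    long count = 0;
}

// Plain-Java sketch of the AggregateFunction contract (no Flink dependency).
class AvgUdafSketch {
    public AvgAccumulator createAccumulator() {
        return new AvgAccumulator();
    }

    // Called once per input row to fold the value into the accumulator.
    public void accumulate(AvgAccumulator acc, long value) {
        acc.sum += value;
        acc.count++;
    }

    // Called after all rows are processed; returns null for an empty group.
    public Double getValue(AvgAccumulator acc) {
        return acc.count == 0 ? null : (double) acc.sum / acc.count;
    }

    // Hypothetical driver that mimics the order in which the runtime calls the methods.
    static Double aggregate(List<Long> rows) {
        AvgUdafSketch fn = new AvgUdafSketch();
        AvgAccumulator acc = fn.createAccumulator();
        for (long row : rows) {
            fn.accumulate(acc, row);
        }
        return fn.getValue(acc);
    }

    public static void main(String[] args) {
        System.out.println(aggregate(Arrays.asList(2L, 4L, 9L)));
    }
}
```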
AggregateFunction requires the following methods to be implemented:
- createAccumulator()
- accumulate()
- getValue()
In addition to the methods above, there are some optional methods. Some of them let the system execute queries more efficiently, while others are required in certain scenarios. For example, if the aggregate function is applied in the context of a session group window, the merge() method is required.
- retract()
- merge()
- resetAccumulator()
The core interface methods of AggregateFunction are described below.
createAccumulator and getValue methods
Notes
- createAccumulator and getValue are defined in the AggregateFunction abstract class.
- A UDAF must contain at least one accumulate method.
accumulate method
Notes
- You must implement an accumulate method that describes how the input data is processed and how the accumulator is updated.
- The first parameter of accumulate must be the accumulator, of the ACC type declared on AggregateFunction. At run time, the runtime code passes the accumulator's current state together with the upstream data you specify (any number of values of any type) to accumulate.
retract and merge methods
Together, createAccumulator, getValue, and accumulate are enough for a basic UDAF. However, some scenarios in Realtime Compute for Apache Flink also require you to provide the retract and merge methods.
Typically, a computation over an unbounded stream emits results early (early firing), before all data has been seen. Because results are emitted early, previously emitted results may later need to be revised; this operation is called retraction (retract). The SQL optimizer automatically determines which cases produce retraction data and which operations must handle records that carry a retraction flag. You still need to implement a retract method to process the retracted data.
Notes
- retract is the inverse of accumulate. For example, in a UDAF that implements Count, accumulate adds 1 for each record and retract subtracts 1.
- As with accumulate, the first parameter of retract must be the accumulator, of the ACC type declared on AggregateFunction. At run time, the runtime code passes the accumulator's current state and the upstream data you specify (any number of values of any type) to retract.
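The same inverse relationship holds for other aggregates. The following plain-Java sketch shows it for a running sum; the class names are illustrative, and in a real job `SumUdafSketch` would extend `AggregateFunction<Long, SumAccumulator>`.

```java
// Accumulator for a running sum.
class SumAccumulator {
    long sum = 0;
}

// Plain-Java sketch (no Flink dependency) of accumulate and its inverse, retract.
class SumUdafSketch {
    public SumAccumulator createAccumulator() {
        return new SumAccumulator();
    }

    // Applies an incoming row.
    public void accumulate(SumAccumulator acc, long value) {
        acc.sum += value;
    }

    // Undoes a previously accumulated row.
    public void retract(SumAccumulator acc, long value) {
        acc.sum -= value;
    }

    public Long getValue(SumAccumulator acc) {
        return acc.sum;
    }

    public static void main(String[] args) {
        SumUdafSketch fn = new SumUdafSketch();
        SumAccumulator acc = fn.createAccumulator();
        fn.accumulate(acc, 10);
        fn.accumulate(acc, 5);
        fn.retract(acc, 10); // the row with value 10 is withdrawn
        System.out.println(fn.getValue(acc));
    }
}
```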
Some scenarios in Realtime Compute for Apache Flink require the merge method, for example session windows. Because data can arrive out of order, a late record may fall between two previously separate sessions, in which case the two sessions are combined into one. The merge method is then needed to combine multiple accumulators into a single accumulator.
Notes
- The first parameter of merge must be the accumulator, of the ACC type declared on AggregateFunction. This first accumulator is where the state is stored after merge completes.
- The second parameter of merge is an Iterable of ACC-type accumulators, which may contain one or more accumulators.
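A plain-Java sketch of the merge contract, using a maximum as the aggregate. The names are hypothetical and the class does not depend on Flink; in a real job it would extend `AggregateFunction<Long, MaxAccumulator>`.

```java
import java.util.Arrays;

// Accumulator tracking the largest value seen in one session.
class MaxAccumulator {
    long max = Long.MIN_VALUE;
}

class MaxUdafSketch {
    public MaxAccumulator createAccumulator() {
        return new MaxAccumulator();
    }

    public void accumulate(MaxAccumulator acc, long value) {
        acc.max = Math.max(acc.max, value);
    }

    // First parameter: the accumulator that receives the merged state.
    // Second parameter: the accumulators to merge into it.
    public void merge(MaxAccumulator acc, Iterable<MaxAccumulator> others) {
        for (MaxAccumulator other : others) {
            acc.max = Math.max(acc.max, other.max);
        }
    }

    // Returns null when no value has been accumulated.
    public Long getValue(MaxAccumulator acc) {
        return acc.max == Long.MIN_VALUE ? null : acc.max;
    }

    public static void main(String[] args) {
        MaxUdafSketch fn = new MaxUdafSketch();
        MaxAccumulator session1 = fn.createAccumulator();
        fn.accumulate(session1, 3);
        fn.accumulate(session1, 7);
        MaxAccumulator session2 = fn.createAccumulator();
        fn.accumulate(session2, 12);
        // A late record bridges the two sessions, so their states are merged.
        MaxAccumulator merged = fn.createAccumulator();
        fn.merge(merged, Arrays.asList(session1, session2));
        System.out.println(fn.getValue(merged));
    }
}
```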
Writing business logic code
Taking Java as an example, the sample code is as follows.
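A minimal sketch of a Count UDAF that puts the required and optional methods together. It is written as plain Java so that it compiles without Flink; in a real job the class would extend `org.apache.flink.table.functions.AggregateFunction<Long, CountAccumulator>`. The class names are illustrative.

```java
import java.util.Arrays;

// Accumulator holding the running row count.
class CountAccumulator {
    long total = 0;
}

// Plain-Java sketch of a Count UDAF (no Flink dependency).
class CountUdafSketch {
    public CountAccumulator createAccumulator() {
        return new CountAccumulator();
    }

    // Counts one more row; the value itself is ignored.
    public void accumulate(CountAccumulator acc, Object value) {
        acc.total++;
    }

    // Undoes one previously counted row.
    public void retract(CountAccumulator acc, Object value) {
        acc.total--;
    }

    // Folds the other accumulators into the first one.
    public void merge(CountAccumulator acc, Iterable<CountAccumulator> others) {
        for (CountAccumulator other : others) {
            acc.total += other.total;
        }
    }

    // Returns the accumulator to its initial state.
    public void resetAccumulator(CountAccumulator acc) {
        acc.total = 0;
    }

    public Long getValue(CountAccumulator acc) {
        return acc.total;
    }

    public static void main(String[] args) {
        CountUdafSketch fn = new CountUdafSketch();
        CountAccumulator acc = fn.createAccumulator();
        fn.accumulate(acc, "a");
        fn.accumulate(acc, "b");
        fn.retract(acc, "a");
        CountAccumulator other = fn.createAccumulator();
        fn.accumulate(other, "c");
        fn.merge(acc, Arrays.asList(other));
        System.out.println(fn.getValue(acc));
    }
}
```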
Registration and use
For registration details, see Register and use a UDF.
Writing SQL statements
After the function is registered, call it in SQL as database.udf, that is, the database name followed by the function name.
Custom table valued functions (UDTF)
Definition
A user-defined table aggregate function (UDTAGG) aggregates the rows of a table into a result table with multiple rows and columns. It is very similar to an AggregateFunction, except that the aggregation result is a table instead of a scalar value.
A custom table-valued function (UDTF), like a custom scalar function, takes zero, one, or more scalar values as input parameters (variable-length parameters are allowed). Unlike a scalar function, it can return any number of rows as output rather than a single value, and each returned row can consist of one or more columns.
A user-defined table aggregate function is implemented by extending the TableAggregateFunction abstract class.
TableAggregateFunction works as follows.
- First, it also needs an accumulator, the data structure that holds the intermediate result of the aggregation. An empty accumulator is created by calling the createAccumulator() method of TableAggregateFunction.
- Then, the accumulate() method is called for each input row to update the accumulator.
- After all rows have been processed, the emitValue() method is called to compute and return the final result.
TableAggregateFunction requires the following methods to be implemented:
- createAccumulator()
- accumulate()
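In addition to the methods above, the following methods are optional. Because results are emitted only through emitValue() or emitUpdateWithRetract(), one of the two is implemented in practice.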
- retract()
- merge()
- resetAccumulator()
- emitValue()
- emitUpdateWithRetract()
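The table aggregate lifecycle described above can be sketched in plain Java with a "top 2" aggregate that emits up to two rows. `RowCollector` is a hypothetical stand-in for Flink's `org.apache.flink.util.Collector`, and `top2` is a hypothetical driver that mimics the runtime; in a real job `Top2Sketch` would extend `TableAggregateFunction<Integer, Top2Accumulator>`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's org.apache.flink.util.Collector.
interface RowCollector<T> {
    void collect(T row);
}

class Top2Accumulator {
    Integer first;   // largest value seen so far, null if none
    Integer second;  // second largest value, null if none
}

// Plain-Java sketch of a table aggregate function (no Flink dependency).
class Top2Sketch {
    public Top2Accumulator createAccumulator() {
        return new Top2Accumulator();
    }

    // Keeps the two largest values seen so far.
    public void accumulate(Top2Accumulator acc, int value) {
        if (acc.first == null || value > acc.first) {
            acc.second = acc.first;
            acc.first = value;
        } else if (acc.second == null || value > acc.second) {
            acc.second = value;
        }
    }

    // Emits zero, one, or two rows instead of a single scalar.
    public void emitValue(Top2Accumulator acc, RowCollector<Integer> out) {
        if (acc.first != null) {
            out.collect(acc.first);
        }
        if (acc.second != null) {
            out.collect(acc.second);
        }
    }

    // Hypothetical driver that mimics the runtime's call order.
    static List<Integer> top2(int... values) {
        Top2Sketch fn = new Top2Sketch();
        Top2Accumulator acc = fn.createAccumulator();
        for (int v : values) {
            fn.accumulate(acc, v);
        }
        List<Integer> rows = new ArrayList<>();
        fn.emitValue(acc, rows::add);
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(top2(3, 9, 5, 1));
    }
}
```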
Business code
A UDTF implements the eval method in a class that extends TableFunction. The open and close methods are optional. Taking Java as an example, the sample code is as follows.
Returning multiple rows
A UDTF converts one input row into multiple output rows by calling collect() once for each row to return.
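A minimal sketch of a UDTF that calls collect() once per output row. `TableFunctionStandIn` is a hypothetical stand-in for `org.apache.flink.table.functions.TableFunction<T>` so that the block compiles without Flink; it buffers rows in a list, whereas Flink forwards them downstream. The `|` separator and the class names are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.flink.table.functions.TableFunction<T>.
abstract class TableFunctionStandIn<T> {
    private final List<T> emitted = new ArrayList<>();

    // Emits one output row.
    protected void collect(T row) {
        emitted.add(row);
    }

    // Test helper, not part of the Flink API.
    public List<T> emittedRows() {
        return emitted;
    }
}

// Table-valued function: splits one input string into one row per token.
class SplitUdtf extends TableFunctionStandIn<String> {
    public void eval(String line) {
        if (line == null || line.isEmpty()) {
            return; // no rows are emitted for empty input
        }
        for (String token : line.split("\\|")) {
            collect(token);
        }
    }
}

class SplitUdtfDemo {
    public static void main(String[] args) {
        SplitUdtf udtf = new SplitUdtf();
        udtf.eval("a|b|c");
        System.out.println(udtf.emittedRows());
    }
}
```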
Returning multiple columns
A UDTF can turn not only one row into multiple rows but also one column into multiple columns. To return multiple columns, declare the return type as Tuple or Row, as described below:
- The return type is Tuple
Realtime Compute for Apache Flink supports Tuple1 through Tuple25, which define 1 to 25 fields. For example, a UDTF can use Tuple3 to return 3 fields.
Note: When Tuple is used, field values cannot be null, and at most 25 fields are supported.
- The return type is Row
A UDTF can likewise use Row to return 3 fields.
Note: Row field values can be null, but if you use Row you must override the getResultType method.
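The idea of multi-column output with nullable fields can be illustrated without Flink types. In the plain-Java sketch below, `Object[]` stands in for Flink's Row and the returned list stands in for the rows a real UDTF would pass to collect(); the `name,age,city` record layout and the `;` separator are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch: one input string becomes several 3-column rows.
class ParseRowSketch {
    // Parses "name,age,city" records separated by ';'.
    // Missing or empty fields become null, which Row permits and Tuple does not.
    public static List<Object[]> eval(String input) {
        List<Object[]> rows = new ArrayList<>();
        if (input == null || input.isEmpty()) {
            return rows;
        }
        for (String record : input.split(";")) {
            String[] parts = record.split(",", -1); // -1 keeps trailing empty fields
            Object[] row = new Object[3];
            for (int i = 0; i < 3; i++) {
                row[i] = (i < parts.length && !parts[i].isEmpty()) ? parts[i] : null;
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        for (Object[] row : eval("ann,30,paris;bob,,")) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```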
SQL syntax
A UDTF supports cross join and left join. When you use a UDTF, you must add the LATERAL and TABLE keywords.
- cross join
Each row of the left table is joined with each row that the UDTF produces for it. If the UDTF produces no rows for a left row, that row is not output.
- left join
Each row of the left table is joined with each row that the UDTF produces for it. If the UDTF produces no rows for a left row, the UDTF fields of that row are filled with null.
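The difference between the two join types can be simulated in plain Java. This sketch does not use Flink or SQL; `join` and `split` are hypothetical helpers that only mimic the row-level semantics described above, with each output row holding the left value and one UDTF value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class LateralJoinSketch {
    // Joins each left row with every row the table function produces for it.
    // keepUnmatched = false mimics cross join; true mimics left join with null padding.
    static List<String[]> join(List<String> left,
                               Function<String, List<String>> udtf,
                               boolean keepUnmatched) {
        List<String[]> out = new ArrayList<>();
        for (String row : left) {
            List<String> produced = udtf.apply(row);
            if (produced.isEmpty()) {
                if (keepUnmatched) {
                    out.add(new String[] {row, null});
                }
                continue;
            }
            for (String p : produced) {
                out.add(new String[] {row, p});
            }
        }
        return out;
    }

    // Hypothetical table function: one output row per '|'-separated token.
    static List<String> split(String s) {
        if (s.isEmpty()) {
            return new ArrayList<String>();
        }
        return Arrays.asList(s.split("\\|"));
    }

    public static void main(String[] args) {
        List<String> left = Arrays.asList("a|b", "", "c");
        System.out.println(join(left, LateralJoinSketch::split, false).size());
        System.out.println(join(left, LateralJoinSketch::split, true).size());
    }
}
```

With the left rows `"a|b"`, `""`, and `"c"`, the cross join variant drops the empty row, while the left join variant keeps it with a null in the UDTF column.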
Registration and use
For registration details, see Register and use a UDF.
Writing SQL statements
After the function is registered, call it in SQL as database.udf, that is, the database name followed by the function name.