Implementing TopN in Flink SQL with a custom UDATF
2022-06-30 12:41:00 【Big data Institute】
1. UDATF Definition
Table aggregate functions (UDATFs) are many-in, many-out: they first aggregate the input rows and then explode the aggregate into multiple output rows. The aggregation process is shown in the following figure.

The core steps for implementing a table aggregate function are as follows.
(1) Extend TableAggregateFunction.
(2) createAccumulator must be overridden.
(3) Provide one or more accumulate methods (usually one) that implement the logic for updating the accumulator.
(4) Provide emitValue(...) or emitUpdateWithRetract(...) to implement the logic that emits the computed results.
(5) The retract method is required for aggregations over OVER windows.
(6) The merge method is required for bounded aggregations and for session-window and sliding-window aggregations; it also helps with performance optimization.
(7) emitValue is required for bounded and window aggregations; in unbounded scenarios, emitUpdateWithRetract can improve performance.
(8) If the accumulator needs to hold a large amount of state, org.apache.flink.table.api.dataview.ListView or org.apache.flink.table.api.dataview.MapView can be used so that the data is kept in the Flink state backend. A table aggregate function must be used together with flatAggregate.
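Steps (4) and (7) mention emitUpdateWithRetract without showing what incremental emission looks like. The following is a minimal plain-Java sketch of the idea only, assuming a top-2 accumulator. It uses no Flink classes, and the names IncrementalTop2Sketch, Acc, and emitUpdate are hypothetical. In Flink the changes would be sent through the collector passed to emitUpdateWithRetract rather than returned as strings.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only: no Flink classes are used here.
class IncrementalTop2Sketch {

    // Hypothetical accumulator: the current top two values plus the values last emitted.
    static class Acc {
        double first = Double.NEGATIVE_INFINITY;
        double second = Double.NEGATIVE_INFINITY;
        double emittedFirst = Double.NEGATIVE_INFINITY;
        double emittedSecond = Double.NEGATIVE_INFINITY;
    }

    // Same update rule as a top-2 accumulate method.
    static void accumulate(Acc acc, double value) {
        if (value > acc.first) {
            acc.second = acc.first;
            acc.first = value;
        } else if (value > acc.second) {
            acc.second = value;
        }
    }

    // Emits only the ranks whose value changed since the last call:
    // "-" retracts the previously emitted row, "+" adds the new one.
    static List<String> emitUpdate(Acc acc) {
        List<String> changes = new ArrayList<>();
        if (acc.first != acc.emittedFirst) {
            if (acc.emittedFirst != Double.NEGATIVE_INFINITY) {
                changes.add("-(" + acc.emittedFirst + ", 1)");
            }
            changes.add("+(" + acc.first + ", 1)");
            acc.emittedFirst = acc.first;
        }
        if (acc.second != acc.emittedSecond) {
            if (acc.emittedSecond != Double.NEGATIVE_INFINITY) {
                changes.add("-(" + acc.emittedSecond + ", 2)");
            }
            changes.add("+(" + acc.second + ", 2)");
            acc.emittedSecond = acc.second;
        }
        return changes;
    }

    public static void main(String[] args) {
        Acc acc = new Acc();
        for (double score : new double[] {90, 86, 59, 95}) {
            accumulate(acc, score);
            System.out.println(score + " -> " + emitUpdate(acc));
        }
    }
}
```

A score that does not enter the top two (59 in the example) produces no output at all, whereas an emitValue-style method would re-emit both rows after every input. That difference is the performance benefit step (7) refers to.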
2. Dataset format
The data set of student test scores by subject is shown below:
1, "zhangsan","Chinese","90"
1, "zhangsan","Math","74"
1, "zhangsan","English","88"
2, "lisi","Chinese","86"
2, "lisi","Math","96"
2, "lisi","English","92"
3, "mary","Chinese","59"
3, "mary","Math","99"
3, "mary","English","100"
The first column is the student ID, the second column is the student's name, the third column is the subject, and the fourth column is the score.
3. Custom UDATF
The custom Flink SQL UDATF below aggregates and explodes the student test scores: it groups the rows by subject and outputs the top N scores in each group (here N = 2). The implementation is as follows.
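import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
import static org.apache.flink.table.api.Expressions.row;

public class FlinkUdatfFunction {
    public static void main(String[] args) {
        // 1. Obtain the table execution environment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        // 2. Construct the data source
        // Scores are written as numeric literals so that they match the DOUBLE column type
        Table scores = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.INT()),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("course", DataTypes.STRING()),
                        DataTypes.FIELD("score", DataTypes.DOUBLE())
                ),
                row(1, "zhangsan", "Chinese", 90.0),
                row(1, "zhangsan", "Math", 74.0),
                row(1, "zhangsan", "English", 88.0),
                row(2, "lisi", "Chinese", 86.0),
                row(2, "lisi", "Math", 96.0),
                row(2, "lisi", "English", 92.0),
                row(3, "mary", "Chinese", 59.0),
                row(3, "mary", "Math", 99.0),
                row(3, "mary", "English", 100.0)
        ).select($("id"), $("name"), $("course"), $("score"));
        // 3. Register the table
        tEnv.createTemporaryView("scoresTable", scores);
        // 4. Register the function
        tEnv.createTemporarySystemFunction("Top2Func", new Top2Func());
        // 5. Table API call: use call(...) to invoke the registered function
        tEnv.from("scoresTable")
                .groupBy($("course"))
                // A table aggregate function must be called inside flatAggregate
                .flatAggregate(call("Top2Func", $("score")).as("score", "rank"))
                .select($("course"), $("score"), $("rank"))
                .execute()
                .print();
    }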
    /**
     * Mutable accumulator that holds the two highest values seen so far.
     * Double.NEGATIVE_INFINITY marks an empty slot. (Double.MIN_VALUE is the
     * smallest positive double, so using it as the marker would hide a score of 0.)
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Top2Accumulator {
        /**
         * Highest value
         */
        public Double topOne = Double.NEGATIVE_INFINITY;
        /**
         * Second-highest value
         */
        public Double topTwo = Double.NEGATIVE_INFINITY;
    }

    // Custom UDATF: emits (value, rank) pairs for the top two values of each group
    public static class Top2Func extends TableAggregateFunction<Tuple2<Double, Integer>, Top2Accumulator> {
        // Initialize the accumulator
        @Override
        public Top2Accumulator createAccumulator() {
            return new Top2Accumulator();
        }

        // Update the accumulator with one input value
        public void accumulate(Top2Accumulator acc, Double value) {
            if (value == null) {
                // Ignore missing scores instead of failing on unboxing
                return;
            }
            if (value > acc.topOne) {
                acc.topTwo = acc.topOne;
                acc.topOne = value;
            } else if (value > acc.topTwo) {
                acc.topTwo = value;
            }
        }

        // Merge partial accumulators into acc
        public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> it) {
            for (Top2Accumulator otherAcc : it) {
                accumulate(acc, otherAcc.topOne);
                accumulate(acc, otherAcc.topTwo);
            }
        }

        // Emit the result rows, skipping slots that were never filled
        public void emitValue(Top2Accumulator acc, Collector<Tuple2<Double, Integer>> out) {
            if (acc.topOne != Double.NEGATIVE_INFINITY) {
                out.collect(Tuple2.of(acc.topOne, 1));
            }
            if (acc.topTwo != Double.NEGATIVE_INFINITY) {
                out.collect(Tuple2.of(acc.topTwo, 2));
            }
        }
    }
}
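Top2Func hard-codes two slots, while the title refers to TopN. One way to generalize the accumulator is a bounded min-heap, sketched below in plain Java with no Flink classes. The names TopNSketch and TopNAccumulator are hypothetical, and wiring such an accumulator into a TableAggregateFunction (including its type information) is not shown and would need to be checked against the Flink version in use.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java illustration only: no Flink classes are used here.
class TopNSketch {

    // Hypothetical accumulator: keeps at most n values, smallest at the head of the heap.
    static class TopNAccumulator {
        final int n;
        final PriorityQueue<Double> heap = new PriorityQueue<>();

        TopNAccumulator(int n) {
            if (n < 1) {
                throw new IllegalArgumentException("n must be at least 1");
            }
            this.n = n;
        }
    }

    // Adds a value, evicting the current smallest one if the heap is already full.
    static void accumulate(TopNAccumulator acc, double value) {
        if (acc.heap.size() < acc.n) {
            acc.heap.offer(value);
        } else if (value > acc.heap.peek()) {
            acc.heap.poll();
            acc.heap.offer(value);
        }
    }

    // Folds another partial accumulator into acc.
    static void merge(TopNAccumulator acc, TopNAccumulator other) {
        for (double value : other.heap) {
            accumulate(acc, value);
        }
    }

    // Returns the retained values in descending order; index + 1 is the rank.
    static List<Double> emit(TopNAccumulator acc) {
        List<Double> sorted = new ArrayList<>(acc.heap);
        sorted.sort(Collections.reverseOrder());
        return sorted;
    }

    public static void main(String[] args) {
        TopNAccumulator math = new TopNAccumulator(2);
        for (double score : new double[] {74, 96, 99}) {
            accumulate(math, score);
        }
        System.out.println(emit(math));
    }
}
```

With the heap, each accumulate call costs O(log N) instead of the fixed chain of comparisons in Top2Func, and N becomes a constructor argument instead of being built into the field layout.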
4. Running results
With the custom UDATF defined, applying the registered Top2Func to the student test scores yields, for each course, the top two scores together with their rank (1 or 2).
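The values the job should produce can be cross-checked without Flink. The sketch below recomputes the top two scores per course directly from the data set in section 2. The class name Top2CrossCheck is hypothetical, and the code reproduces only the values, not the table layout or row order that Flink prints.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java cross-check: no Flink classes are used here.
class Top2CrossCheck {

    // Rows of the data set as {course, score}; id and name do not affect the ranking.
    static final Object[][] ROWS = {
        {"Chinese", 90.0}, {"Math", 74.0}, {"English", 88.0},
        {"Chinese", 86.0}, {"Math", 96.0}, {"English", 92.0},
        {"Chinese", 59.0}, {"Math", 99.0}, {"English", 100.0}
    };

    // Returns course -> {highest score, second-highest score}.
    static Map<String, double[]> top2ByCourse() {
        Map<String, double[]> result = new LinkedHashMap<>();
        for (Object[] row : ROWS) {
            String course = (String) row[0];
            double score = (Double) row[1];
            double[] top = result.computeIfAbsent(course,
                    k -> new double[] {Double.NEGATIVE_INFINITY, Double.NEGATIVE_INFINITY});
            if (score > top[0]) {
                top[1] = top[0];
                top[0] = score;
            } else if (score > top[1]) {
                top[1] = score;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        top2ByCourse().forEach((course, top) ->
                System.out.println(course + ": " + top[0] + " (rank 1), " + top[1] + " (rank 2)"));
    }
}
```

For this data set the sketch gives 90.0 and 86.0 for Chinese, 99.0 and 96.0 for Math, and 100.0 and 92.0 for English.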
