FlinkSQL: implementing TopN with a custom UDATF
2022-06-30 12:41:00 【Big data Institute】
1. UDATF Definition
A table aggregate function (UDATF) is many-in, many-out: it takes multiple input rows, first aggregates them, and then "explodes" the aggregated result into multiple output rows. The aggregation process is shown in the following figure.

The core steps for implementing a table aggregate function are as follows.
(1) Extend TableAggregateFunction.
(2) Override createAccumulator (mandatory).
(3) Provide one or more accumulate methods (usually one) that implement the accumulator update logic.
(4) Provide emitValue(...) or emitUpdateWithRetract(...) to implement the logic that emits the computed results.
(5) retract is required for aggregations on OVER windows.
(6) merge is required for bounded aggregations and for session window and sliding window aggregations (it also helps with performance optimization).
(7) emitValue is required for bounded and window aggregations; in unbounded scenarios, emitUpdateWithRetract can improve performance.
(8) If the accumulator needs to hold a large state, use org.apache.flink.table.api.dataview.ListView or org.apache.flink.table.api.dataview.MapView so that the state is managed by Flink's state backend. Note also that a table aggregate function must be invoked through flatAggregate.
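The lifecycle in the list above can be illustrated without a Flink cluster. The following plain-Java sketch is not Flink's API: `TableAggFn` is a hypothetical interface that mirrors the mandatory methods (createAccumulator, accumulate, emitValue) plus merge, `AggDriver.runGrouped` is a hypothetical stand-in for what groupBy(...).flatAggregate(...) does per key, and `MinMaxFn` is an illustrative function that turns each group into two output rows.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical interface mirroring the TableAggregateFunction contract (NOT Flink's API)
interface TableAggFn<IN, ACC, OUT> {
    ACC createAccumulator();                   // step (2): create an empty accumulator
    void accumulate(ACC acc, IN value);        // step (3): update logic
    void merge(ACC acc, Iterable<ACC> others); // step (6): combine partial accumulators
    void emitValue(ACC acc, List<OUT> out);    // step (4): emit 0..n rows (List stands in for Collector)
}

// Illustrative function: each group "explodes" into two rows, its min and its max
final class MinMaxFn implements TableAggFn<Double, double[], String> {
    public double[] createAccumulator() {
        return new double[] {Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY}; // {min, max}
    }

    public void accumulate(double[] acc, Double v) {
        acc[0] = Math.min(acc[0], v);
        acc[1] = Math.max(acc[1], v);
    }

    public void merge(double[] acc, Iterable<double[]> others) {
        for (double[] o : others) {
            acc[0] = Math.min(acc[0], o[0]);
            acc[1] = Math.max(acc[1], o[1]);
        }
    }

    public void emitValue(double[] acc, List<String> out) {
        out.add("min=" + acc[0]); // one accumulator -> two output rows
        out.add("max=" + acc[1]);
    }
}

// Hypothetical driver: one accumulator per key, then each accumulator emits its rows
final class AggDriver {
    static <K, IN, ACC, OUT> Map<K, List<OUT>> runGrouped(
            List<Map.Entry<K, IN>> rows, TableAggFn<IN, ACC, OUT> fn) {
        Map<K, ACC> accs = new LinkedHashMap<>();
        for (Map.Entry<K, IN> row : rows) {
            ACC acc = accs.computeIfAbsent(row.getKey(), k -> fn.createAccumulator());
            fn.accumulate(acc, row.getValue());
        }
        Map<K, List<OUT>> result = new LinkedHashMap<>();
        for (Map.Entry<K, ACC> e : accs.entrySet()) {
            List<OUT> out = new ArrayList<>();
            fn.emitValue(e.getValue(), out);
            result.put(e.getKey(), out);
        }
        return result;
    }
}
```

For example, feeding the Math scores 74, 96 and 99 through `runGrouped` with `MinMaxFn` yields the two rows `min=74.0` and `max=99.0` for the single Math group: many rows in, many rows out.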
2. Dataset format
The dataset of students' test scores by subject is shown below:
1, "zhangsan","Chinese","90"
1, "zhangsan","Math","74"
1, "zhangsan","English","88"
2, "lisi","Chinese","86"
2, "lisi","Math","96"
2, "lisi","English","92"
3, "mary","Chinese","59"
3, "mary","Math","99"
3, "mary","English","100"
The first column is the student ID, the second is the student's name, the third is the subject, and the fourth is the score.
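A minimal plain-Java sketch of reading the four-column format above, assuming each record is one line of comma-separated fields with double-quoted strings, exactly as listed. `ScoreRecord` and `ScoreParser` are hypothetical helpers, not part of the original code. Because the score column is quoted in the listing, it is converted to a double explicitly, matching the DOUBLE type used for the score field.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical value type for one row: id, name, course, score
final class ScoreRecord {
    final int id;
    final String name;
    final String course;
    final double score;

    ScoreRecord(int id, String name, String course, double score) {
        this.id = id;
        this.name = name;
        this.course = course;
        this.score = score;
    }
}

final class ScoreParser {
    // Parses a line such as: 1, "zhangsan","Chinese","90"
    static ScoreRecord parse(String line) {
        String[] parts = line.split(",");
        if (parts.length != 4) {
            throw new IllegalArgumentException("expected 4 fields: " + line);
        }
        return new ScoreRecord(
                Integer.parseInt(unquote(parts[0])),
                unquote(parts[1]),
                unquote(parts[2]),
                Double.parseDouble(unquote(parts[3]))); // score is a quoted string in the listing
    }

    static List<ScoreRecord> parseAll(List<String> lines) {
        List<ScoreRecord> out = new ArrayList<>();
        for (String line : lines) {
            out.add(parse(line));
        }
        return out;
    }

    // Trims whitespace and strips one pair of surrounding double quotes, if present
    private static String unquote(String s) {
        String t = s.trim();
        if (t.length() >= 2 && t.startsWith("\"") && t.endsWith("\"")) {
            return t.substring(1, t.length() - 1);
        }
        return t;
    }
}
```

The simple `split(",")` is enough here only because no field in this dataset contains a comma; a real CSV source would need a proper parser.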
3. Custom UDATF
The following code uses a custom FlinkSQL UDATF to aggregate and then explode the students' test scores: the scores are grouped by subject, and the top N scores of each subject (here, the top 2) are emitted together with their rank. The implementation is as follows.
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
import static org.apache.flink.table.api.Expressions.row;

public class FlinkUdatfFunction {
    public static void main(String[] args) {
        // 1. Create the table execution environment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        // 2. Build the data source (scores are double literals to match the DOUBLE field type)
        Table scores = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.INT()),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("course", DataTypes.STRING()),
                        DataTypes.FIELD("score", DataTypes.DOUBLE())
                ),
                row(1, "zhangsan", "Chinese", 90.0),
                row(1, "zhangsan", "Math", 74.0),
                row(1, "zhangsan", "English", 88.0),
                row(2, "lisi", "Chinese", 86.0),
                row(2, "lisi", "Math", 96.0),
                row(2, "lisi", "English", 92.0),
                row(3, "mary", "Chinese", 59.0),
                row(3, "mary", "Math", 99.0),
                row(3, "mary", "English", 100.0)
        ).select($("id"), $("name"), $("course"), $("score"));
        // 3. Register the table as a temporary view
        tEnv.createTemporaryView("scoresTable", scores);
        // 4. Register the function
        tEnv.createTemporarySystemFunction("Top2Func", new Top2Func());
        // 5. Table API call: invoke the registered UDATF with call(...)
        tEnv.from("scoresTable")
                .groupBy($("course"))
                // A table aggregate function must be called inside flatAggregate
                .flatAggregate(call("Top2Func", $("score")).as("score", "rank"))
                .select($("course"), $("score"), $("rank"))
                .execute()
                .print();
    }

    /**
     * Accumulator holding the two highest values seen so far.
     * NEGATIVE_INFINITY marks an empty slot (Double.MIN_VALUE is the smallest
     * positive double, so using it would wrongly reject a score of 0).
     */
    public static class Top2Accumulator {
        /** Highest value */
        public Double topOne = Double.NEGATIVE_INFINITY;
        /** Second-highest value */
        public Double topTwo = Double.NEGATIVE_INFINITY;
    }

    // Custom UDATF: emits (value, rank) rows for the top 2 values of each group
    public static class Top2Func extends TableAggregateFunction<Tuple2<Double, Integer>, Top2Accumulator> {
        // Create an empty accumulator
        @Override
        public Top2Accumulator createAccumulator() {
            return new Top2Accumulator();
        }

        // Update the accumulator with one input value
        public void accumulate(Top2Accumulator acc, Double value) {
            if (value == null) {
                return; // ignore NULL scores
            }
            if (value > acc.topOne) {
                acc.topTwo = acc.topOne;
                acc.topOne = value;
            } else if (value > acc.topTwo) {
                acc.topTwo = value;
            }
        }

        // Merge partial accumulators into acc
        public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> it) {
            for (Top2Accumulator otherAcc : it) {
                accumulate(acc, otherAcc.topOne);
                accumulate(acc, otherAcc.topTwo);
            }
        }

        // Emit the result rows as (value, rank), skipping empty slots
        public void emitValue(Top2Accumulator acc, Collector<Tuple2<Double, Integer>> out) {
            if (acc.topOne != Double.NEGATIVE_INFINITY) {
                out.collect(Tuple2.of(acc.topOne, 1));
            }
            if (acc.topTwo != Double.NEGATIVE_INFINITY) {
                out.collect(Tuple2.of(acc.topTwo, 2));
            }
        }
    }
}
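The choice of "empty" sentinel in the Top2 accumulator deserves a note: Double.MIN_VALUE is the smallest positive double (about 4.9e-324), not the most negative one, so an accumulator that uses it as the empty marker silently ignores scores of 0 or below, whereas Double.NEGATIVE_INFINITY does not. The plain-Java sketch below has no Flink dependency; `Top2` is a hypothetical stand-in for the accumulate/emitValue logic, parameterised by the sentinel so the two choices can be compared.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java mirror of the Top2 accumulate/emit logic
final class Top2 {
    private final double empty; // value meaning "slot not filled yet"
    private double topOne;
    private double topTwo;

    Top2(double empty) {
        this.empty = empty;
        this.topOne = empty;
        this.topTwo = empty;
    }

    void accumulate(double value) {
        if (value > topOne) {
            topTwo = topOne;
            topOne = value;
        } else if (value > topTwo) {
            topTwo = value;
        }
    }

    // Returns the filled slots, highest first
    List<Double> emit() {
        List<Double> out = new ArrayList<>();
        if (topOne != empty) out.add(topOne);
        if (topTwo != empty) out.add(topTwo);
        return out;
    }

    static List<Double> top2(double empty, double... values) {
        Top2 t = new Top2(empty);
        for (double v : values) t.accumulate(v);
        return t.emit();
    }
}
```

With `Top2.top2(Double.MIN_VALUE, 0.0, 0.0)` nothing is emitted, while `Top2.top2(Double.NEGATIVE_INFINITY, 0.0, 0.0)` emits both zeros.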
4. Running results
After defining the custom UDATF, applying the registered Top2Func function to the students' test scores produces the result shown in the following figure.
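The expected top-2 scores per course can also be cross-checked without Flink. The following plain-Java sketch (a hypothetical helper `TopNByCourse`, not part of the original code) groups the nine records from section 2 by course and keeps the N highest scores with their rank; for N = 2 it should agree with the final (course, score, rank) rows produced by Top2Func. If the Flink job runs in streaming mode, the printed result is a changelog, so intermediate rows that are later retracted may also appear.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TopNByCourse {
    // Returns course -> list of "score:rank" strings, highest score first
    static Map<String, List<String>> topN(String[] courses, double[] scores, int n) {
        Map<String, List<Double>> byCourse = new LinkedHashMap<>();
        for (int i = 0; i < courses.length; i++) {
            byCourse.computeIfAbsent(courses[i], k -> new ArrayList<>()).add(scores[i]);
        }
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Double>> e : byCourse.entrySet()) {
            List<Double> sorted = new ArrayList<>(e.getValue());
            sorted.sort(Comparator.reverseOrder()); // descending by score
            List<String> rows = new ArrayList<>();
            for (int rank = 1; rank <= Math.min(n, sorted.size()); rank++) {
                rows.add(sorted.get(rank - 1) + ":" + rank);
            }
            result.put(e.getKey(), rows);
        }
        return result;
    }

    public static void main(String[] args) {
        // The nine records from section 2 (course and score columns only)
        String[] courses = {"Chinese", "Math", "English", "Chinese", "Math", "English",
                "Chinese", "Math", "English"};
        double[] scores = {90, 74, 88, 86, 96, 92, 59, 99, 100};
        System.out.println(topN(courses, scores, 2));
        // prints {Chinese=[90.0:1, 86.0:2], Math=[99.0:1, 96.0:2], English=[100.0:1, 92.0:2]}
    }
}
```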
