FlinkSQL: Implementing TopN with a Custom UDATF
2022-06-30 12:06:00 【大数据研习社】
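1. UDATF Definition
A table aggregate function (UDATF) is many-in, many-out. It first aggregates the rows of a group and then explodes the aggregate into several output rows. The aggregation process is shown in the figure below.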
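The core steps for implementing a table aggregate function are as follows.
(1) Extend TableAggregateFunction.
(2) Override createAccumulator. This is mandatory.
(3) Provide one or more accumulate methods (usually just one) that implement the logic for updating the accumulator.
(4) Provide emitValue(...) or emitUpdateWithRetract(...) to implement the logic that outputs the results.
(5) A retract method is required only for aggregations on OVER windows.
(6) merge is required for many bounded aggregations and for session and sliding (hop) window aggregations. It also helps with performance optimizations.
(7) emitValue is required for bounded and window aggregations. In unbounded scenarios, emitUpdateWithRetract can improve performance because it outputs values incrementally: only the changed rows are retracted and re-emitted, instead of all previous results.
(8) If the accumulator needs to hold large state, use org.apache.flink.table.api.dataview.ListView or org.apache.flink.table.api.dataview.MapView so that the state is kept in Flink's state backend.
(9) A table aggregate function must be invoked through flatAggregate.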
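The title promises a TopN, but Top2Func in the next section hardcodes N = 2. The plain-Java sketch below shows how the createAccumulator / accumulate / merge / emitValue contract listed above could generalize to any N, using a bounded min-heap as the accumulator. TopNSketch and its methods are hypothetical names chosen to mirror the Flink contract. This is neither Flink's API nor the author's implementation, and it runs without Flink.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical plain-Java model of a table aggregate function's lifecycle for Top-N.
// It only mirrors Flink's method names; it is NOT Flink's API.
class TopNSketch {
    private final int n;

    TopNSketch(int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        this.n = n;
    }

    // Accumulator: a min-heap holding at most n values; its head is the smallest kept value
    PriorityQueue<Double> createAccumulator() {
        return new PriorityQueue<>();
    }

    // Add one input value, evicting the smallest one if more than n are kept
    void accumulate(PriorityQueue<Double> acc, double value) {
        acc.offer(value);
        if (acc.size() > n) {
            acc.poll();
        }
    }

    // Combine partial accumulators (assumed distinct from acc) by re-accumulating their contents
    void merge(PriorityQueue<Double> acc, Iterable<PriorityQueue<Double>> others) {
        for (PriorityQueue<Double> other : others) {
            for (double v : other) {
                accumulate(acc, v);
            }
        }
    }

    // "Explode" one accumulator into (value, rank) rows, rank 1 = largest; acc is not modified
    List<double[]> emitValue(PriorityQueue<Double> acc) {
        List<Double> values = new ArrayList<>(acc);
        values.sort(Collections.reverseOrder());
        List<double[]> rows = new ArrayList<>();
        for (int i = 0; i < values.size(); i++) {
            rows.add(new double[] {values.get(i), i + 1});
        }
        return rows;
    }

    public static void main(String[] args) {
        TopNSketch top2 = new TopNSketch(2);
        // Math scores split across two partial aggregates, then merged
        PriorityQueue<Double> left = top2.createAccumulator();
        top2.accumulate(left, 74.0);
        top2.accumulate(left, 96.0);
        PriorityQueue<Double> right = top2.createAccumulator();
        top2.accumulate(right, 99.0);
        PriorityQueue<Double> total = top2.createAccumulator();
        top2.merge(total, List.of(left, right));
        for (double[] row : top2.emitValue(total)) {
            System.out.println(row[0] + " rank " + (int) row[1]); // 99.0 rank 1, then 96.0 rank 2
        }
    }
}
```

emitValue copies the heap instead of draining it. Flink may call emitValue more than once on the same accumulator, so emitting should not destroy its state.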
2. Dataset Format
The dataset of student exam scores is as follows:
1, "zhangsan","Chinese","90"
1, "zhangsan","Math","74"
1, "zhangsan","English","88"
2, "lisi","Chinese","86"
2, "lisi","Math","96"
2, "lisi","English","92"
3, "mary","Chinese","59"
3, "mary","Math","99"
3, "mary","English","100"
The first column is the student ID, the second is the student name, the third is the course, and the fourth is the score.
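In the listing above the score is quoted like a string, but the Flink program in the next section declares score as DOUBLE. Suppose the data arrived as text lines in exactly this layout. A small parser like the hypothetical ScoreParser below would then have to strip the quotes and convert the score to a number. The class, the record type, and the assumption of no commas inside quoted fields are all illustrative and not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical parser for lines like: 1, "zhangsan","Chinese","90"
class ScoreParser {
    // Hypothetical record type for one parsed line
    static final class ScoreRecord {
        final int id;
        final String name;
        final String course;
        final double score;

        ScoreRecord(int id, String name, String course, double score) {
            this.id = id;
            this.name = name;
            this.course = course;
            this.score = score;
        }

        @Override
        public String toString() {
            return id + "," + name + "," + course + "," + score;
        }
    }

    // Assumes exactly four comma-separated fields and no commas inside the quotes
    static ScoreRecord parse(String line) {
        String[] parts = line.split(",");
        if (parts.length != 4) {
            throw new IllegalArgumentException("expected 4 fields: " + line);
        }
        int id = Integer.parseInt(unquote(parts[0]));
        String name = unquote(parts[1]);
        String course = unquote(parts[2]);
        double score = Double.parseDouble(unquote(parts[3])); // "90" -> 90.0
        return new ScoreRecord(id, name, course, score);
    }

    // Trims whitespace and removes one pair of surrounding double quotes, if present
    private static String unquote(String field) {
        String s = field.trim();
        if (s.length() >= 2 && s.startsWith("\"") && s.endsWith("\"")) {
            s = s.substring(1, s.length() - 1);
        }
        return s;
    }

    static List<ScoreRecord> parseAll(List<String> lines) {
        List<ScoreRecord> out = new ArrayList<>();
        for (String line : lines) {
            out.add(parse(line));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "1, \"zhangsan\",\"Chinese\",\"90\"",
                "3, \"mary\",\"English\",\"100\"");
        for (ScoreRecord r : parseAll(lines)) {
            System.out.println(r); // 1,zhangsan,Chinese,90.0 then 3,mary,English,100.0
        }
    }
}
```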
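3. Custom UDATF
The custom UDATF below aggregates and then explodes the students' exam scores. It groups the rows by course and emits the top 2 scores of each course together with their rank. The implementation is as follows.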
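import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
import static org.apache.flink.table.api.Expressions.row;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

public class FlinkUdatfFunction {
    public static void main(String[] args) {
        // 1. Create the table environment (streaming mode)
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        // 2. Build the source table; score is declared DOUBLE, so pass numeric values, not strings
        Table scores = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.INT()),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("course", DataTypes.STRING()),
                        DataTypes.FIELD("score", DataTypes.DOUBLE())
                ),
                row(1, "zhangsan", "Chinese", 90.0),
                row(1, "zhangsan", "Math", 74.0),
                row(1, "zhangsan", "English", 88.0),
                row(2, "lisi", "Chinese", 86.0),
                row(2, "lisi", "Math", 96.0),
                row(2, "lisi", "English", 92.0),
                row(3, "mary", "Chinese", 59.0),
                row(3, "mary", "Math", 99.0),
                row(3, "mary", "English", 100.0)
        ).select($("id"), $("name"), $("course"), $("score"));
        // 3. Register the table as a temporary view
        tEnv.createTemporaryView("scoresTable", scores);
        // 4. Register the function
        tEnv.createTemporarySystemFunction("Top2Func", new Top2Func());
        // 5. Table API: invoke the registered function with call(...);
        //    a table aggregate function must be used inside flatAggregate
        tEnv.from("scoresTable")
                .groupBy($("course"))
                .flatAggregate(call("Top2Func", $("score")).as("score", "rank"))
                .select($("course"), $("score"), $("rank"))
                .execute()
                .print();
    }

    /**
     * Mutable accumulator. Flink treats it as a POJO (public class, public no-arg
     * constructor, public fields). null means "no value seen yet".
     */
    public static class Top2Accumulator {
        /** Highest value seen so far */
        public Double topOne = null;
        /** Second-highest value seen so far */
        public Double topTwo = null;
    }

    // Custom UDATF: emits up to two (score, rank) rows per group
    public static class Top2Func extends TableAggregateFunction<Tuple2<Double, Integer>, Top2Accumulator> {
        // Create an empty accumulator
        @Override
        public Top2Accumulator createAccumulator() {
            return new Top2Accumulator();
        }

        // Update the accumulator with one input value; NULL scores are ignored
        public void accumulate(Top2Accumulator acc, Double value) {
            if (value == null) {
                return;
            }
            if (acc.topOne == null || value > acc.topOne) {
                acc.topTwo = acc.topOne;
                acc.topOne = value;
            } else if (acc.topTwo == null || value > acc.topTwo) {
                acc.topTwo = value;
            }
        }

        // Merge partial accumulators; empty (null) slots are skipped by accumulate
        public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> it) {
            for (Top2Accumulator otherAcc : it) {
                accumulate(acc, otherAcc.topOne);
                accumulate(acc, otherAcc.topTwo);
            }
        }

        // Emit the results: one row per non-empty slot, with rank 1 or 2
        public void emitValue(Top2Accumulator acc, Collector<Tuple2<Double, Integer>> out) {
            if (acc.topOne != null) {
                out.collect(Tuple2.of(acc.topOne, 1));
            }
            if (acc.topTwo != null) {
                out.collect(Tuple2.of(acc.topTwo, 2));
            }
        }
    }
}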
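A note on the "empty" marker for the accumulator fields. Double.MIN_VALUE is the smallest positive double (4.9E-324), not the most negative one. As the initial value of a max or Top-N accumulator it silently discards every input that is zero or negative, so it is only safe when all inputs are strictly positive. A null marker (or Double.NEGATIVE_INFINITY) avoids the problem. The hypothetical SentinelDemo class below is plain Java with no Flink dependency. It shows the difference on a single Top-1 slot.

```java
// Hypothetical demo: Double.MIN_VALUE vs. null as the "nothing seen yet" marker
class SentinelDemo {
    // Top-1 starting from Double.MIN_VALUE (the pitfall: zero/negative inputs never win)
    static double maxWithMinValueSentinel(double... values) {
        double top = Double.MIN_VALUE;
        for (double v : values) {
            if (v > top) {
                top = v;
            }
        }
        return top;
    }

    // Top-1 starting from null; returns null when there is no input at all
    static Double maxWithNullSentinel(double... values) {
        Double top = null;
        for (double v : values) {
            if (top == null || v > top) {
                top = v;
            }
        }
        return top;
    }

    public static void main(String[] args) {
        System.out.println(Double.MIN_VALUE > 0);                // true
        System.out.println(maxWithMinValueSentinel(-3.0, -1.5)); // 4.9E-324 (wrong)
        System.out.println(maxWithMinValueSentinel(0.0));        // 4.9E-324 (a score of 0 is lost)
        System.out.println(maxWithNullSentinel(-3.0, -1.5));     // -1.5
    }
}
```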
4. Results
After the custom UDATF is defined, the registered Top2Func aggregates and then explodes the students' exam scores. The result is shown below.
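The plain-Java sketch below cross-checks what Top2Func should return for this dataset. It groups the nine scores by course and keeps the two highest in each. The class ExpectedTop2 is hypothetical and uses no Flink. Assuming the job runs in streaming mode (the EnvironmentSettings default), the table printed by Flink may also contain intermediate changelog rows in an op column, because earlier results are retracted and re-emitted. The final rows per course should match the ones computed here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reference computation of the expected Top-2 per course (no Flink)
class ExpectedTop2 {
    // The nine (course, score) pairs from the dataset section, in listing order
    static final String[] COURSES = {
        "Chinese", "Math", "English",
        "Chinese", "Math", "English",
        "Chinese", "Math", "English"
    };
    static final double[] SCORES = {90, 74, 88, 86, 96, 92, 59, 99, 100};

    // Returns course -> its two highest scores in descending order
    static Map<String, List<Double>> top2ByCourse(String[] courses, double[] scores) {
        Map<String, List<Double>> byCourse = new LinkedHashMap<>();
        for (int i = 0; i < courses.length; i++) {
            byCourse.computeIfAbsent(courses[i], k -> new ArrayList<>()).add(scores[i]);
        }
        Map<String, List<Double>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Double>> e : byCourse.entrySet()) {
            List<Double> sorted = new ArrayList<>(e.getValue());
            sorted.sort(Comparator.reverseOrder());
            result.put(e.getKey(), new ArrayList<>(sorted.subList(0, Math.min(2, sorted.size()))));
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints course,score,rank; e.g. Chinese,90.0,1 and Chinese,86.0,2
        top2ByCourse(COURSES, SCORES).forEach((course, top) -> {
            for (int rank = 1; rank <= top.size(); rank++) {
                System.out.println(course + "," + top.get(rank - 1) + "," + rank);
            }
        });
    }
}
```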