Implementing TopN in FlinkSQL with a Custom UDATF
2022-06-30 12:06:00 【大数据研习社】
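1. UDATF definition
Table aggregate functions (UDATFs) are many-in, many-out: they consume multiple input rows and emit multiple output rows, aggregating first and then exploding the result. The aggregation process is shown in the figure below.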

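The core steps for implementing a table aggregate function are as follows.
(1) Extend TableAggregateFunction.
(2) createAccumulator must be overridden.
(3) Provide one or more accumulate methods (usually just one) that implement the logic for updating the accumulator.
(4) Provide emitValue(...) or emitUpdateWithRetract(...) to implement the logic for producing the result.
(5) The retract method is only required for aggregations on OVER windows.
(6) merge is required for bounded aggregations as well as session window and sliding window aggregations (it also helps with performance optimization).
(7) emitValue is required for bounded and window aggregations; in unbounded scenarios, emitUpdateWithRetract can improve performance because only the rows that changed are emitted.
(8) If the accumulator has to hold large state, org.apache.flink.table.api.dataview.ListView or org.apache.flink.table.api.dataview.MapView can be used so that the data is kept in the Flink state backend. The table aggregate function itself must be invoked through flatAggregate.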
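Item (7) is easier to follow with a concrete trace. The following is a minimal, Flink-free sketch of the idea behind emitUpdateWithRetract, assuming a top-2 aggregation: the accumulator remembers what it emitted last time and only retracts and re-emits the ranks whose value changed. The names Top2RetractSketch, ChangeLog and Acc are hypothetical; in Flink the method would receive a TableAggregateFunction.RetractableCollector rather than the ChangeLog stand-in used here.

```java
import java.util.ArrayList;
import java.util.List;

/** Flink-free model of incremental emission; every name here is hypothetical. */
final class Top2RetractSketch {

    /** Stand-in for a retractable collector: records each change as a string. */
    static final class ChangeLog {
        final List<String> changes = new ArrayList<>();

        void collect(double value, int rank) {
            changes.add("+(" + value + "," + rank + ")");
        }

        void retract(double value, int rank) {
            changes.add("-(" + value + "," + rank + ")");
        }
    }

    /** Accumulator that also remembers the values emitted by the previous call. */
    static final class Acc {
        double first = Double.NEGATIVE_INFINITY;
        double second = Double.NEGATIVE_INFINITY;
        double oldFirst = Double.NEGATIVE_INFINITY;
        double oldSecond = Double.NEGATIVE_INFINITY;
    }

    /** Same update rule as a plain top-2 accumulator. */
    static void accumulate(Acc acc, double value) {
        if (value > acc.first) {
            acc.second = acc.first;
            acc.first = value;
        } else if (value > acc.second) {
            acc.second = value;
        }
    }

    /** Retracts and re-emits only the ranks whose value changed since the last call. */
    static void emitUpdateWithRetract(Acc acc, ChangeLog out) {
        if (acc.first != acc.oldFirst) {
            if (acc.oldFirst != Double.NEGATIVE_INFINITY) {
                out.retract(acc.oldFirst, 1);
            }
            out.collect(acc.first, 1);
            acc.oldFirst = acc.first;
        }
        if (acc.second != acc.oldSecond) {
            if (acc.oldSecond != Double.NEGATIVE_INFINITY) {
                out.retract(acc.oldSecond, 2);
            }
            out.collect(acc.second, 2);
            acc.oldSecond = acc.second;
        }
    }

    /** Feeds the values one at a time, emitting after each one, and returns the change log. */
    static List<String> run(double... values) {
        Acc acc = new Acc();
        ChangeLog out = new ChangeLog();
        for (double v : values) {
            accumulate(acc, v);
            emitUpdateWithRetract(acc, out);
        }
        return out.changes;
    }

    public static void main(String[] args) {
        // Three scores arriving one by one: 74, 96, 99
        System.out.println(run(74, 96, 99));
    }
}
```

With emitValue, every update would re-emit the full top-2 result; in this model an unchanged rank produces no output at all, which is where the performance benefit in unbounded scenarios comes from.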
2. Dataset format
The dataset of student exam scores per subject looks like this:
1, "zhangsan","Chinese","90"
1, "zhangsan","Math","74"
1, "zhangsan","English","88"
2, "lisi","Chinese","86"
2, "lisi","Math","96"
2, "lisi","English","92"
3, "mary","Chinese","59"
3, "mary","Math","99"
3, "mary","English","100"
The first column is the student ID, the second is the student name, the third is the subject, and the fourth is the score.
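To make the column layout concrete, here is a small sketch that parses one line of the dataset into a typed record. ScoreLineParser and Score are hypothetical helper names, and the parser assumes that no field contains a comma or an escaped quote, which holds for the sample data above.

```java
/** Hypothetical helper that parses one dataset line; assumes fields contain no commas. */
final class ScoreLineParser {

    /** One parsed row: student ID, student name, subject, score. */
    static final class Score {
        final int id;
        final String name;
        final String course;
        final double score;

        Score(int id, String name, String course, double score) {
            this.id = id;
            this.name = name;
            this.course = course;
            this.score = score;
        }
    }

    /** Trims whitespace and strips the surrounding double quotes. */
    static String clean(String field) {
        return field.trim().replace("\"", "");
    }

    static Score parse(String line) {
        String[] parts = line.split(",");
        if (parts.length != 4) {
            throw new IllegalArgumentException("expected 4 columns: " + line);
        }
        return new Score(
                Integer.parseInt(clean(parts[0])),
                clean(parts[1]),
                clean(parts[2]),
                // the score is quoted in the dataset, so it is parsed from text
                Double.parseDouble(clean(parts[3])));
    }

    public static void main(String[] args) {
        Score s = parse("1, \"zhangsan\",\"Chinese\",\"90\"");
        System.out.println(s.id + " " + s.name + " " + s.course + " " + s.score);
    }
}
```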
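3. Custom UDATF
The custom UDATF aggregates and then explodes the student exam scores: grouped by subject, it returns the top-N scores (N = 2 in this example). The implementation is shown below.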
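import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
import static org.apache.flink.table.api.Expressions.row;

public class FlinkUdatfFunction {
    public static void main(String[] args) {
        // 1. Create the table execution environment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        // 2. Build the data source
        Table scores = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.INT()),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("course", DataTypes.STRING()),
                        DataTypes.FIELD("score", DataTypes.DOUBLE())
                ),
                row(1, "zhangsan", "Chinese", "90"),
                row(1, "zhangsan", "Math", "74"),
                row(1, "zhangsan", "English", "88"),
                row(2, "lisi", "Chinese", "86"),
                row(2, "lisi", "Math", "96"),
                row(2, "lisi", "English", "92"),
                row(3, "mary", "Chinese", "59"),
                row(3, "mary", "Math", "99"),
                row(3, "mary", "English", "100")
        ).select($("id"), $("name"), $("course"), $("score"));
        // 3. Register the table
        tEnv.createTemporaryView("scoresTable", scores);
        // 4. Register the function
        tEnv.createTemporarySystemFunction("Top2Func", new Top2Func());
        // 5. Table API call: invoke the registered function with call(...)
        tEnv.from("scoresTable")
                .groupBy($("course"))
                // a table aggregate function must be invoked inside flatAggregate
                .flatAggregate(call("Top2Func", $("score")).as("score", "rank"))
                .select($("course"), $("score"), $("rank"))
                .execute()
                .print();
    }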
    /**
     * Mutable accumulator holding the two largest values seen so far.
     * Double.NEGATIVE_INFINITY marks an empty slot; Double.MIN_VALUE is the
     * smallest positive double, so using it would silently drop a score of 0.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Top2Accumulator {
        /**
         * The top 1 value
         */
        public Double topOne = Double.NEGATIVE_INFINITY;
        /**
         * The top 2 value
         */
        public Double topTwo = Double.NEGATIVE_INFINITY;
    }

    // Custom UDATF: emits (score, rank) pairs for the two highest scores
    public static class Top2Func extends TableAggregateFunction<Tuple2<Double, Integer>, Top2Accumulator> {
        // Initialize the accumulator
        @Override
        public Top2Accumulator createAccumulator() {
            return new Top2Accumulator();
        }

        // Accumulate one input value
        public void accumulate(Top2Accumulator acc, Double value) {
            // Assumption: null scores are skipped rather than treated as an error
            if (value == null) {
                return;
            }
            if (value > acc.topOne) {
                acc.topTwo = acc.topOne;
                acc.topOne = value;
            } else if (value > acc.topTwo) {
                acc.topTwo = value;
            }
        }

        // Merge partial accumulators into acc
        public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> it) {
            for (Top2Accumulator otherAcc : it) {
                accumulate(acc, otherAcc.topOne);
                accumulate(acc, otherAcc.topTwo);
            }
        }

        // Emit the result, skipping slots that were never filled
        public void emitValue(Top2Accumulator acc, Collector<Tuple2<Double, Integer>> out) {
            if (acc.topOne != Double.NEGATIVE_INFINITY) {
                out.collect(Tuple2.of(acc.topOne, 1));
            }
            if (acc.topTwo != Double.NEGATIVE_INFINITY) {
                out.collect(Tuple2.of(acc.topTwo, 2));
            }
        }
    }
}
4. Results
With the custom UDATF registered, applying Top2Func to aggregate and explode the student exam scores produces the result shown below.
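The expected top-2 values per subject can be cross-checked without Flink. The sketch below recomputes them from the dataset in section 2 using plain Java collections; Top2CrossCheck and its DATASET constant are hypothetical names, and the printed map only shows the values, not the layout or row order that Flink's print() produces.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical Flink-free cross-check of the top-2 scores per subject. */
final class Top2CrossCheck {

    /** (course, score) pairs taken from the dataset in section 2. */
    static final String[][] DATASET = {
        {"Chinese", "90"}, {"Math", "74"}, {"English", "88"},
        {"Chinese", "86"}, {"Math", "96"}, {"English", "92"},
        {"Chinese", "59"}, {"Math", "99"}, {"English", "100"},
    };

    /** Groups scores by course and keeps the two highest, in descending order. */
    static Map<String, List<Double>> top2ByCourse(String[][] rows) {
        Map<String, List<Double>> byCourse = new LinkedHashMap<>();
        for (String[] row : rows) {
            byCourse.computeIfAbsent(row[0], k -> new ArrayList<>())
                    .add(Double.parseDouble(row[1]));
        }
        Map<String, List<Double>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Double>> entry : byCourse.entrySet()) {
            List<Double> sorted = new ArrayList<>(entry.getValue());
            sorted.sort(Collections.reverseOrder());
            int n = Math.min(2, sorted.size());
            result.put(entry.getKey(), new ArrayList<>(sorted.subList(0, n)));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {Chinese=[90.0, 86.0], Math=[99.0, 96.0], English=[100.0, 92.0]}
        System.out.println(top2ByCourse(DATASET));
    }
}
```

In each list the first value corresponds to rank 1 and the second to rank 2 in the output of Top2Func.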
