FlinkSQL: Implementing TopN with a Custom UDATF
2022-06-30 12:06:00 【大数据研习社】
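1. UDATF Definition
Table aggregate functions (UDATFs) map multiple input rows to multiple output rows: they first aggregate a group of rows and then "explode" the aggregate into several output rows. The aggregation process is shown in the figure below.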
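To make "many in, many out" concrete, here is a plain-Java sketch with no Flink dependency that mimics what a table aggregate function does per group: fold every input row of the group into one accumulator, then emit several output rows from that accumulator. The class name `AggregateThenExplode` and the min/max example are illustrative assumptions, not part of Flink's API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AggregateThenExplode {

    // Accumulator for one group: the smallest and largest value seen so far.
    static final class MinMaxAcc {
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
    }

    // Input rows are (key, value) pairs; output rows are "key,label,value" strings.
    static List<String> run(List<Map.Entry<String, Double>> rows) {
        // Step 1 (aggregate): one accumulator per group key, in first-seen order.
        Map<String, MinMaxAcc> accs = new LinkedHashMap<>();
        for (Map.Entry<String, Double> row : rows) {
            MinMaxAcc acc = accs.computeIfAbsent(row.getKey(), k -> new MinMaxAcc());
            acc.min = Math.min(acc.min, row.getValue());
            acc.max = Math.max(acc.max, row.getValue());
        }
        // Step 2 (explode): every accumulator emits two output rows.
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, MinMaxAcc> e : accs.entrySet()) {
            out.add(e.getKey() + ",min," + e.getValue().min);
            out.add(e.getKey() + ",max," + e.getValue().max);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Double>> rows = List.of(
                Map.entry("Math", 74.0), Map.entry("Math", 96.0), Map.entry("Math", 99.0),
                Map.entry("English", 88.0), Map.entry("English", 100.0));
        run(rows).forEach(System.out::println);
    }
}
```

Five input rows in two groups become four output rows, which is the "aggregate first, then explode" shape that flatAggregate produces.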

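The core steps for implementing a table aggregate function are as follows.
(1) Extend TableAggregateFunction.
(2) Override createAccumulator (required).
(3) Provide one or more accumulate methods (usually just one) that implement the logic for updating the accumulator.
(4) Provide emitValue(...) or emitUpdateWithRetract(...) to implement the logic that produces the result rows.
(5) A retract method is required only for aggregations on OVER windows.
(6) merge is required for bounded aggregations and for session-window and hop (sliding) window aggregations; it can also improve performance.
(7) emitValue is required for bounded window aggregations; in unbounded scenarios, emitUpdateWithRetract can improve performance by emitting only the rows that changed.
(8) If the accumulator needs to hold a large state, use org.apache.flink.table.api.dataview.ListView or org.apache.flink.table.api.dataview.MapView so that the state is kept in Flink's state backend. Note that a table aggregate function must be invoked through flatAggregate.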
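Point (7) is easiest to see by counting changelog messages. The plain-Java sketch below is a simplified model, an assumption rather than Flink's actual runtime code, of a streaming Top2 for a single group: with full re-emission (the emitValue style), every new input retracts all previously emitted rows and re-emits all current rows; with incremental emission (the emitUpdateWithRetract style), only ranks whose value changed produce a retraction and an insertion.

```java
import java.util.List;

public class EmitStrategyCost {

    // Returns {topOne, topTwo} after inserting v; NEGATIVE_INFINITY marks an empty slot.
    static double[] update(double[] top, double v) {
        if (v > top[0]) {
            return new double[] {v, top[0]};
        } else if (v > top[1]) {
            return new double[] {top[0], v};
        }
        return top;
    }

    // Number of non-empty ranks currently held.
    static int filled(double[] top) {
        int n = 0;
        for (double t : top) {
            if (t != Double.NEGATIVE_INFINITY) n++;
        }
        return n;
    }

    // Full re-emission: retract every previously emitted row, then emit every current row.
    static int fullReemitMessages(List<Double> input) {
        double[] top = {Double.NEGATIVE_INFINITY, Double.NEGATIVE_INFINITY};
        int messages = 0;
        for (double v : input) {
            double[] next = update(top, v);
            messages += filled(top) + filled(next);
            top = next;
        }
        return messages;
    }

    // Incremental emission: for each rank whose value changed, retract the old row
    // (if one was emitted before) and emit the new row.
    static int incrementalMessages(List<Double> input) {
        double[] top = {Double.NEGATIVE_INFINITY, Double.NEGATIVE_INFINITY};
        int messages = 0;
        for (double v : input) {
            double[] next = update(top, v);
            for (int rank = 0; rank < 2; rank++) {
                if (next[rank] != top[rank]) {
                    if (top[rank] != Double.NEGATIVE_INFINITY) messages++; // retraction
                    messages++;                                            // insertion
                }
            }
            top = next;
        }
        return messages;
    }

    public static void main(String[] args) {
        List<Double> chinese = List.of(90.0, 86.0, 59.0);
        System.out.println("full re-emission: " + fullReemitMessages(chinese));
        System.out.println("incremental:      " + incrementalMessages(chinese));
    }
}
```

In this model the Chinese scores (90, 86, 59) cost 8 messages with full re-emission but only 2 incrementally, while the Math scores (74, 96, 99) cost 8 either way because every input changes both ranks; the saving depends on how often the top ranks actually change.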
2. Dataset Format
The student exam-score dataset is shown below:
1, "zhangsan","Chinese","90"
1, "zhangsan","Math","74"
1, "zhangsan","English","88"
2, "lisi","Chinese","86"
2, "lisi","Math","96"
2, "lisi","English","92"
3, "mary","Chinese","59"
3, "mary","Math","99"
3, "mary","English","100"
The first column is the student ID, the second the student name, the third the course, and the fourth the score.
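Because the score column is quoted text in this listing, it must be converted to a number before it can fill a DOUBLE column. A minimal plain-Java parsing sketch, assuming every line has exactly the four comma-separated fields shown above and no commas inside the quoted values (the `ScoreRecord` class is hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

public class ScoreParser {

    // One row of the exam-score dataset: student ID, name, course, score.
    static final class ScoreRecord {
        final int id;
        final String name;
        final String course;
        final double score;

        ScoreRecord(int id, String name, String course, double score) {
            this.id = id;
            this.name = name;
            this.course = course;
            this.score = score;
        }

        @Override
        public String toString() {
            return id + "," + name + "," + course + "," + score;
        }
    }

    // Parses a line such as: 1, "zhangsan","Chinese","90"
    static ScoreRecord parse(String line) {
        String[] parts = line.split(",");
        if (parts.length != 4) {
            throw new IllegalArgumentException("expected 4 fields: " + line);
        }
        // Trim whitespace and strip the surrounding double quotes from each field.
        String[] f = new String[4];
        for (int i = 0; i < 4; i++) {
            f[i] = parts[i].trim().replace("\"", "");
        }
        return new ScoreRecord(Integer.parseInt(f[0]), f[1], f[2], Double.parseDouble(f[3]));
    }

    public static void main(String[] args) {
        List<ScoreRecord> rows = new ArrayList<>();
        rows.add(parse("1, \"zhangsan\",\"Chinese\",\"90\""));
        rows.add(parse("3, \"mary\",\"English\",\"100\""));
        rows.forEach(System.out::println);
    }
}
```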
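3. Custom UDATF
The custom UDATF below aggregates and explodes the exam scores: it groups the rows by course and, for each course, emits the top N scores (here N = 2) together with their rank. The implementation is as follows.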
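import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
import static org.apache.flink.table.api.Expressions.row;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

public class FlinkUdatfFunction {
    public static void main(String[] args) {
        // 1. Create the table environment (streaming mode, where table aggregates are supported)
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        // 2. Build the source table; scores are numeric literals to match the DOUBLE column
        Table scores = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.INT()),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("course", DataTypes.STRING()),
                        DataTypes.FIELD("score", DataTypes.DOUBLE())
                ),
                row(1, "zhangsan", "Chinese", 90.0),
                row(1, "zhangsan", "Math", 74.0),
                row(1, "zhangsan", "English", 88.0),
                row(2, "lisi", "Chinese", 86.0),
                row(2, "lisi", "Math", 96.0),
                row(2, "lisi", "English", 92.0),
                row(3, "mary", "Chinese", 59.0),
                row(3, "mary", "Math", 99.0),
                row(3, "mary", "English", 100.0)
        ).select($("id"), $("name"), $("course"), $("score"));
        // 3. Register the table
        tEnv.createTemporaryView("scoresTable", scores);
        // 4. Register the function
        tEnv.createTemporarySystemFunction("Top2Func", new Top2Func());
        // 5. Table API call: invoke the registered function by name with call()
        tEnv.from("scoresTable")
                .groupBy($("course"))
                // A table aggregate function can only be invoked inside flatAggregate
                .flatAggregate(call("Top2Func", $("score")).as("score", "rank"))
                .select($("course"), $("score"), $("rank"))
                .execute()
                .print();
    }

    /**
     * Mutable accumulator. Flink treats it as a POJO (public fields, implicit public
     * no-argument constructor). NEGATIVE_INFINITY marks an empty slot, so any real
     * score, including 0, is accepted. (Double.MIN_VALUE is the smallest positive
     * double, not the most negative one, so it is not a safe sentinel.)
     */
    public static class Top2Accumulator {
        /** Highest value seen so far. */
        public double topOne = Double.NEGATIVE_INFINITY;
        /** Second-highest value seen so far. */
        public double topTwo = Double.NEGATIVE_INFINITY;
    }

    // Custom UDATF: emits a (score, rank) row for each of the two highest scores in a group
    public static class Top2Func extends TableAggregateFunction<Tuple2<Double, Integer>, Top2Accumulator> {
        // Create an empty accumulator
        @Override
        public Top2Accumulator createAccumulator() {
            return new Top2Accumulator();
        }

        // Update the accumulator with one input value
        public void accumulate(Top2Accumulator acc, Double value) {
            if (value == null) {
                return; // ignore NULL scores
            }
            if (value > acc.topOne) {
                acc.topTwo = acc.topOne;
                acc.topOne = value;
            } else if (value > acc.topTwo) {
                acc.topTwo = value;
            }
        }

        // Merge partial accumulators into acc
        public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> it) {
            for (Top2Accumulator otherAcc : it) {
                accumulate(acc, otherAcc.topOne);
                accumulate(acc, otherAcc.topTwo);
            }
        }

        // Emit the result rows (rank 1, then rank 2), skipping empty slots
        public void emitValue(Top2Accumulator acc, Collector<Tuple2<Double, Integer>> out) {
            if (acc.topOne != Double.NEGATIVE_INFINITY) {
                out.collect(Tuple2.of(acc.topOne, 1));
            }
            if (acc.topTwo != Double.NEGATIVE_INFINITY) {
                out.collect(Tuple2.of(acc.topTwo, 2));
            }
        }
    }
}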
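Top2Func hard-codes N = 2 through two fields. As a hedged sketch of how the same accumulate/merge/emitValue logic generalizes to an arbitrary N, here is a plain-Java version with no Flink dependency that keeps the N largest scores in a bounded min-heap. The class and method names are hypothetical; inside Flink the kept values would still need to live in an accumulator type Flink can serialize (for example a list field or a ListView), which this sketch does not cover.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class TopNSketch {

    // Accumulator: a min-heap holding at most n values; its head is the smallest kept value.
    static final class TopNAcc {
        final int n;
        final PriorityQueue<Double> heap = new PriorityQueue<>();

        TopNAcc(int n) {
            this.n = n;
        }
    }

    // Mirrors accumulate(): keep the value only if it belongs to the current top n.
    static void accumulate(TopNAcc acc, Double value) {
        if (value == null) {
            return;
        }
        if (acc.heap.size() < acc.n) {
            acc.heap.add(value);
        } else if (value > acc.heap.peek()) {
            acc.heap.poll();
            acc.heap.add(value);
        }
    }

    // Mirrors merge(): fold every value kept by the other accumulators into acc.
    static void merge(TopNAcc acc, Iterable<TopNAcc> others) {
        for (TopNAcc other : others) {
            for (Double v : other.heap) {
                accumulate(acc, v);
            }
        }
    }

    // Mirrors emitValue(): one "score,rank" row per kept value, rank 1 = highest.
    static List<String> emitValue(TopNAcc acc) {
        List<Double> sorted = new ArrayList<>(acc.heap);
        sorted.sort(Comparator.reverseOrder());
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            rows.add(sorted.get(i) + "," + (i + 1));
        }
        return rows;
    }

    // Groups (course, score) pairs by course and emits "course,score,rank" rows.
    static List<String> topNByCourse(List<Map.Entry<String, Double>> rows, int n) {
        Map<String, TopNAcc> accs = new LinkedHashMap<>();
        for (Map.Entry<String, Double> row : rows) {
            accumulate(accs.computeIfAbsent(row.getKey(), k -> new TopNAcc(n)), row.getValue());
        }
        List<String> out = new ArrayList<>();
        accs.forEach((course, acc) -> emitValue(acc).forEach(r -> out.add(course + "," + r)));
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Double>> rows = List.of(
                Map.entry("Chinese", 90.0), Map.entry("Math", 74.0), Map.entry("English", 88.0),
                Map.entry("Chinese", 86.0), Map.entry("Math", 96.0), Map.entry("English", 92.0),
                Map.entry("Chinese", 59.0), Map.entry("Math", 99.0), Map.entry("English", 100.0));
        topNByCourse(rows, 2).forEach(System.out::println);
    }
}
```

With N = 2 on the dataset above this prints the two highest scores per course (Chinese 90.0 and 86.0, Math 99.0 and 96.0, English 100.0 and 92.0), which is what the Top2Func logic computes for the final state of each group. A min-heap keeps each accumulate call at O(log N) regardless of how many rows the group receives.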
4. Results
After registering the custom UDATF, applying Top2Func to the exam scores (aggregate, then explode) produces the output shown below.
