Three Ways to Use a Custom UDAF in Flink SQL
2022-06-30 12:06:00 【大数据研习社】
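1. UDAF definition
An aggregate function maps scalar values from multiple rows to a single new scalar value (many rows in, one value out). It relies on an accumulator, an intermediate data structure that holds the running state of the aggregation. The aggregation process is illustrated by a figure in the original post.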

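Implementing an aggregate function involves the following core steps.
(1) Extend AggregateFunction.
(2) Override createAccumulator and getValue; both are mandatory.
(3) Provide an accumulate method.
(4) The retract method is required only for aggregations on OVER windows.
(5) The merge method is required for bounded aggregations as well as session-window and sliding-window aggregations (it also helps performance optimization).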
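The five steps above can be sketched without any Flink dependency. What follows is a minimal plain-Java model of the contract, not Flink's own classes: `AvgAcc`, `AvgSketch` and `AvgSketchDemo` are hypothetical names, and in a real job the class would extend `org.apache.flink.table.functions.AggregateFunction<Double, AvgAcc>` and Flink itself would call these methods. The `retract` and `merge` methods follow the shapes described in the Flink documentation, `retract(acc, value)` and `merge(acc, Iterable<ACC>)`.

```java
import java.util.Arrays;

// Hypothetical accumulator: the intermediate state of the aggregation.
class AvgAcc {
    double sum = 0.0;
    int count = 0;
}

// Plain-Java stand-in for a class that would extend
// AggregateFunction<Double, AvgAcc> in a real Flink job.
class AvgSketch {
    // Step 2: create an empty accumulator.
    AvgAcc createAccumulator() {
        return new AvgAcc();
    }

    // Step 2: derive the final result (null when no rows were seen).
    Double getValue(AvgAcc acc) {
        return acc.count == 0 ? null : acc.sum / acc.count;
    }

    // Step 3: fold one input row into the accumulator.
    void accumulate(AvgAcc acc, Double score) {
        if (score == null) {
            return; // ignore NULL inputs
        }
        acc.sum += score;
        acc.count += 1;
    }

    // Step 4: undo a previously accumulated row.
    void retract(AvgAcc acc, Double score) {
        if (score == null) {
            return;
        }
        acc.sum -= score;
        acc.count -= 1;
    }

    // Step 5: combine partial accumulators into acc.
    void merge(AvgAcc acc, Iterable<AvgAcc> others) {
        for (AvgAcc other : others) {
            acc.sum += other.sum;
            acc.count += other.count;
        }
    }
}

class AvgSketchDemo {
    public static void main(String[] args) {
        AvgSketch fn = new AvgSketch();
        AvgAcc a = fn.createAccumulator();
        fn.accumulate(a, 90.0);
        fn.accumulate(a, 86.0);
        System.out.println(fn.getValue(a)); // prints 88.0

        AvgAcc b = fn.createAccumulator();
        fn.accumulate(b, 100.0);
        fn.merge(a, Arrays.asList(b));
        System.out.println(fn.getValue(a)); // prints 92.0

        fn.retract(a, 100.0);
        System.out.println(fn.getValue(a)); // prints 88.0
    }
}
```

Keeping only a sum and a count in the accumulator is what makes `retract` and `merge` cheap here: both reduce to simple arithmetic on the two fields.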
2. Data set format
The data set of student exam scores by subject looks like this:
1,"zhangsan","Chinese",90
1,"zhangsan","Math",74
1,"zhangsan","English",100
2,"lisi","Chinese",86
2,"lisi","Math",99
2,"lisi","English",92
The first column is the student ID, the second the student name, the third the subject, and the fourth the score.
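To make the column layout concrete, here is a minimal sketch that parses one line of this data set into a typed object. `ScoreRecord`, `parse` and `ScoreRecordDemo` are hypothetical names that do not appear in the original post, and the parser assumes that no field contains an embedded comma.

```java
// Hypothetical holder for one row of the data set.
class ScoreRecord {
    final int id;
    final String name;
    final String course;
    final double score;

    ScoreRecord(int id, String name, String course, double score) {
        this.id = id;
        this.name = name;
        this.course = course;
        this.score = score;
    }

    // Parses a line such as: 1,"zhangsan","Chinese",90
    // Assumes exactly four comma-separated fields without embedded commas.
    static ScoreRecord parse(String line) {
        String[] parts = line.split(",");
        if (parts.length != 4) {
            throw new IllegalArgumentException("expected 4 fields: " + line);
        }
        return new ScoreRecord(
                Integer.parseInt(parts[0].trim()),
                unquote(parts[1]),
                unquote(parts[2]),
                Double.parseDouble(parts[3].trim()));
    }

    // Strips one pair of surrounding double quotes, if present.
    private static String unquote(String s) {
        String t = s.trim();
        if (t.length() >= 2 && t.startsWith("\"") && t.endsWith("\"")) {
            return t.substring(1, t.length() - 1);
        }
        return t;
    }
}

class ScoreRecordDemo {
    public static void main(String[] args) {
        ScoreRecord r = ScoreRecord.parse("1,\"zhangsan\",\"Chinese\",90");
        System.out.println(r.id + " " + r.name + " " + r.course + " " + r.score);
        // prints: 1 zhangsan Chinese 90.0
    }
}
```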
3. Custom UDAF
The following code defines a custom Flink SQL UDAF and uses it to aggregate the students' exam scores.
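import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
import static org.apache.flink.table.api.Expressions.row;

public class FlinkAggFunction {
    public static void main(String[] args) {
        // 1. Get the stream execution environment
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setParallelism(1);
        // 2. Create the table environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);
        // 3. Data source
        Table table = tEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.INT()),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("course", DataTypes.STRING()),
                        DataTypes.FIELD("score", DataTypes.DOUBLE())
                ),
                row(1, "zhangsan", "Chinese", 90),
                row(1, "zhangsan", "Math", 74),
                row(1, "zhangsan", "English", 100),
                row(2, "lisi", "Chinese", 86),
                row(2, "lisi", "Math", 99),
                row(2, "lisi", "English", 92)
        ).select($("id"), $("name"), $("course"), $("score"));
        tEnv.createTemporaryView("student", table);
        // 4.1 Approach 1: Table API (function not registered)
        // The alias applies to the result of call(...), not to its argument.
        tEnv.from("student")
                .groupBy($("course"))
                .select($("course"), call(AvgFunction.class, $("score")).as("avg_score"))
                .execute().print();
        // 4.2 Approach 2: Table API (registered function)
        tEnv.createTemporarySystemFunction("AvgFunction", AvgFunction.class);
        tEnv.from("student")
                .groupBy($("course"))
                .select($("course"), call("AvgFunction", $("score")).as("avg_score"))
                .execute().print();
        // 4.3 Approach 3: SQL (registered function)
        tEnv.sqlQuery("select course, AvgFunction(score) as avg_score from student group by course")
                .execute().print();
    }

    // Mutable accumulator; Lombok generates the getters, setters and constructors
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class AvgAccumulator {
        public double sum = 0.0;
        public int count = 0;
    }

    // Custom UDAF
    public static class AvgFunction extends AggregateFunction<Double, AvgAccumulator> {
        // Compute the result from the accumulator
        @Override
        public Double getValue(AvgAccumulator avgAccumulator) {
            if (avgAccumulator.count == 0) {
                return null;
            } else {
                return avgAccumulator.sum / avgAccumulator.count;
            }
        }

        // Initialize the accumulator
        @Override
        public AvgAccumulator createAccumulator() {
            return new AvgAccumulator();
        }

        // Fold one input row into the accumulator
        public void accumulate(AvgAccumulator acc, Double score) {
            if (score == null) {
                return; // skip NULL scores instead of failing on unboxing
            }
            acc.setSum(acc.sum + score);
            acc.setCount(acc.count + 1);
        }
    }
}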
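4. Results
With the custom UDAF in place, the registered AvgFunction aggregates the students' exam scores into one average per course. The original post shows a screenshot of the output at this point.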
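The averages that all three approaches should agree on can be checked independently of Flink. The sketch below recomputes them in plain Java from the six rows of the data set; `ExpectedAverages` and `averageByCourse` are hypothetical names, and the console layout that Flink itself prints is not reproduced.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ExpectedAverages {
    // Groups scores by course and returns the mean per course,
    // in the order in which each course first appears.
    static Map<String, Double> averageByCourse(String[] courses, double[] scores) {
        Map<String, double[]> acc = new LinkedHashMap<>(); // course -> {sum, count}
        for (int i = 0; i < courses.length; i++) {
            double[] a = acc.computeIfAbsent(courses[i], k -> new double[2]);
            a[0] += scores[i];
            a[1] += 1;
        }
        Map<String, Double> result = new LinkedHashMap<>();
        for (Map.Entry<String, double[]> e : acc.entrySet()) {
            result.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        String[] courses = {"Chinese", "Math", "English", "Chinese", "Math", "English"};
        double[] scores = {90, 74, 100, 86, 99, 92};
        System.out.println(averageByCourse(courses, scores));
        // prints: {Chinese=88.0, Math=86.5, English=96.0}
    }
}
```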
