Four Ways to Use a Custom UDTF in Flink SQL
2022-06-30 12:06:00 【大数据研习社】
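1. UDTF definition
A table function takes one row in and produces zero or more rows out (it "explodes" a row). It extends TableFunction, provides one or more eval methods that return nothing (void), and emits each output row by calling collect. Because the result of a table function is itself a table, it has to be joined with the original table to produce the final result, which is done with a lateral table join (if this is unfamiliar, look up LATERAL TABLE).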
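The join semantics can be modelled in plain Java, without Flink, as a minimal sketch. The names `TableFn`, `splitWords`, `innerJoinLateral`, and `leftOuterJoinLateral` are hypothetical and are not part of Flink's API. The model shows the one difference that matters for the four methods below: an inner lateral join drops a left row for which the function emits nothing, while a left outer lateral join keeps that row and fills the function's column with null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of a table function and lateral joins (hypothetical, not Flink's API).
public class LateralJoinModel {

    // A table function: one input value, zero or more outputs emitted through a collector.
    public interface TableFn<I, O> {
        void eval(I input, Consumer<O> collector);
    }

    // Hypothetical table function: splits a string on spaces, skipping empty parts.
    public static void splitWords(String s, Consumer<String> collector) {
        for (String part : s.split(" ")) {
            if (!part.isEmpty()) {
                collector.accept(part);
            }
        }
    }

    // Inner lateral join: each emitted output is paired with its left row;
    // a left row that emits nothing does not appear in the result.
    public static <L, O> List<String> innerJoinLateral(List<L> left, TableFn<L, O> fn) {
        List<String> result = new ArrayList<>();
        for (L row : left) {
            fn.eval(row, out -> result.add("[" + row + "] -> " + out));
        }
        return result;
    }

    // Left outer lateral join: like the inner join, but a left row that emits
    // nothing is kept once, with null in place of the function's output.
    public static <L, O> List<String> leftOuterJoinLateral(List<L> left, TableFn<L, O> fn) {
        List<String> result = new ArrayList<>();
        for (L row : left) {
            List<O> emitted = new ArrayList<>();
            fn.eval(row, emitted::add);
            if (emitted.isEmpty()) {
                result.add("[" + row + "] -> null");
            }
            for (O out : emitted) {
                result.add("[" + row + "] -> " + out);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> left = Arrays.asList("hello world", "");
        TableFn<String, String> split = LateralJoinModel::splitWords;
        // prints [[hello world] -> hello, [hello world] -> world]
        System.out.println(innerJoinLateral(left, split));
        // prints [[hello world] -> hello, [hello world] -> world, [] -> null]
        System.out.println(leftOuterJoinLateral(left, split));
    }
}
```

The empty input string is the interesting case: it vanishes from the inner join but survives the left outer join, which mirrors the difference between joinLateral and leftOuterJoinLateral in Flink.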
2. Dataset format
The test dataset of users' app usage looks like this:
{"userId":9527,"day":"2022-05-12","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.browser","activetime":120000},{"package":"com.qq","activetime":80000}]}
{"userId":9528,"day":"2022-05-13","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.weixin","activetime":150000},{"package":"com.youdao","activetime":60000}]}
{"userId":9529,"day":"2022-05-14","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.ebay","activetime":40000},{"package":"com.baidu","activetime":120000}]}
{"userId":9530,"day":"2022-05-15","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.huawei","activetime":180000},{"package":"com.leshi","activetime":20000}]}
{"userId":9531,"day":"2022-05-16","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.xiaomi","activetime":150000},{"package":"com.wangyi","activetime":60000}]}
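In each JSON object, userId is the user ID, day is the date, begintime is the start time, and endtime is the end time. data is the list of per-app usage durations, in which package is the app name and activetime is how long the app was used.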
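To make the record layout concrete, the sketch below models one record as plain Java classes. The class names are hypothetical, and the units are an assumption the source does not state: begintime and endtime are read as epoch milliseconds (they are 13-digit values) and activetime as milliseconds. The element field is named `pkg` because `package` is a reserved word in Java; in the UDTF's ROW type it stays `package`.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

public class UserBehaviorModel {

    // One element of the "data" array.
    public static final class AppUsage {
        public final String pkg;        // JSON "package": the app name
        public final int activetimeMs;  // JSON "activetime" (assumed milliseconds; INT in the UDTF)

        public AppUsage(String pkg, int activetimeMs) {
            this.pkg = pkg;
            this.activetimeMs = activetimeMs;
        }
    }

    // One input line.
    public static final class UserBehavior {
        public final long userId;
        public final String day;
        public final long begintime;    // assumed epoch milliseconds
        public final long endtime;      // assumed epoch milliseconds
        public final List<AppUsage> data;

        public UserBehavior(long userId, String day, long begintime, long endtime, List<AppUsage> data) {
            this.userId = userId;
            this.day = day;
            this.begintime = begintime;
            this.endtime = endtime;
            this.data = data;
        }

        // Start time as an instant, under the epoch-milliseconds assumption.
        public Instant beginInstant() {
            return Instant.ofEpochMilli(begintime);
        }

        // Length of the session between begintime and endtime.
        public long sessionMillis() {
            return endtime - begintime;
        }
    }

    // The first sample record, typed out by hand.
    public static UserBehavior sample() {
        return new UserBehavior(9527L, "2022-05-12", 1620785058833L, 1620785217511L,
                Arrays.asList(new AppUsage("com.browser", 120000), new AppUsage("com.qq", 80000)));
    }

    public static void main(String[] args) {
        UserBehavior r = sample();
        System.out.println(r.beginInstant());   // 2021-05-12T02:04:18.833Z
        System.out.println(r.sessionMillis());  // 158678
        System.out.println(r.data.size());      // 2: the UDTF emits one row per element
    }
}
```

Under the epoch-milliseconds assumption the sample begintime decodes to 2021-05-12 UTC, one year before the day field, so the timestamps in the test data look like fixed placeholders rather than values derived from day.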
3. Custom UDTF
The following code uses a custom Flink SQL UDTF to parse the user behavior data.
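import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
// JSON parsing uses the org.json library
import org.json.JSONArray;
import org.json.JSONObject;

public class FlinkSQLBaseTableFunction {
    public static void main(String[] args) {
        // 1. Get the stream execution environment
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        senv.setParallelism(1);
        // 2. Create the table environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);
        // 3. Read the data: one JSON record per line from the socket
        DataStream<String> lines = senv.socketTextStream("hadoop1", 8888);
        // 4. Convert the stream into a dynamic table with a single column "line"
        Table table = tEnv.fromDataStream(lines, $("line"));
        tEnv.createTemporaryView("userbehavior", table);

        // Note: each execute().print() reads the unbounded socket source and blocks
        // while the job is running, so in practice only the first query runs.
        // Comment out the others to try each method in turn.

        // 5.1 Method 1: Table API, inner join (joinLateral)
        tEnv.from("userbehavior")
            .joinLateral(call(explodeFunction.class, $("line"), "data").as("package", "activetime"))
            .select(
                call(JsonFunction.class, $("line"), "userId"),
                call(JsonFunction.class, $("line"), "day"),
                call(JsonFunction.class, $("line"), "begintime"),
                call(JsonFunction.class, $("line"), "endtime"),
                $("package"),
                $("activetime")
            ).execute().print();

        // 5.2 Method 2: Table API, left outer join (leftOuterJoinLateral)
        tEnv.from("userbehavior")
            .leftOuterJoinLateral(call(explodeFunction.class, $("line"), "data").as("package", "activetime"))
            .select(
                call(JsonFunction.class, $("line"), "userId"),
                call(JsonFunction.class, $("line"), "day"),
                call(JsonFunction.class, $("line"), "begintime"),
                call(JsonFunction.class, $("line"), "endtime"),
                $("package"),
                $("activetime")
            ).execute().print();

        // Register the functions by name so that SQL queries can call them
        tEnv.createTemporarySystemFunction("JsonFunction", JsonFunction.class);
        tEnv.createTemporarySystemFunction("explodeFunction", explodeFunction.class);

        // 5.3 Method 3: SQL, inner join (comma join with LATERAL TABLE)
        tEnv.sqlQuery("select " +
            "JsonFunction(line,'userId')," +
            "JsonFunction(line,'day')," +
            "JsonFunction(line,'begintime')," +
            "JsonFunction(line,'endtime')," +
            "package," +
            "activetime " +
            "from userbehavior " +
            ", lateral table(explodeFunction(line,'data'))"
        ).execute().print();

        // 5.4 Method 4: SQL, left outer join (LEFT JOIN LATERAL TABLE ... ON TRUE)
        tEnv.sqlQuery("select " +
            "JsonFunction(line,'userId')," +
            "JsonFunction(line,'day')," +
            "JsonFunction(line,'begintime')," +
            "JsonFunction(line,'endtime')," +
            "package," +
            "activetime " +
            "from userbehavior " +
            "left join lateral table(explodeFunction(line,'data')) as sc(package,activetime) on true"
        ).execute().print();
    }

    /**
     * Custom scalar UDF: returns the value stored under key in the JSON line as a string.
     */
    public static class JsonFunction extends ScalarFunction {
        public String eval(String line, String key) {
            // Parse the line as a JSON object
            JSONObject baseJson = new JSONObject(line);
            // optString also converts numeric values (userId, begintime, endtime) to strings,
            // where getString would throw; it returns "" when the key is missing
            return baseJson.optString(key, "");
        }
    }

    /**
     * Custom UDTF: explodes the JSON array stored under key into one
     * (package, activetime) row per element.
     */
    @FunctionHint(output = @DataTypeHint("ROW<package STRING, activetime INT>"))
    public static class explodeFunction extends TableFunction<Row> {
        public void eval(String line, String key) {
            // Parse the line as a JSON object
            JSONObject baseJson = new JSONObject(line);
            // Get the JSONArray under key ("data"); null if it is missing or not an array
            JSONArray jsonArray = baseJson.optJSONArray(key);
            if (jsonArray == null) {
                // Emit nothing: the inner join drops this row, the left join pads it with NULLs
                return;
            }
            // Emit one row per array element
            for (int i = 0; i < jsonArray.length(); i++) {
                JSONObject item = jsonArray.getJSONObject(i);
                String col1 = item.getString("package");
                // activetime is a JSON number, so read it with getInt rather than getString
                Integer col2 = item.getInt("activetime");
                collect(Row.of(col1, col2));
            }
        }
    }
}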
4. Enter the test data
On the hadoop1 node of the Linux virtual machine, start an nc server with the following command, then type in the test dataset.
nc -lk 8888

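5. Results
Once the custom UDTF is defined, the JsonFunction and explodeFunction functions turn each user behavior record into one row per element of its data array, and each row carries userId, day, begintime, endtime, package, and activetime.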
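For a quick check of what the inner-join queries produce for a single line without starting a Flink job, the sketch below reproduces those rows in plain Java. It is a hypothetical stand-in, not the code Flink runs: regular expressions replace the org.json parsing and only cope with the flat, whitespace-free layout of the sample records, so it is not a general JSON parser. A record whose data array is empty yields no rows, which is exactly where the left outer join calls (5.2 and 5.4) differ from the inner join calls (5.1 and 5.3).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ExplodeSketch {

    // The first sample record from the dataset.
    public static final String SAMPLE =
        "{\"userId\":9527,\"day\":\"2022-05-12\",\"begintime\":1620785058833,"
        + "\"endtime\":1620785217511,\"data\":[{\"package\":\"com.browser\",\"activetime\":120000},"
        + "{\"package\":\"com.qq\",\"activetime\":80000}]}";

    // One element of the data array, e.g. {"package":"com.qq","activetime":80000}
    private static final Pattern ELEMENT =
        Pattern.compile("\\{\"package\":\"([^\"]*)\",\"activetime\":(\\d+)\\}");

    // Stand-in for JsonFunction: the string or integer value under key, or "" if absent.
    public static String scalar(String line, String key) {
        Matcher m = Pattern.compile("\"" + Pattern.quote(key) + "\":(?:\"([^\"]*)\"|(-?\\d+))")
                .matcher(line);
        if (!m.find()) {
            return "";
        }
        return m.group(1) != null ? m.group(1) : m.group(2);
    }

    // Stand-in for the inner lateral join: one output row per data element.
    public static List<String> explodeRows(String line) {
        String prefix = String.join(",",
                scalar(line, "userId"), scalar(line, "day"),
                scalar(line, "begintime"), scalar(line, "endtime"));
        List<String> rows = new ArrayList<>();
        Matcher m = ELEMENT.matcher(line);
        while (m.find()) {
            rows.add(prefix + "," + m.group(1) + "," + Integer.parseInt(m.group(2)));
        }
        return rows;
    }

    public static void main(String[] args) {
        // prints 9527,2022-05-12,1620785058833,1620785217511,com.browser,120000
        // then   9527,2022-05-12,1620785058833,1620785217511,com.qq,80000
        for (String row : explodeRows(SAMPLE)) {
            System.out.println(row);
        }
    }
}
```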
