Four ways to use a custom UDTF in FlinkSQL
2022-06-30 12:41:00 【Big data Institute】
1. UDTF Definition
Table functions take one row in and emit multiple rows out (an "explode"). A table function inherits from TableFunction, provides an eval method with no return value, and uses collect to emit output rows. The result of a table function is itself a table, which has to be joined back to the original table to get the final result, which is why a lateral join is used (if that is unfamiliar, look up LATERAL TABLE).
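For orientation, here is a minimal sketch of the shape such a function takes; the class name SplitFunction and the comma-splitting logic are illustrative only and are not part of this article's example:

import org.apache.flink.table.functions.TableFunction;

// Minimal UDTF sketch: one input row in, zero or more output rows out.
// The generic parameter (String here) is the type of each emitted row.
public class SplitFunction extends TableFunction<String> {
    // eval has no return value; rows are emitted via collect()
    public void eval(String str) {
        for (String s : str.split(",")) {
            collect(s); // each collect() call produces one output row
        }
    }
}

Because the function returns a table, it is joined back to the source table with a lateral join, e.g. SELECT ... FROM t, LATERAL TABLE(SplitFunction(col)) in SQL, or joinLateral in the Table API.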
2. Dataset format
The test data set of user app-usage behavior is as follows:
{"userId":9527,"day":"2022-05-12","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.browser","activetime":120000},{"package":"com.qq","activetime":80000}]}
{"userId":9528,"day":"2022-05-13","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.weixin","activetime":150000},{"package":"com.youdao","activetime":60000}]}
{"userId":9529,"day":"2022-05-14","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.ebay","activetime":40000},{"package":"com.baidu","activetime":120000}]}
{"userId":9530,"day":"2022-05-15","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.huawei","activetime":180000},{"package":"com.leshi","activetime":20000}]}
{"userId":9531,"day":"2022-05-16","begintime":1620785058833,"endtime":1620785217511,"data":[{"package":"com.xiaomi","activetime":150000},{"package":"com.wangyi","activetime":60000}]}
In each JSON object, userId is the user ID, day is the date, begintime is the start time, and endtime is the end time. The data field is the list of applications the user ran together with their usage durations, where package is the application package name and activetime is how long the application was active.
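To make the structure concrete, the parsing can be tried out with plain org.json (the same library used by the functions below) against the first sample record; this stand-alone sketch is not part of the original article, and the class name ParseCheck is illustrative:

import org.json.JSONArray;
import org.json.JSONObject;

public class ParseCheck {
    public static void main(String[] args) {
        // First record of the test data set
        String line = "{\"userId\":9527,\"day\":\"2022-05-12\","
                + "\"begintime\":1620785058833,\"endtime\":1620785217511,"
                + "\"data\":[{\"package\":\"com.browser\",\"activetime\":120000},"
                + "{\"package\":\"com.qq\",\"activetime\":80000}]}";
        JSONObject baseJson = new JSONObject(line);
        // Top-level scalar fields
        System.out.println("userId = " + baseJson.get("userId"));
        System.out.println("day    = " + baseJson.get("day"));
        // Explode the data array: one line per application
        JSONArray data = baseJson.getJSONArray("data");
        for (int i = 0; i < data.length(); i++) {
            JSONObject app = data.getJSONObject(i);
            System.out.println(app.getString("package") + " -> " + app.getInt("activetime"));
        }
    }
}

This is exactly the flattening the UDTF has to perform: each record expands into one row per application alongside the user's top-level fields.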
3. Customize UDTF
The complete implementation of a custom FlinkSQL UDTF that parses the user behavior data is as follows.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
// JSON parsing uses the org.json library (Maven artifact org.json:json)
import org.json.JSONArray;
import org.json.JSONObject;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

public class FlinkSQLBaseTableFunction {

    public static void main(String[] args) {
        // 1. Obtain the stream execution environment
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        senv.setParallelism(1);
        // 2. Create the table execution environment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);
        // 3. Read the raw JSON lines from a socket
        DataStream<String> lines = senv.socketTextStream("hadoop1", 8888);
        // 4. Convert the stream into a dynamic table with a single column named "line"
        Table table = tEnv.fromDataStream(lines, $("line"));
        tEnv.createTemporaryView("userbehavior", table);

        // Note: on this unbounded socket source each execute().print() blocks until the
        // job is cancelled, so in practice run one of the four invocation modes at a time.

        // 5.1 Invocation mode 1: Table API inner lateral join (joinLateral);
        //     input rows for which the UDTF emits no rows are dropped
        tEnv.from("userbehavior")
                .joinLateral(call(explodeFunction.class, $("line"), "data").as("package", "activetime"))
                .select(
                        call(JsonFunction.class, $("line"), "userId"),
                        call(JsonFunction.class, $("line"), "day"),
                        call(JsonFunction.class, $("line"), "begintime"),
                        call(JsonFunction.class, $("line"), "endtime"),
                        $("package"),
                        $("activetime")
                ).execute().print();

        // 5.2 Invocation mode 2: Table API left outer lateral join (leftOuterJoinLateral);
        //     input rows without UDTF output are kept, with NULLs in the function columns
        tEnv.from("userbehavior")
                .leftOuterJoinLateral(call(explodeFunction.class, $("line"), "data").as("package", "activetime"))
                .select(
                        call(JsonFunction.class, $("line"), "userId"),
                        call(JsonFunction.class, $("line"), "day"),
                        call(JsonFunction.class, $("line"), "begintime"),
                        call(JsonFunction.class, $("line"), "endtime"),
                        $("package"),
                        $("activetime")
                ).execute().print();

        // Register the functions by name so they can be referenced in SQL
        tEnv.createTemporarySystemFunction("JsonFunction", JsonFunction.class);
        tEnv.createTemporarySystemFunction("explodeFunction", explodeFunction.class);

        // 5.3 Invocation mode 3: SQL inner join via LATERAL TABLE (comma / cross join syntax)
        tEnv.sqlQuery("select " +
                "JsonFunction(line,'userId')," +
                "JsonFunction(line,'day')," +
                "JsonFunction(line,'begintime')," +
                "JsonFunction(line,'endtime')," +
                "package," +
                "activetime " +
                " from userbehavior " +
                ",lateral table(explodeFunction(line,'data')) "
        ).execute().print();

        // 5.4 Invocation mode 4: SQL left outer join via LEFT JOIN LATERAL TABLE ... ON TRUE
        tEnv.sqlQuery("select " +
                "JsonFunction(line,'userId')," +
                "JsonFunction(line,'day')," +
                "JsonFunction(line,'begintime')," +
                "JsonFunction(line,'endtime')," +
                "package," +
                "activetime " +
                " from userbehavior " +
                "left join lateral table(explodeFunction(line,'data')) as sc(package,activetime) on true "
        ).execute().print();
    }

    /**
     * Custom UDF: extracts the value of the given key from the JSON line.
     */
    public static class JsonFunction extends ScalarFunction {
        public String eval(String line, String key) {
            // Parse the line into a JSON object
            JSONObject baseJson = new JSONObject(line);
            if (baseJson.has(key)) {
                // Look up the value and render it as a string
                // (get + String.valueOf also handles numeric fields such as userId and begintime)
                return String.valueOf(baseJson.get(key));
            }
            return "";
        }
    }

    /**
     * Custom UDTF: explodes the JSON array under the given key ("data") into one row per application.
     */
    @FunctionHint(output = @DataTypeHint("ROW<package STRING,activetime INT>"))
    public static class explodeFunction extends TableFunction<Row> {
        public void eval(String line, String key) {
            // Parse the line into a JSON object
            JSONObject baseJson = new JSONObject(line);
            // Extract the JSON array stored under the given key
            JSONArray jsonArray = baseJson.getJSONArray(key);
            // Emit one output row per array element
            for (int i = 0; i < jsonArray.length(); i++) {
                String col1 = jsonArray.getJSONObject(i).getString("package");
                int col2 = jsonArray.getJSONObject(i).getInt("activetime");
                collect(Row.of(col1, col2));
            }
        }
    }
}
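As a side note, the two functions could also be registered from SQL DDL instead of createTemporarySystemFunction. The snippet below is only a sketch; the com.example package prefix is a placeholder and must be replaced with the package FlinkSQLBaseTableFunction actually lives in:

// Alternative registration via SQL DDL; adjust the package prefix to the real one.
tEnv.executeSql(
        "CREATE TEMPORARY SYSTEM FUNCTION JsonFunction AS " +
        "'com.example.FlinkSQLBaseTableFunction$JsonFunction'");
tEnv.executeSql(
        "CREATE TEMPORARY SYSTEM FUNCTION explodeFunction AS " +
        "'com.example.FlinkSQLBaseTableFunction$explodeFunction'");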
4. Input test data
On the Linux virtual machine node hadoop1, start an nc service with the following command and then paste in the test data set.
nc -lk 8888

5. Running results
With the custom UDTF in place, parsing the user behavior data through the registered JsonFunction and explodeFunction produces one output row per application in each record. For example, the first test record yields the two rows (9527, 2022-05-12, 1620785058833, 1620785217511, com.browser, 120000) and (9527, 2022-05-12, 1620785058833, 1620785217511, com.qq, 80000).
