当前位置:网站首页>Tableapi & SQL and MySQL grouping statistics of Flink
Flink Table API & SQL: Grouped Statistics on MySQL Data
2022-07-01 15:01:00 【Dragon man who can't fly】
Use the Table API & SQL together with the Flink JDBC connector to read MySQL data, and use the GROUP BY statement to group the result set by one or more columns.
Sample environment
java.version: 1.8.x
flink.version: 1.11.1
kafka: 2.11
Sample data source (project code available for download)
Flink Examples Series: Building the Development Environment and Data
Sample module (pom.xml)
Flink Examples Series: Table API & SQL - Sample Module
GroupToMysql.java
package com.flink.examples.mysql;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import static org.apache.flink.table.api.Expressions.$;
/**
 * Example: read MySQL data through the Table API & SQL and the Flink JDBC connector, and use
 * GROUP BY aggregations to group the result set by one or more columns.
 */
public class GroupToMysql {

    /*
     * Official reference:
     * https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
     *
     * Partitioned scan
     * To speed up reading in parallel source task instances, Flink provides a partitioned scan
     * feature for JDBC tables:
     *   scan.partition.column:      the column name used to partition the input.
     *   scan.partition.num:         the number of partitions.
     *   scan.partition.lower-bound: the smallest value of the first partition.
     *   scan.partition.upper-bound: the largest value of the last partition.
     * Note: the DDL below uses the 'connector.*' style keys, not the 'scan.*' keys listed above.
     */

    /**
     * DDL that registers the MySQL table {@code users} as the Flink table {@code my_users}.
     * Written in the flink-jdbc 1.11.1 style; per the original author, all attribute names are
     * defined in the {@code JdbcTableSourceSinkFactory} factory class.
     * NOTE(review): host and credentials are hard-coded sample values - replace before reuse.
     */
    static String table_sql =
            "CREATE TABLE my_users (\n" +
            " id BIGINT,\n" +
            " name STRING,\n" +
            " age INT,\n" +
            " status INT,\n" +
            " PRIMARY KEY (id) NOT ENFORCED\n" +
            ") WITH (\n" +
            " 'connector.type' = 'jdbc',\n" +
            " 'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8', \n" +
            " 'connector.driver' = 'com.mysql.jdbc.Driver', \n" +
            " 'connector.table' = 'users', \n" +
            " 'connector.username' = 'root',\n" +
            " 'connector.password' = 'password' \n" +
            // Optional (add a comma after the password entry when enabling):
            // " 'connector.read.fetch-size' = '10' \n" +
            ")";

    /**
     * Registers the MySQL-backed table, runs one grouped aggregation over it and prints every
     * "add" record of the resulting retract stream.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Build the StreamExecutionEnvironment and run with a parallelism of 1.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Build the EnvironmentSettings, selecting the Blink planner in streaming mode.
        EnvironmentSettings bsSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);

        // Register the MySQL table.
        tEnv.executeSql(table_sql);

        // Swap in avg(tEnv), count(tEnv), sum(tEnv) or min(tEnv) to try the other aggregations.
        Table table = max(tEnv);

        // Print the field structure of the result.
        table.printSchema();

        // A grouped query has to be converted with toRetractStream (toAppendStream is only for
        // plain, non-updating queries). In each Tuple2 the flag f0 is true for an "add" record
        // and false for a "retract" record of a previously emitted row.
        DataStream<Tuple2<Boolean, Row>> behaviorStream = tEnv.toRetractStream(table, Row.class);

        // Forward only the "add" records downstream so that the print() sink actually receives
        // them; previously the operator printed as a side effect and emitted nothing, leaving
        // the sink without input.
        behaviorStream.flatMap(new FlatMapFunction<Tuple2<Boolean, Row>, String>() {
            @Override
            public void flatMap(Tuple2<Boolean, Row> value, Collector<String> out) throws Exception {
                if (value.f0) {
                    out.collect(value.f1.toString());
                }
            }
        }).print();

        env.execute();
    }

    /**
     * avg: average of {@code age} per {@code status} group.
     * Equivalent SQL for {@code tEnv.sqlQuery(...)}:
     * {@code select status,avg(age) as age1 from my_users group by status}
     *
     * @param tEnv table environment in which {@code my_users} is registered
     * @return table with the columns {@code status} and {@code age1}
     */
    public static Table avg(StreamTableEnvironment tEnv) {
        return tEnv.from("my_users")
                .groupBy($("status"))
                .select($("status"), $("age").avg().as("age1"));
    }

    /**
     * count: number of {@code age} values per {@code status} group.
     * Equivalent SQL for {@code tEnv.sqlQuery(...)}:
     * {@code select status,count(age) as age1 from my_users group by status}
     *
     * @param tEnv table environment in which {@code my_users} is registered
     * @return table with the columns {@code status} and {@code age1}
     */
    public static Table count(StreamTableEnvironment tEnv) {
        return tEnv.from("my_users")
                .groupBy($("status"))
                .select($("status"), $("age").count().as("age1"));
    }

    /**
     * sum: sum of {@code age} per {@code status} group.
     * Equivalent SQL for {@code tEnv.sqlQuery(...)}:
     * {@code select status,sum(age) as age1 from my_users group by status}
     *
     * @param tEnv table environment in which {@code my_users} is registered
     * @return table with the columns {@code status} and {@code age1}
     */
    public static Table sum(StreamTableEnvironment tEnv) {
        return tEnv.from("my_users")
                .groupBy($("status"))
                .select($("status"), $("age").sum().as("age1"));
    }

    /**
     * min: minimum {@code age} per {@code status} group.
     * Equivalent SQL for {@code tEnv.sqlQuery(...)}:
     * {@code select status,min(age) as age1 from my_users group by status}
     *
     * @param tEnv table environment in which {@code my_users} is registered
     * @return table with the columns {@code status} and {@code age1}
     */
    public static Table min(StreamTableEnvironment tEnv) {
        return tEnv.from("my_users")
                .groupBy($("status"))
                .select($("status"), $("age").min().as("age1"));
    }

    /**
     * max: maximum {@code age} per {@code status} group.
     * Equivalent SQL for {@code tEnv.sqlQuery(...)}:
     * {@code select status,max(age) as age1 from my_users group by status}
     *
     * @param tEnv table environment in which {@code my_users} is registered
     * @return table with the columns {@code status} and {@code age1}
     */
    public static Table max(StreamTableEnvironment tEnv) {
        return tEnv.from("my_users")
                .groupBy($("status"))
                .select($("status"), $("age").max().as("age1"));
    }
}
Table creation SQL (MySQL)
CREATE TABLE `users` (
`id` bigint(8) NOT NULL AUTO_INCREMENT,
`name` varchar(40) DEFAULT NULL,
`age` int(8) DEFAULT NULL,
`status` tinyint(2) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Printed results
root
|-- status: INT
|-- age1: INT
0,16
0,18
1,21
1,28
2,31
边栏推荐
- 音乐播放器开发实例(可毕设)
- 购物商城6.27待完成
- DirectX修复工具V4.1公测![通俗易懂]
- Summary of empty string judgment in the project
- In hot summer, please put away this safe gas use guide!
- QT capture interface is displayed as picture or label
- What data capabilities do data product managers need to master?
- 数据产品经理需要掌握哪些数据能力?
- 使用net core 6 c# 的 NPOI 包,读取excel..xlsx单元格内的图片,并存储到指定服务器
- 【14. 区间和(离散化)】
猜你喜欢

Problem note - Oracle 11g uninstall
![After twists and turns, I finally found the method of SRC vulnerability mining [recommended collection]](/img/ac/ab6053e6ea449beedf434d4cf07dbb.png)
After twists and turns, I finally found the method of SRC vulnerability mining [recommended collection]

首届技术播客月开播在即

Fix the failure of idea global search shortcut (ctrl+shift+f)

【14. 区间和(离散化)】

Official announcement: Apache Doris graduated successfully and became the top project of ASF!

关于重载运算符的再整理

Basic use process of cmake

竣达技术丨多台精密空调微信云监控方案

竣达技术丨室内空气环境监测终端 pm2.5、温湿度TVOC等多参数监测
随机推荐
After twists and turns, I finally found the method of SRC vulnerability mining [recommended collection]
What is the relationship between network speed, broadband, bandwidth and traffic?
网速、宽带、带宽、流量三者之间的关系是什么?
Word2vec yyds dry goods inventory
The markdown editor uses basic syntax
Opencv Learning Notes 6 -- image mosaic
SQL常用的四个排序函数梳理
Markdown编辑器使用基本语法
购物商城6.27待完成
Task. Run(), Task. Factory. Analysis of behavior inconsistency between startnew() and new task()
炎炎夏日,这份安全用气指南请街坊们收好!
使用net core 6 c# 的 NPOI 包,讀取excel..xlsx單元格內的圖片,並存儲到指定服務器
关于软件测试的一些思考
Basic operations of SQL database
Internet hospital system source code hospital applet source code smart hospital source code online consultation system source code
[zero basic IOT pwn] reproduce Netgear wnap320 rce
Summary of empty string judgment in the project
Problem note - Oracle 11g uninstall
Storage form of in-depth analysis data in memory
One of the first steps to redis