Flink Table API & SQL: grouping statistics on MySQL data
2022-07-01 15:01:00 【Dragon man who can't fly】
This example uses the Table API & SQL with the Flink JDBC connector to read MySQL data, and uses a GROUP BY statement to group the result set by one or more columns.
Sample environment
java.version: 1.8.x
flink.version: 1.11.1
kafka: 2.11
Sample data source (project download on Code Cloud)
Flink series examples: building the development environment and data
Sample module (pom.xml)
Flink series examples: Table API & SQL sample module
GroupToMysql.java
package com.flink.examples.mysql;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import static org.apache.flink.table.api.Expressions.$;
/**
* @Description Uses the Table API & SQL with the Flink JDBC connector to read MySQL data, and groups the result set by one or more columns with a GROUP BY statement.
*/
public class GroupToMysql {
/**
Official reference: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
Partition scan
To speed up reading in parallel source task instances, Flink provides a partitioned scan feature for JDBC tables.
scan.partition.column: the name of the column used to partition the input.
scan.partition.num: the number of partitions.
scan.partition.lower-bound: the smallest value of the first partition.
scan.partition.upper-bound: the largest value of the last partition.
Note: the scan.partition.* names above are the ones used in the linked documentation; the DDL below uses 'connector.*' keys instead.
*/
// flink-jdbc 1.11.1 syntax; all property names are defined in the JdbcTableSourceSinkFactory factory class
static String table_sql =
"CREATE TABLE my_users (\n" +
" id BIGINT,\n" +
" name STRING,\n" +
" age INT,\n" +
" status INT,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector.type' = 'jdbc',\n" +
" 'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8', \n" +
" 'connector.driver' = 'com.mysql.jdbc.Driver', \n" +
" 'connector.table' = 'users', \n" +
" 'connector.username' = 'root',\n" +
" 'connector.password' = 'password' \n" +
// " 'connector.read.fetch-size' = '10' \n" +
")";
public static void main(String[] args) throws Exception {
// Create the StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the parallelism
env.setParallelism(1);
// Build the EnvironmentSettings, selecting the Blink planner in streaming mode
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
// Create the StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
// Register the MySQL table
tEnv.executeSql(table_sql);
//Table table = avg(tEnv);
//Table table = count(tEnv);
//Table table = min(tEnv);
Table table = max(tEnv);
// Print the table schema
table.printSchema();
// Plain (append-only) queries can use toAppendStream
//tEnv.toAppendStream(table, Row.class).print();
// GROUP BY queries update earlier results, so they need toRetractStream
//tEnv.toRetractStream(table, Row.class).print();
// Convert the table to a DataStream. The Boolean flag in Tuple2 is true for an add message (a new record) and false for a retract message (withdrawing an old record)
DataStream<Tuple2<Boolean, Row>> behaviorStream = tEnv.toRetractStream(table, Row.class);
behaviorStream.flatMap(new FlatMapFunction<Tuple2<Boolean, Row>, Object>() {
@Override
public void flatMap(Tuple2<Boolean, Row> value, Collector<Object> out) throws Exception {
// Forward only add messages to the print sink; retract messages are dropped
if (value.f0) {
out.collect(value.f1);
}
}
}).print();
env.execute();
}
/**
* avg: computes the average age for each status group
* @param tEnv the table environment in which my_users is registered
* @return a table with the columns status and age1
*/
public static Table avg(StreamTableEnvironment tEnv){
// Option 1: run a SQL query
String sql = "select status,avg(age) as age1 from my_users group by status";
//Table table = tEnv.sqlQuery(sql);
// Option 2: build the same query with Table API method calls
Table table = tEnv.from("my_users").groupBy($("status")).select($("status"),$("age").avg().as("age1"));
return table;
}
/**
* count: counts the rows with a non-null age in each status group
* @param tEnv the table environment in which my_users is registered
* @return a table with the columns status and age1
*/
public static Table count(StreamTableEnvironment tEnv){
// Option 1: run a SQL query
String sql = "select status,count(age) as age1 from my_users group by status";
//Table table = tEnv.sqlQuery(sql);
// Option 2: build the same query with Table API method calls
Table table = tEnv.from("my_users").groupBy($("status")).select($("status"),$("age").count().as("age1"));
return table;
}
/**
* sum: computes the sum of the ages in each status group
* @param tEnv the table environment in which my_users is registered
* @return a table with the columns status and age1
*/
public static Table sum(StreamTableEnvironment tEnv){
// Option 1: run a SQL query
String sql = "select status,sum(age) as age1 from my_users group by status";
//Table table = tEnv.sqlQuery(sql);
// Option 2: build the same query with Table API method calls
Table table = tEnv.from("my_users").groupBy($("status")).select($("status"),$("age").sum().as("age1"));
return table;
}
/**
* min: finds the minimum age in each status group
* @param tEnv the table environment in which my_users is registered
* @return a table with the columns status and age1
*/
public static Table min(StreamTableEnvironment tEnv){
// Option 1: run a SQL query
String sql = "select status,min(age) as age1 from my_users group by status";
//Table table = tEnv.sqlQuery(sql);
// Option 2: build the same query with Table API method calls
Table table = tEnv.from("my_users").groupBy($("status")).select($("status"),$("age").min().as("age1"));
return table;
}
/**
* max: finds the maximum age in each status group; the result is recomputed each time a row is added
* @param tEnv the table environment in which my_users is registered
* @return a table with the columns status and age1
*/
public static Table max(StreamTableEnvironment tEnv){
// Option 1: run a SQL query
String sql = "select status,max(age) as age1 from my_users group by status";
//Table table = tEnv.sqlQuery(sql);
// Option 2: build the same query with Table API method calls
Table table = tEnv.from("my_users").groupBy($("status")).select($("status"),$("age").max().as("age1"));
return table;
}
}
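The partition scan options described in the comment at the top of the class are not used by the DDL in this example. As a rough sketch, the table could be declared with the option names from the linked Flink 1.11 documentation (`'connector' = 'jdbc'`, `'url'`, `'table-name'`, `scan.partition.*`) roughly as follows. The class name `PartitionedScanDdl`, the helper `buildDdl`, and the values `4`, `1` and `1000` are assumptions made for illustration and should be adapted to the real range of `id`.

```java
/** Sketch: builds a JDBC table DDL with partitioned scan options (hypothetical helper, not part of the article's project). */
final class PartitionedScanDdl {

    /**
     * @param column     numeric column used to split the scan (assumed to be "id" here)
     * @param partitions number of partitions to read in parallel
     * @param lowerBound smallest value of the first partition
     * @param upperBound largest value of the last partition
     */
    static String buildDdl(String column, int partitions, long lowerBound, long upperBound) {
        return "CREATE TABLE my_users (\n"
                + "  id BIGINT,\n"
                + "  name STRING,\n"
                + "  age INT,\n"
                + "  status INT,\n"
                + "  PRIMARY KEY (id) NOT ENFORCED\n"
                + ") WITH (\n"
                + "  'connector' = 'jdbc',\n"
                + "  'url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8',\n"
                + "  'driver' = 'com.mysql.jdbc.Driver',\n"
                + "  'table-name' = 'users',\n"
                + "  'username' = 'root',\n"
                + "  'password' = 'password',\n"
                // The four scan.partition.* options are meant to be set together
                + "  'scan.partition.column' = '" + column + "',\n"
                + "  'scan.partition.num' = '" + partitions + "',\n"
                + "  'scan.partition.lower-bound' = '" + lowerBound + "',\n"
                + "  'scan.partition.upper-bound' = '" + upperBound + "'\n"
                + ")";
    }

    public static void main(String[] args) {
        // Hypothetical values: 4 partitions over ids 1..1000
        System.out.println(buildDdl("id", 4, 1L, 1000L));
    }
}
```

The resulting string would be passed to `tEnv.executeSql(...)` in place of `table_sql`. With `env.setParallelism(1)` as in the example, a partitioned scan is unlikely to bring any speed-up, since only one source task reads the partitions.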
Table creation SQL
CREATE TABLE `users` (
`id` bigint(8) NOT NULL AUTO_INCREMENT,
`name` varchar(40) DEFAULT NULL,
`age` int(8) DEFAULT NULL,
`status` tinyint(2) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
Print the results
root
|-- status: INT
|-- age1: INT
0,16
0,18
1,21
1,28
2,31
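The same status can appear more than once in the output because `toRetractStream` emits a new add message whenever a group's maximum changes, after retracting the previous one. The following plain-Java sketch is a simplified model of that behaviour, not Flink's implementation. The class `RetractMaxSketch` and the input rows are hypothetical; the rows were chosen only so that the add messages match the output above, and the actual contents of the `users` table are not shown in this article.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of the retract stream produced by "select status, max(age) ... group by status". */
final class RetractMaxSketch {

    /** One changelog message: add == true inserts a result, add == false retracts an earlier one. */
    static final class Change {
        final boolean add;
        final int status;
        final int maxAge;

        Change(boolean add, int status, int maxAge) {
            this.add = add;
            this.status = status;
            this.maxAge = maxAge;
        }
    }

    /** Processes (status, age) rows in arrival order and records every add/retract message. */
    static List<Change> maxByStatus(int[][] rows) {
        Map<Integer, Integer> currentMax = new HashMap<>();
        List<Change> changes = new ArrayList<>();
        for (int[] row : rows) {
            int status = row[0];
            int age = row[1];
            Integer previous = currentMax.get(status);
            if (previous == null) {
                // First row of the group: only an add message
                changes.add(new Change(true, status, age));
                currentMax.put(status, age);
            } else if (age > previous) {
                // New maximum: retract the old result, then add the new one
                changes.add(new Change(false, status, previous));
                changes.add(new Change(true, status, age));
                currentMax.put(status, age);
            }
            // Rows that do not change the maximum produce no message in this model
        }
        return changes;
    }

    public static void main(String[] args) {
        // Hypothetical (status, age) rows
        int[][] rows = {{0, 16}, {0, 18}, {1, 21}, {1, 28}, {2, 31}};
        for (Change change : maxByStatus(rows)) {
            if (change.add) { // same filter as value.f0 in GroupToMysql
                System.out.println(change.status + "," + change.maxAge);
            }
        }
    }
}
```

With these rows the sketch prints the five lines shown above. The retract messages for `0,16` and `1,21` are produced as well, but the `add` filter hides them, just as the `value.f0` check does in `GroupToMysql`.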