Current location: Home > Flink Table API & SQL: grouping statistics on MySQL data
Flink Table API & SQL: grouping statistics on MySQL data
2022-07-01 15:01:00 【Dragon man who can't fly】
Use the Table API & SQL and the Flink JDBC connector to read MySQL data, and use a GROUP BY statement to group the result set by one or more columns.
Sample environment
java.version: 1.8.x
flink.version: 1.11.1
kafka: 2.11
Sample data source (the project code can be downloaded from Gitee)
Flink series examples: building the development environment and sample data
Sample module (pom.xml)
Flink series examples: Table API & SQL sample module
GroupToMysql.java
package com.flink.examples.mysql;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import static org.apache.flink.table.api.Expressions.$;
/**
 * Uses the Flink Table API and SQL together with the Flink JDBC connector to read MySQL data,
 * and uses GROUP BY to group the result set by one or more columns.
 *
 * <p>Each aggregation helper ({@link #avg}, {@link #count}, {@link #sum}, {@link #min},
 * {@link #max}) builds its query with the Table API. The equivalent SQL is given in each
 * method's Javadoc and could be run instead with {@code tEnv.sqlQuery(sql)}.
 */
public class GroupToMysql {

    /**
     * DDL that registers the MySQL table {@code users} as the Flink table {@code my_users}.
     *
     * <p>Written for flink-jdbc 1.11.1 using the legacy {@code connector.*} property keys; all
     * of these property names are defined in the {@code JdbcTableSourceSinkFactory} factory
     * class.
     *
     * <p>Official reference:
     * https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
     *
     * <p>Partitioned scan: to speed up reading in parallel Source task instances, Flink offers
     * a partitioned scan for JDBC tables:
     * <ul>
     *   <li>{@code scan.partition.column}: the column used to partition the input;</li>
     *   <li>{@code scan.partition.num}: the number of partitions;</li>
     *   <li>{@code scan.partition.lower-bound}: the smallest value of the first partition;</li>
     *   <li>{@code scan.partition.upper-bound}: the largest value of the last partition.</li>
     * </ul>
     * NOTE(review): those {@code scan.*} keys belong to the new-style options
     * ({@code 'connector' = 'jdbc'}, {@code 'url'}, {@code 'table-name'}, ...), not to the
     * legacy keys used below. The legacy spelling is presumably
     * {@code connector.read.partition.*}; confirm against the 1.11 docs before enabling
     * partitioned reads.
     *
     * <p>NOTE(review): host, user name and password are hard-coded sample values; replace them
     * (or load them from configuration) outside a demo environment.
     */
    static final String table_sql =
            "CREATE TABLE my_users (\n" +
            " id BIGINT,\n" +
            " name STRING,\n" +
            " age INT,\n" +
            " status INT,\n" +
            " PRIMARY KEY (id) NOT ENFORCED\n" +
            ") WITH (\n" +
            " 'connector.type' = 'jdbc',\n" +
            " 'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8', \n" +
            " 'connector.driver' = 'com.mysql.jdbc.Driver', \n" +
            " 'connector.table' = 'users', \n" +
            " 'connector.username' = 'root',\n" +
            " 'connector.password' = 'password' \n" +
            // To enable the fetch size, also add a trailing comma to the password line above.
            // " 'connector.read.fetch-size' = '10' \n" +
            ")";

    public static void main(String[] args) throws Exception {
        // Build the StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 keeps the printed results in one readable sequence
        env.setParallelism(1);
        // Build EnvironmentSettings and select the Blink planner in streaming mode
        EnvironmentSettings bsSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // Build the StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
        // Register the MySQL-backed table
        tEnv.executeSql(table_sql);

        // Replace max(tEnv) with avg(tEnv), count(tEnv), sum(tEnv) or min(tEnv) to try the
        // other aggregations.
        Table table = max(tEnv);
        // Print the field structure
        table.printSchema();

        // A plain (non-grouped) query can be converted with toAppendStream; a GROUP BY result
        // updates earlier rows and must be converted with toRetractStream. In each Tuple2,
        // f0 == true marks an added record and f0 == false marks the retraction of an old one.
        DataStream<Tuple2<Boolean, Row>> behaviorStream = tEnv.toRetractStream(table, Row.class);
        // Forward only the added rows to the print sink; retraction messages are dropped.
        // (The rows must be emitted via the collector - printing inside the operator would
        // leave the downstream print() sink with nothing to print.)
        behaviorStream
                .flatMap(new FlatMapFunction<Tuple2<Boolean, Row>, Row>() {
                    @Override
                    public void flatMap(Tuple2<Boolean, Row> value, Collector<Row> out) {
                        if (value.f0) {
                            out.collect(value.f1);
                        }
                    }
                })
                .print();
        env.execute();
    }

    /**
     * avg: the average {@code age} of each {@code status} group.
     *
     * <p>Equivalent SQL: {@code select status, avg(age) as age1 from my_users group by status}
     *
     * <p>NOTE(review): {@code age} is declared INT; Flink's AVG presumably keeps the input
     * type, so fractional averages would be truncated. Cast to DOUBLE first if that matters.
     *
     * @param tEnv table environment in which {@code my_users} has been registered
     * @return an updating table with columns {@code status} and {@code age1}
     */
    public static Table avg(StreamTableEnvironment tEnv) {
        return tEnv.from("my_users")
                .groupBy($("status"))
                .select($("status"), $("age").avg().as("age1"));
    }

    /**
     * count: the number of non-null {@code age} values in each {@code status} group
     * ({@code COUNT(age)} skips rows whose {@code age} is NULL).
     *
     * <p>Equivalent SQL: {@code select status, count(age) as age1 from my_users group by status}
     *
     * @param tEnv table environment in which {@code my_users} has been registered
     * @return an updating table with columns {@code status} and {@code age1}
     */
    public static Table count(StreamTableEnvironment tEnv) {
        return tEnv.from("my_users")
                .groupBy($("status"))
                .select($("status"), $("age").count().as("age1"));
    }

    /**
     * sum: the total of {@code age} in each {@code status} group.
     *
     * <p>Equivalent SQL: {@code select status, sum(age) as age1 from my_users group by status}
     *
     * @param tEnv table environment in which {@code my_users} has been registered
     * @return an updating table with columns {@code status} and {@code age1}
     */
    public static Table sum(StreamTableEnvironment tEnv) {
        return tEnv.from("my_users")
                .groupBy($("status"))
                .select($("status"), $("age").sum().as("age1"));
    }

    /**
     * min: the smallest {@code age} in each {@code status} group.
     *
     * <p>Equivalent SQL: {@code select status, min(age) as age1 from my_users group by status}
     *
     * @param tEnv table environment in which {@code my_users} has been registered
     * @return an updating table with columns {@code status} and {@code age1}
     */
    public static Table min(StreamTableEnvironment tEnv) {
        return tEnv.from("my_users")
                .groupBy($("status"))
                .select($("status"), $("age").min().as("age1"));
    }

    /**
     * max: the largest {@code age} in each {@code status} group. The result is recomputed each
     * time a new row arrives.
     *
     * <p>Equivalent SQL: {@code select status, max(age) as age1 from my_users group by status}
     *
     * @param tEnv table environment in which {@code my_users} has been registered
     * @return an updating table with columns {@code status} and {@code age1}
     */
    public static Table max(StreamTableEnvironment tEnv) {
        return tEnv.from("my_users")
                .groupBy($("status"))
                .select($("status"), $("age").max().as("age1"));
    }
}
Table creation SQL (MySQL)
-- Source table read by the Flink DDL `my_users` (id BIGINT, name STRING, age INT, status INT).
-- Integer display widths such as int(8) are omitted: they do not change storage or range
-- and are deprecated since MySQL 8.0.17.
CREATE TABLE `users` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `name` varchar(40) DEFAULT NULL,
  `age` int DEFAULT NULL,
  `status` tinyint DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Print the results
root
|-- status: INT
|-- age1: INT
0,16
0,18
1,21
1,28
2,31

边栏推荐 (Sidebar recommendations)
- 对于编程思想和能力有重大提升的书有哪些?
- [零基础学IoT Pwn] 复现Netgear WNAP320 RCE
- What are the requirements for NPDP product manager international certification registration?
- Rearrangement of overloaded operators
- MIT team used graph neural network to accelerate the screening of amorphous polymer electrolytes and promote the development of next-generation lithium battery technology
- 购物商城6.27待完成
- [零基础学IoT Pwn] 复现Netgear WNAP320 RCE
- Chapter 4 of getting started with MySQL: creation, modification and deletion of data tables
- Problem note - Oracle 11g uninstall
- TypeScript:var
猜你喜欢

如何实现时钟信号分频?

The State Administration of Chia Tai market supervision, the national development and Reform Commission and the China Securities Regulatory Commission jointly reminded and warned some iron ores

【14. 区间和(离散化)】

微信公众号订阅消息 wx-open-subscribe 的实现及闭坑指南

JVM第二话 -- JVM内存模型以及垃圾回收

idea中新建的XML文件变成普通文件的解决方法.

cmake 基本使用过程
![[零基础学IoT Pwn] 复现Netgear WNAP320 RCE](/img/f7/d683df1d4b1b032164a529d3d94615.png)
[零基础学IoT Pwn] 复现Netgear WNAP320 RCE

对于编程思想和能力有重大提升的书有哪些?

Junda technology indoor air environment monitoring terminal PM2.5, temperature and humidity TVOC and other multi parameter monitoring
随机推荐
Yyds dry goods inventory hcie security day13: firewall dual machine hot standby experiment (I) firewall direct deployment, uplink and downlink connection switches
Markdown编辑器使用基本语法
Some thoughts on software testing
一波三折,终于找到src漏洞挖掘的方法了【建议收藏】
【14. 区间和(离散化)】
基于价值量化的需求优先级排序方法
三十之前一定要明白的职场潜规则
Mongodb second call -- implementation of mongodb high availability cluster
C learning notes (5) class and inheritance
MIT team used graph neural network to accelerate the screening of amorphous polymer electrolytes and promote the development of next-generation lithium battery technology
深度分析数据在内存中的存储形式
These three online PS tools should be tried
ArrayList 扩容详解,扩容原理[通俗易懂]
Ensure production safety! Guangzhou requires hazardous chemical enterprises to "not produce in an unsafe way, and keep constant communication"
Develop small programs and official account from zero [phase III]
[advanced ROS] lesson 5 TF coordinate transformation in ROS
The State Administration of Chia Tai market supervision, the national development and Reform Commission and the China Securities Regulatory Commission jointly reminded and warned some iron ores
Error-tf.function-decorated function tried to create variables on non-first call
En utilisant le paquet npoi de net Core 6 c #, lisez Excel.. Image dans la cellule xlsx et stockée sur le serveur spécifié
Pat 1121 damn single (25 points) set