当前位置:网站首页>Tableapi & SQL and MySQL grouping statistics of Flink
Flink Table API & SQL: grouped statistics over MySQL data
2022-07-01 15:01:00 【Dragon man who can't fly】
Use the Table API & SQL and the Flink JDBC connector to read MySQL data, and use a GROUP BY statement to group the result set by one or more columns.
Sample environment
java.version: 1.8.x
flink.version: 1.11.1
kafka: 2.11
Sample data source (download the project code from Gitee)
Flink system examples: building the development environment and data
Sample module (pom.xml)
Flink system examples: Table API & SQL sample module
GroupToMysql.java
package com.flink.examples.mysql;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import static org.apache.flink.table.api.Expressions.$;
/**
 * Uses the Table API &amp; SQL together with the Flink JDBC connector to read MySQL data, groups the
 * result set with GROUP BY on one or more columns and applies an aggregate function.
 *
 * <p>Run with an optional first program argument naming the aggregate ({@code avg}, {@code count},
 * {@code sum}, {@code min} or {@code max}); without arguments {@code max} is used.
 */
public class GroupToMysql {

    /**
     * Flink DDL that registers the MySQL table {@code users} as the Flink table {@code my_users}.
     *
     * <p>It uses the legacy {@code connector.*} property keys of flink-jdbc 1.11.1; all property
     * names are defined in the {@code JdbcTableSourceSinkFactory} factory class.
     *
     * <p>Official reference:
     * https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
     *
     * <p>Partitioned scan: to speed up reading in parallel source task instances, a JDBC table can
     * split its scan into partitions:
     * <ul>
     *   <li>{@code scan.partition.column}: the column used to partition the input;</li>
     *   <li>{@code scan.partition.num}: the number of partitions;</li>
     *   <li>{@code scan.partition.lower-bound}: the smallest value of the first partition;</li>
     *   <li>{@code scan.partition.upper-bound}: the largest value of the last partition.</li>
     * </ul>
     * These names belong to the new {@code 'connector' = 'jdbc'} option style. This DDL uses the
     * legacy keys, so it presumably needs the {@code connector.read.partition.*} equivalents
     * instead. Confirm this before enabling partitioned scans.
     *
     * <p>NOTE(review): the URL, user name and password are hard-coded for the demo; do not commit
     * real credentials this way.
     */
    static final String table_sql =
            "CREATE TABLE my_users (\n" +
            " id BIGINT,\n" +
            " name STRING,\n" +
            " age INT,\n" +
            " status INT,\n" +
            " PRIMARY KEY (id) NOT ENFORCED\n" +
            ") WITH (\n" +
            " 'connector.type' = 'jdbc',\n" +
            " 'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8', \n" +
            " 'connector.driver' = 'com.mysql.jdbc.Driver', \n" +
            " 'connector.table' = 'users', \n" +
            " 'connector.username' = 'root',\n" +
            " 'connector.password' = 'password' \n" +
            // Optional read fetch size. If you enable it, also add a comma after the password line,
            // otherwise the WITH clause is invalid.
            // " 'connector.read.fetch-size' = '10' \n" +
            ")";

    /**
     * Registers {@code my_users}, runs the selected grouped aggregation and prints every add
     * message of the resulting retract stream to stdout.
     *
     * @param args optional; {@code args[0]} names the aggregate to run ({@code avg}, {@code count},
     *     {@code sum}, {@code min}, {@code max}). Defaults to {@code max}.
     * @throws IllegalArgumentException if {@code args[0]} is not a supported aggregate name
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run every operator with a single parallel instance.
        env.setParallelism(1);
        // Use the Blink planner in streaming mode.
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
        // Register the MySQL-backed table my_users.
        tEnv.executeSql(table_sql);

        String aggregation = args.length > 0 ? args[0] : "max";
        Table table = aggregate(tEnv, aggregation);
        // Print the field structure of the result table.
        table.printSchema();

        // A grouped aggregation revises results it has already emitted, so it is converted with
        // toRetractStream; toAppendStream only suits insert-only queries.
        // In each Tuple2, f0 == true is an add message and f0 == false retracts an earlier row.
        DataStream<Tuple2<Boolean, Row>> behaviorStream = tEnv.toRetractStream(table, Row.class);
        // The flatMap prints directly and never emits. print() presumably serves only as the
        // terminal sink that gives env.execute() an operator graph to run.
        behaviorStream.flatMap(new FlatMapFunction<Tuple2<Boolean, Row>, Object>() {
            @Override
            public void flatMap(Tuple2<Boolean, Row> value, Collector<Object> out) throws Exception {
                // Show add messages only; retract messages are dropped.
                if (value.f0) {
                    System.out.println(value.f1.toString());
                }
            }
        }).print();
        env.execute();
    }

    /**
     * Dispatches an aggregate name to the matching grouping query.
     *
     * @param tEnv table environment in which {@code my_users} is registered
     * @param name one of {@code avg}, {@code count}, {@code sum}, {@code min}, {@code max}
     * @return the grouped result table
     * @throws IllegalArgumentException if {@code name} is not supported
     */
    private static Table aggregate(StreamTableEnvironment tEnv, String name) {
        switch (name) {
            case "avg":
                return avg(tEnv);
            case "count":
                return count(tEnv);
            case "sum":
                return sum(tEnv);
            case "min":
                return min(tEnv);
            case "max":
                return max(tEnv);
            default:
                throw new IllegalArgumentException(
                        "Unknown aggregation '" + name + "', expected one of: avg, count, sum, min, max");
        }
    }

    /**
     * avg: the average {@code age} of each {@code status} group.
     *
     * <p>SQL equivalent: {@code tEnv.sqlQuery("select status,avg(age) as age1 from my_users group by status")}.
     *
     * <p>NOTE(review): {@code age} is INT. Flink's AVG presumably keeps the INT result type, which
     * truncates the average. Cast {@code age} to DOUBLE if a fractional average is needed, and
     * confirm with {@code printSchema()}.
     *
     * @param tEnv table environment in which {@code my_users} is registered
     * @return table with columns {@code status} and {@code age1}
     */
    public static Table avg(StreamTableEnvironment tEnv) {
        return tEnv.from("my_users")
                .groupBy($("status"))
                .select($("status"), $("age").avg().as("age1"));
    }

    /**
     * count: the number of non-null {@code age} values in each {@code status} group.
     *
     * <p>SQL equivalent: {@code tEnv.sqlQuery("select status,count(age) as age1 from my_users group by status")}.
     *
     * @param tEnv table environment in which {@code my_users} is registered
     * @return table with columns {@code status} and {@code age1}
     */
    public static Table count(StreamTableEnvironment tEnv) {
        return tEnv.from("my_users")
                .groupBy($("status"))
                .select($("status"), $("age").count().as("age1"));
    }

    /**
     * sum: the total of the {@code age} values in each {@code status} group.
     *
     * <p>SQL equivalent: {@code tEnv.sqlQuery("select status,sum(age) as age1 from my_users group by status")}.
     *
     * @param tEnv table environment in which {@code my_users} is registered
     * @return table with columns {@code status} and {@code age1}
     */
    public static Table sum(StreamTableEnvironment tEnv) {
        return tEnv.from("my_users")
                .groupBy($("status"))
                .select($("status"), $("age").sum().as("age1"));
    }

    /**
     * min: the smallest {@code age} in each {@code status} group.
     *
     * <p>SQL equivalent: {@code tEnv.sqlQuery("select status,min(age) as age1 from my_users group by status")}.
     *
     * @param tEnv table environment in which {@code my_users} is registered
     * @return table with columns {@code status} and {@code age1}
     */
    public static Table min(StreamTableEnvironment tEnv) {
        return tEnv.from("my_users")
                .groupBy($("status"))
                .select($("status"), $("age").min().as("age1"));
    }

    /**
     * max: the largest {@code age} in each {@code status} group. The value is recomputed, and a
     * new result emitted, whenever a row arrives for that group.
     *
     * <p>SQL equivalent: {@code tEnv.sqlQuery("select status,max(age) as age1 from my_users group by status")}.
     *
     * @param tEnv table environment in which {@code my_users} is registered
     * @return table with columns {@code status} and {@code age1}
     */
    public static Table max(StreamTableEnvironment tEnv) {
        return tEnv.from("my_users")
                .groupBy($("status"))
                .select($("status"), $("age").max().as("age1"));
    }
}
MySQL table creation SQL
-- MySQL source table read by the Flink table my_users.
-- Integer display widths such as int(8) are omitted: they do not affect storage and are
-- deprecated since MySQL 8.0.17.
CREATE TABLE `users` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `name` varchar(40) DEFAULT NULL,
  `age` int DEFAULT NULL,
  `status` tinyint DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Print the results
root
|-- status: INT
|-- age1: INT
0,16
0,18
1,21
1,28
2,31
边栏推荐
- What are the books that have greatly improved the thinking and ability of programming?
- DirectX repair tool v4.1 public beta! [easy to understand]
- 购物商城6.27待完成
- Introduction to MySQL audit plug-in
- 竣达技术丨多台精密空调微信云监控方案
- [零基础学IoT Pwn] 复现Netgear WNAP320 RCE
- MySQL 服务正在启动 MySQL 服务无法启动解决途径
- IDEA全局搜索快捷键(ctrl+shift+F)失效修复
- opencv学习笔记六--图像拼接
- [zero basic IOT pwn] reproduce Netgear wnap320 rce
猜你喜欢

Introduction to MySQL audit plug-in

MySQL审计插件介绍

cmake 基本使用过程

写在Doris毕业后的第一天

Markdown编辑器使用基本语法

JVM performance tuning and practical basic theory part II

Salesforce, Johns Hopkins, Columbia | progen2: exploring the boundaries of protein language models

Fix the failure of idea global search shortcut (ctrl+shift+f)

对于编程思想和能力有重大提升的书有哪些?

Cannot link redis when redis is enabled
随机推荐
Beilianzhuguan joined the dragon lizard community to jointly promote carbon neutralization
使用net core 6 c# 的 NPOI 包,读取excel..xlsx单元格内的图片,并存储到指定服务器
solidty-基础篇-基础语法和定义函数
微服务追踪SQL(支持Isto管控下的gorm查询追踪)
JS中箭头函数和普通函数的区别
It's settled! 2022 Hainan secondary cost engineer examination time is determined! The registration channel has been opened!
openssl客户端编程:一个不起眼的函数导致的SSL会话失败问题
榨汁机UL982测试项目有哪些
Solid smart contract development - easy to get started
常见健身器材EN ISO 20957认证标准有哪些
[zero basic IOT pwn] reproduce Netgear wnap320 rce
保证生产安全!广州要求危化品企业“不安全不生产、不变通”
It's suitable for people who don't have eloquence. The benefits of joining the China Video partner program are really delicious. One video gets 3 benefits
Music player development example (can be set up)
The solution to turn the newly created XML file into a common file in idea
TypeScript:var
solidty-基础篇-结构体和数组,私有 / 公共函数,函数的返回值和修饰符,事件
Mongodb second talk - - mongodb High available Cluster Implementation
数据产品经理需要掌握哪些数据能力?
[零基础学IoT Pwn] 复现Netgear WNAP320 RCE