Flink Series: Table API & SQL with MySQL Grouped Statistics
2022-07-01 14:54:00 【不会飞的小龙人】
This example uses Table API & SQL with the Flink JDBC connector to read MySQL data, then groups the result set by one or more columns with a GROUP BY statement.
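As a mental model of what grouping by a column computes, here is a minimal plain-Java sketch that does not use Flink. The `GroupBySketch` class, its `User` type, and the sample rows are assumptions made for illustration; they are not part of the original project.

```java
import java.util.Arrays;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class GroupBySketch {

    /** Hypothetical stand-in for a row of the users table; only the two columns used here. */
    static final class User {
        final int status;
        final int age;

        User(int status, int age) {
            this.status = status;
            this.age = age;
        }

        int getStatus() { return status; }

        int getAge() { return age; }
    }

    /** Groups rows by status and summarizes age (count, sum, min, max, average) per group. */
    static Map<Integer, IntSummaryStatistics> summarizeAgeByStatus(List<User> users) {
        return users.stream().collect(
                Collectors.groupingBy(User::getStatus, TreeMap::new,
                        Collectors.summarizingInt(User::getAge)));
    }

    public static void main(String[] args) {
        // Assumed sample rows as (status, age); the real table contents are not shown in the article.
        List<User> users = Arrays.asList(
                new User(0, 16), new User(0, 18),
                new User(1, 21), new User(1, 28),
                new User(2, 31));
        summarizeAgeByStatus(users).forEach((status, s) ->
                System.out.println(status + ": count=" + s.getCount() + ", sum=" + s.getSum()
                        + ", min=" + s.getMin() + ", max=" + s.getMax() + ", avg=" + s.getAverage()));
    }
}
```

Each entry of the returned map corresponds to one result row of a `GROUP BY status` query; the Flink job below computes one aggregate at a time instead of all five at once.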
Example environment
java.version: 1.8.x
flink.version: 1.11.1
kafka: 2.11
Example data source (project download on Gitee)
Example module (pom.xml)
Flink Series: Table API & SQL and the example modules
GroupToMysql.java
package com.flink.examples.mysql;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import static org.apache.flink.table.api.Expressions.$;
/**
* @Description Uses Table API & SQL with the Flink JDBC connector to read MySQL data, and groups the result set by one or more columns with a GROUP BY statement.
*/
public class GroupToMysql {
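/**
Official reference: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
Partitioned scan
To speed up reading in parallel source task instances, Flink provides a partitioned scan feature for JDBC tables.
scan.partition.column: name of the column used to partition the input.
scan.partition.num: number of partitions.
scan.partition.lower-bound: smallest value of the first partition.
scan.partition.upper-bound: largest value of the last partition.
Note: these are the option names of the new-style connector ('connector' = 'jdbc'). With the legacy 'connector.type' keys used below, the equivalents are connector.read.partition.column, connector.read.partition.num, connector.read.partition.lower-bound and connector.read.partition.upper-bound.
*/
//flink-jdbc 1.11.1 syntax; all property names are defined in the JdbcTableSourceSinkFactory factory class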
static String table_sql =
"CREATE TABLE my_users (\n" +
" id BIGINT,\n" +
" name STRING,\n" +
" age INT,\n" +
" status INT,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector.type' = 'jdbc',\n" +
" 'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8', \n" +
" 'connector.driver' = 'com.mysql.jdbc.Driver', \n" +
" 'connector.table' = 'users', \n" +
" 'connector.username' = 'root',\n" +
" 'connector.password' = 'password' \n" +
// " 'connector.read.fetch-size' = '10' \n" +
")";
public static void main(String[] args) throws Exception {
//Build the StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Set the parallelism
env.setParallelism(1);
//Build the EnvironmentSettings and select the Blink planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//Build the StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
//Register the MySQL table
tEnv.executeSql(table_sql);
//Table table = avg(tEnv);
//Table table = count(tEnv);
//Table table = min(tEnv);
Table table = max(tEnv);
//Print the schema
table.printSchema();
//For plain queries, use toAppendStream
//tEnv.toAppendStream(table, Row.class).print();
//For group operations, use toRetractStream
//tEnv.toRetractStream(table, Row.class).print();
//Convert the table to a DataStream. The first Tuple2 field is a flag: true marks an add message (a new record), false marks a retract message (withdrawal of an old record)
DataStream<Tuple2<Boolean, Row>> behaviorStream = tEnv.toRetractStream(table, Row.class);
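behaviorStream.flatMap(new FlatMapFunction<Tuple2<Boolean, Row>, Object>() {
@Override
public void flatMap(Tuple2<Boolean, Row> value, Collector<Object> out) throws Exception {
//Print add messages only; retract messages are skipped. Nothing is collected into out, so the trailing print() receives no elements
if (value.f0) {
System.out.println(value.f1.toString());
}
}
}).print();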
env.execute();
}
/**
* avg: computes the average of the values in each group
* @param tEnv
* @return
*/
public static Table avg(StreamTableEnvironment tEnv){
//Option 1: run a SQL query
String sql = "select status,avg(age) as age1 from my_users group by status";
//Table table = tEnv.sqlQuery(sql);
//Option 2: build the query with Table API method calls
Table table = tEnv.from("my_users").groupBy($("status")).select($("status"),$("age").avg().as("age1"));
return table;
}
/**
* count: computes the running count of rows in each group (rows whose age is not null)
* @param tEnv
* @return
*/
public static Table count(StreamTableEnvironment tEnv){
//Option 1: run a SQL query
String sql = "select status,count(age) as age1 from my_users group by status";
//Table table = tEnv.sqlQuery(sql);
//Option 2: build the query with Table API method calls
Table table = tEnv.from("my_users").groupBy($("status")).select($("status"),$("age").count().as("age1"));
return table;
}
/**
* sum: computes the running sum of the values in each group
* @param tEnv
* @return
*/
public static Table sum(StreamTableEnvironment tEnv){
//Option 1: run a SQL query
String sql = "select status,sum(age) as age1 from my_users group by status";
//Table table = tEnv.sqlQuery(sql);
//Option 2: build the query with Table API method calls
Table table = tEnv.from("my_users").groupBy($("status")).select($("status"),$("age").sum().as("age1"));
return table;
}
/**
* min: computes the minimum value in each group
* @param tEnv
* @return
*/
public static Table min(StreamTableEnvironment tEnv){
//Option 1: run a SQL query
String sql = "select status,min(age) as age1 from my_users group by status";
//Table table = tEnv.sqlQuery(sql);
//Option 2: build the query with Table API method calls
Table table = tEnv.from("my_users").groupBy($("status")).select($("status"),$("age").min().as("age1"));
return table;
}
/**
* max: computes the maximum value in each group; the result is recomputed each time a record arrives
* @param tEnv
* @return
*/
public static Table max(StreamTableEnvironment tEnv){
//Option 1: run a SQL query
String sql = "select status,max(age) as age1 from my_users group by status";
//Table table = tEnv.sqlQuery(sql);
//Option 2: build the query with Table API method calls
Table table = tEnv.from("my_users").groupBy($("status")).select($("status"),$("age").max().as("age1"));
return table;
}
}
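The comment near the top of GroupToMysql describes the partitioned scan options but the example does not use them. The idea can be pictured as splitting the range between the lower and upper bound into contiguous sub-ranges, one per partition. The sketch below is only a conceptual illustration in plain Java: the `PartitionRangeSketch` class, its `split` method, and the way leftover values are distributed are assumptions, not Flink's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionRangeSketch {

    /**
     * Splits the inclusive range [lower, upper] into at most num contiguous
     * sub-ranges of near-equal size. Each element is {start, end}, both inclusive.
     */
    static List<long[]> split(long lower, long upper, int num) {
        if (num < 1 || lower > upper) {
            throw new IllegalArgumentException("need num >= 1 and lower <= upper");
        }
        long total = upper - lower + 1;   // number of values to cover
        long base = total / num;          // minimum size of each sub-range
        long extra = total % num;         // the first 'extra' sub-ranges get one more value
        List<long[]> ranges = new ArrayList<>();
        long start = lower;
        for (int i = 0; i < num && start <= upper; i++) {
            long size = base + (i < extra ? 1 : 0);
            long end = start + size - 1;
            ranges.add(new long[] {start, end});
            start = end + 1;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Assumed values: partition column id, bounds 1..100, 4 partitions.
        for (long[] r : split(1, 100, 4)) {
            System.out.println("SELECT id, name, age, status FROM users WHERE id BETWEEN "
                    + r[0] + " AND " + r[1]);
        }
    }
}
```

With these assumed settings, each parallel source instance would be expected to read one of the four id ranges instead of the whole table.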
Table creation SQL
CREATE TABLE `users` (
`id` bigint(8) NOT NULL AUTO_INCREMENT,
`name` varchar(40) DEFAULT NULL,
`age` int(8) DEFAULT NULL,
`status` tinyint(2) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
Printed result
root
|-- status: INT
|-- age1: INT
0,16
0,18
1,21
1,28
2,31
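The output contains two lines for status 0 and two for status 1 because the max query emits an updated result whenever a group's maximum changes, and the flatMap in main prints only the add messages. The following plain-Java sketch imitates that behavior without Flink. The `RetractMaxSketch` class, the message format, and the input rows and their order are assumptions chosen to be consistent with the printed result; the article does not show the actual table contents.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RetractMaxSketch {

    /**
     * Feeds (status, age) rows one at a time and returns a retract-style
     * changelog of max(age) per status, as "(flag,status,max)" strings.
     */
    static List<String> changelog(int[][] rows) {
        Map<Integer, Integer> maxByStatus = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (int[] row : rows) {
            int status = row[0];
            int age = row[1];
            Integer old = maxByStatus.get(status);
            if (old == null) {
                // First row of the group: only an add message.
                out.add("(true," + status + "," + age + ")");
                maxByStatus.put(status, age);
            } else if (age > old) {
                // Maximum changed: retract the old result, then add the new one.
                out.add("(false," + status + "," + old + ")");
                out.add("(true," + status + "," + age + ")");
                maxByStatus.put(status, age);
            }
            // Otherwise the maximum is unchanged and nothing is emitted.
        }
        return out;
    }

    /** Keeps only the add messages and strips the flag, like the flatMap in main. */
    static List<String> addsOnly(List<String> changelog) {
        List<String> out = new ArrayList<>();
        for (String msg : changelog) {
            if (msg.startsWith("(true,")) {
                out.add(msg.substring("(true,".length(), msg.length() - 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumed input rows as {status, age}, in an assumed arrival order.
        int[][] rows = {{0, 16}, {0, 18}, {1, 21}, {1, 28}, {2, 31}};
        addsOnly(changelog(rows)).forEach(System.out::println);
    }
}
```

Under these assumptions the sketch prints the same five lines as the job: `0,16` is later superseded by `0,18`, and `1,21` by `1,28`, while the retract messages in between are filtered out.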