Flink Series: Grouped Aggregation with Table API & SQL and MySQL
2022-07-01 14:54:00 【不会飞的小龙人】
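This example uses Table & SQL with the Flink JDBC connector to read data from MySQL, then uses a GROUP BY clause to group the result set by one or more columns.
Example environment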
java.version: 1.8.x
flink.version: 1.11.1
kafka:2.11
Example data source (download the project from Gitee)
Example module (pom.xml)
Flink Series: Table API & SQL and the example module
GroupToMysql.java
package com.flink.examples.mysql;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Description Reads MySQL data with Table & SQL and the Flink JDBC connector, then uses GROUP BY to group the result set by one or more columns.
 */
public class GroupToMysql {

    /**
     Official reference: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html

     Partitioned scan
     To speed up reads across parallel source task instances, Flink provides a partitioned scan for JDBC tables.
     scan.partition.column: name of the column used to partition the input.
     scan.partition.num: number of partitions.
     scan.partition.lower-bound: smallest value of the first partition.
     scan.partition.upper-bound: largest value of the last partition.

     Note: these option names come from the linked 1.11 documentation. The DDL below uses the legacy
     connector.* keys and does not enable a partitioned scan.
     */
    // flink-jdbc 1.11.1 syntax; all property names are defined in the JdbcTableSourceSinkFactory factory class
    static String table_sql =
            "CREATE TABLE my_users (\n" +
            " id BIGINT,\n" +
            " name STRING,\n" +
            " age INT,\n" +
            " status INT,\n" +
            " PRIMARY KEY (id) NOT ENFORCED\n" +
            ") WITH (\n" +
            " 'connector.type' = 'jdbc',\n" +
            " 'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8', \n" +
            " 'connector.driver' = 'com.mysql.jdbc.Driver', \n" +
            " 'connector.table' = 'users', \n" +
            " 'connector.username' = 'root',\n" +
            " 'connector.password' = 'password' \n" +
            // " 'connector.read.fetch-size' = '10' \n" +
            ")";

    public static void main(String[] args) throws Exception {
        // Create the StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(1);
        // Build the EnvironmentSettings and select the Blink planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // Create the StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
        // Register the MySQL dimension table
        tEnv.executeSql(table_sql);
        //Table table = avg(tEnv);
        //Table table = count(tEnv);
        //Table table = min(tEnv);
        Table table = max(tEnv);
        // Print the schema
        table.printSchema();
        // Plain queries (no aggregation) can use toAppendStream
        //tEnv.toAppendStream(table, Row.class).print();
        // Grouped queries need toRetractStream
        //tEnv.toRetractStream(table, Row.class).print();
        // Convert the table to a DataStream. The Boolean flag in Tuple2 is true for an add message
        // (a new record) and false for a retract message (withdrawing an old record)
        DataStream<Tuple2<Boolean, Row>> behaviorStream = tEnv.toRetractStream(table, Row.class);
        behaviorStream.flatMap(new FlatMapFunction<Tuple2<Boolean, Row>, Object>() {
            @Override
            public void flatMap(Tuple2<Boolean, Row> value, Collector<Object> out) throws Exception {
                // Print add messages only; nothing is collected, so the print() sink below receives no records
                if (value.f0) {
                    System.out.println(value.f1.toString());
                }
            }
        }).print();
        env.execute();
    }

    /**
     * avg: average of the age values in each status group
     * @param tEnv the table environment in which my_users is registered
     * @return a table with columns status and age1
     */
    public static Table avg(StreamTableEnvironment tEnv){
        // Option 1: run a SQL query
        String sql = "select status,avg(age) as age1 from my_users group by status";
        //Table table = tEnv.sqlQuery(sql);
        // Option 2: build the same query with the Table API
        Table table = tEnv.from("my_users").groupBy($("status")).select($("status"),$("age").avg().as("age1"));
        return table;
    }

    /**
     * count: number of rows with a non-null age in each status group
     * @param tEnv the table environment in which my_users is registered
     * @return a table with columns status and age1
     */
    public static Table count(StreamTableEnvironment tEnv){
        // Option 1: run a SQL query
        String sql = "select status,count(age) as age1 from my_users group by status";
        //Table table = tEnv.sqlQuery(sql);
        // Option 2: build the same query with the Table API
        Table table = tEnv.from("my_users").groupBy($("status")).select($("status"),$("age").count().as("age1"));
        return table;
    }

    /**
     * sum: sum of the age values in each status group
     * @param tEnv the table environment in which my_users is registered
     * @return a table with columns status and age1
     */
    public static Table sum(StreamTableEnvironment tEnv){
        // Option 1: run a SQL query
        String sql = "select status,sum(age) as age1 from my_users group by status";
        //Table table = tEnv.sqlQuery(sql);
        // Option 2: build the same query with the Table API
        Table table = tEnv.from("my_users").groupBy($("status")).select($("status"),$("age").sum().as("age1"));
        return table;
    }

    /**
     * min: smallest age value in each status group
     * @param tEnv the table environment in which my_users is registered
     * @return a table with columns status and age1
     */
    public static Table min(StreamTableEnvironment tEnv){
        // Option 1: run a SQL query
        String sql = "select status,min(age) as age1 from my_users group by status";
        //Table table = tEnv.sqlQuery(sql);
        // Option 2: build the same query with the Table API
        Table table = tEnv.from("my_users").groupBy($("status")).select($("status"),$("age").min().as("age1"));
        return table;
    }

    /**
     * max: largest age value in each status group, recomputed each time a record arrives
     * @param tEnv the table environment in which my_users is registered
     * @return a table with columns status and age1
     */
    public static Table max(StreamTableEnvironment tEnv){
        // Option 1: run a SQL query
        String sql = "select status,max(age) as age1 from my_users group by status";
        //Table table = tEnv.sqlQuery(sql);
        // Option 2: build the same query with the Table API
        Table table = tEnv.from("my_users").groupBy($("status")).select($("status"),$("age").max().as("age1"));
        return table;
    }
}
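The class comment lists the four scan.partition.* options, but the DDL in the example never sets them. The following is a minimal sketch of what a partitioned-scan table definition might look like, assuming the option style of the linked 1.11 JDBC connector documentation ('connector' = 'jdbc', 'url', 'table-name') rather than the legacy connector.* keys used above. The class name PartitionedScanDdl, the table name my_users_partitioned, and the bounds 1 to 10000 with 4 partitions are hypothetical values chosen for illustration. The code only builds the DDL string and does not connect to MySQL or Flink.

```java
// Sketch only: builds a DDL string for a partitioned JDBC scan.
// PartitionedScanDdl, my_users_partitioned and the sample bounds are hypothetical.
class PartitionedScanDdl {

    /** Returns a CREATE TABLE statement with all four scan.partition.* options set. */
    static String ddl(String column, int partitions, long lowerBound, long upperBound) {
        return "CREATE TABLE my_users_partitioned (\n"
                + "  id BIGINT,\n"
                + "  name STRING,\n"
                + "  age INT,\n"
                + "  status INT\n"
                + ") WITH (\n"
                + "  'connector' = 'jdbc',\n"
                + "  'url' = 'jdbc:mysql://192.168.110.35:3306/flink',\n"
                + "  'table-name' = 'users',\n"
                + "  'username' = 'root',\n"
                + "  'password' = 'password',\n"
                // Column whose value range is split across the partitions
                + "  'scan.partition.column' = '" + column + "',\n"
                + "  'scan.partition.num' = '" + partitions + "',\n"
                + "  'scan.partition.lower-bound' = '" + lowerBound + "',\n"
                + "  'scan.partition.upper-bound' = '" + upperBound + "'\n"
                + ")";
    }

    public static void main(String[] args) {
        // Split the id range 1..10000 into 4 partitions (assumed values)
        System.out.println(ddl("id", 4, 1L, 10000L));
    }
}
```

The resulting string could be passed to tEnv.executeSql in place of table_sql. Because the example sets env.setParallelism(1), a partitioned scan would presumably only pay off once the parallelism is raised.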
Table creation SQL
CREATE TABLE `users` (
  `id` bigint(8) NOT NULL AUTO_INCREMENT,
  `name` varchar(40) DEFAULT NULL,
  `age` int(8) DEFAULT NULL,
  `status` tinyint(2) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
Printed output
root
|-- status: INT
|-- age1: INT
0,16
0,18
1,21
1,28
2,31
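Status 0 and status 1 each appear twice in the output because a grouped max on a retract stream emits a new add message every time a group's maximum changes, and the example prints only the add messages. The plain-Java sketch below (no Flink dependency) models that behavior in simplified form. The article does not show the contents of the users table, so the input rows here are hypothetical and were chosen only because they reproduce the output above; the class and method names are also illustrative.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the change messages a grouped MAX produces on a retract stream.
// This is not Flink code; the input rows are hypothetical.
class RetractMaxSketch {

    /** Returns messages formatted as "flag:status,max" (true = add, false = retract). */
    static List<String> runningMax(int[][] rows) {
        Map<Integer, Integer> maxByStatus = new LinkedHashMap<>();
        List<String> messages = new ArrayList<>();
        for (int[] row : rows) {
            int status = row[0];
            int age = row[1];
            Integer previous = maxByStatus.get(status);
            if (previous == null) {
                // First row of a group: emit an add message
                maxByStatus.put(status, age);
                messages.add("true:" + status + "," + age);
            } else if (age > previous) {
                // Maximum changed: retract the old result, then add the new one
                messages.add("false:" + status + "," + previous);
                maxByStatus.put(status, age);
                messages.add("true:" + status + "," + age);
            }
            // Maximum unchanged: this model emits nothing
        }
        return messages;
    }

    /** Keeps only the add messages, mirroring the if (value.f0) check in the example. */
    static List<String> addsOnly(List<String> messages) {
        List<String> adds = new ArrayList<>();
        for (String message : messages) {
            if (message.startsWith("true:")) {
                adds.add(message.substring("true:".length()));
            }
        }
        return adds;
    }

    public static void main(String[] args) {
        // Hypothetical (status, age) rows in arrival order
        int[][] rows = {{0, 16}, {0, 18}, {1, 21}, {1, 28}, {2, 31}};
        for (String line : addsOnly(runningMax(rows))) {
            System.out.println(line);
        }
    }
}
```

Under this model, only the last add message per status (0,18, 1,28 and 2,31) reflects the final result. A consumer that needs the final table would apply the retract messages as well instead of discarding them.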