当前位置：网站首页 > Flink 系列 之 TableAPI & SQL 与 MYSQL 分组统计
Flink 系列 之 TableAPI & SQL 与 MYSQL 分组统计
2022-07-01 14:54:00 【不会飞的小龙人】
使用 Table API & SQL 与 Flink JDBC 连接器读取 MySQL 数据，并用 GROUP BY 语句根据一个或多个列对结果集进行分组。
示例环境
java.version: 1.8.x
flink.version: 1.11.1
kafka: 2.11
示例数据源（项目码云下载）
示例模块 (pom.xml)
Flink 系列 之 TableAPI & SQL 与 示例模块
GroupToMysql.java
package com.flink.examples.mysql;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import static org.apache.flink.table.api.Expressions.$;
/**
 * Reads MySQL data with the Table API &amp; SQL and the Flink JDBC connector, then groups the
 * result set by one or more columns with GROUP BY.
 *
 * <p>The JDBC URL, user name and password default to the values of the original example and can
 * be overridden with the JVM system properties {@code mysql.url}, {@code mysql.username} and
 * {@code mysql.password}, so real credentials do not have to be written into the source.
 */
public class GroupToMysql {

    /*
     * Official reference:
     * https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
     *
     * Partitioned scan: to speed up reading in parallel source task instances, Flink can split a
     * JDBC table scan into partitions.
     *   scan.partition.column      - name of the column used to partition the input
     *   scan.partition.num         - number of partitions
     *   scan.partition.lower-bound - smallest value of the first partition
     *   scan.partition.upper-bound - largest value of the last partition
     * These are the option names of the new-style ('connector' = 'jdbc') table. The legacy
     * 'connector.type' = 'jdbc' properties used in table_sql are prefixed 'connector.read.'
     * instead, e.g. 'connector.read.partition.column' (compare 'connector.read.fetch-size').
     */

    /** JDBC URL of the MySQL database (override with {@code -Dmysql.url=...}). */
    private static final String MYSQL_URL = System.getProperty(
            "mysql.url",
            "jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8");

    /** MySQL user name (override with {@code -Dmysql.username=...}). */
    private static final String MYSQL_USERNAME = System.getProperty("mysql.username", "root");

    /** MySQL password (override with {@code -Dmysql.password=...}). */
    private static final String MYSQL_PASSWORD = System.getProperty("mysql.password", "password");

    /**
     * DDL registering the MySQL table {@code users} as the Flink table {@code my_users}.
     *
     * <p>Written for flink-jdbc 1.11.1 using the legacy {@code connector.*} properties; all
     * property names are defined in the {@code JdbcTableSourceSinkFactory} factory class.
     *
     * <p>NOTE(review): {@code com.mysql.jdbc.Driver} is the Connector/J 5.x driver class;
     * Connector/J 8.x deprecates it in favour of {@code com.mysql.cj.jdbc.Driver} — confirm
     * against the driver on the classpath.
     */
    static final String table_sql =
            "CREATE TABLE my_users (\n" +
            " id BIGINT,\n" +
            " name STRING,\n" +
            " age INT,\n" +
            " status INT,\n" +
            " PRIMARY KEY (id) NOT ENFORCED\n" +
            ") WITH (\n" +
            " 'connector.type' = 'jdbc',\n" +
            " 'connector.url' = '" + escapeSqlLiteral(MYSQL_URL) + "', \n" +
            " 'connector.driver' = 'com.mysql.jdbc.Driver', \n" +
            " 'connector.table' = 'users', \n" +
            " 'connector.username' = '" + escapeSqlLiteral(MYSQL_USERNAME) + "',\n" +
            " 'connector.password' = '" + escapeSqlLiteral(MYSQL_PASSWORD) + "' \n" +
            // To tune the fetch size, add a comma after the password entry and append:
            // " 'connector.read.fetch-size' = '10' \n" +
            ")";

    /**
     * Escapes a value for use inside a single-quoted SQL string literal by doubling every single
     * quote, so a quote in a configured value cannot terminate the literal early.
     *
     * @param value raw option value, not null
     * @return the value ready to be placed between single quotes in the DDL
     */
    private static String escapeSqlLiteral(String value) {
        return value.replace("'", "''");
    }

    /**
     * Registers the MySQL table, runs one grouped aggregation and prints the added result rows.
     *
     * @param args unused
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Build the streaming execution environment with a parallelism of 1.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Use the Blink planner in streaming mode.
        EnvironmentSettings bsSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
        // Register the MySQL table.
        tEnv.executeSql(table_sql);

        // Pick one aggregation: avg(tEnv), count(tEnv), sum(tEnv), min(tEnv) or max(tEnv).
        Table table = max(tEnv);
        // Print the result schema.
        table.printSchema();

        // A plain (non-aggregating) query could use tEnv.toAppendStream(table, Row.class).
        // A GROUP BY result is updated as rows arrive, so it is converted with toRetractStream:
        // in each Tuple2, f0 == true marks an added record and f0 == false marks the retraction
        // of a previously emitted record.
        DataStream<Tuple2<Boolean, Row>> behaviorStream = tEnv.toRetractStream(table, Row.class);
        // Forward only the added records (rendered as text) to the print() sink.
        behaviorStream
                .flatMap(new FlatMapFunction<Tuple2<Boolean, Row>, String>() {
                    @Override
                    public void flatMap(Tuple2<Boolean, Row> value, Collector<String> out) {
                        if (value.f0) {
                            out.collect(value.f1.toString());
                        }
                    }
                })
                .print();
        env.execute();
    }

    /**
     * avg: average of {@code age} within each {@code status} group.
     *
     * <p>Equivalent SQL, runnable with {@code tEnv.sqlQuery(...)}:
     * {@code select status,avg(age) as age1 from my_users group by status}
     *
     * <p>NOTE(review): {@code age} is INT and Flink's AVG keeps the input type, so the mean is
     * truncated to an integer — cast {@code age} to DOUBLE first if a fractional mean is needed.
     *
     * @param tEnv table environment in which {@code my_users} has been registered
     * @return grouped table with columns {@code status} and {@code age1}
     */
    public static Table avg(StreamTableEnvironment tEnv) {
        return tEnv.from("my_users")
                .groupBy($("status"))
                .select($("status"), $("age").avg().as("age1"));
    }

    /**
     * count: number of non-null {@code age} values within each {@code status} group.
     *
     * <p>Equivalent SQL, runnable with {@code tEnv.sqlQuery(...)}:
     * {@code select status,count(age) as age1 from my_users group by status}
     *
     * @param tEnv table environment in which {@code my_users} has been registered
     * @return grouped table with columns {@code status} and {@code age1}
     */
    public static Table count(StreamTableEnvironment tEnv) {
        return tEnv.from("my_users")
                .groupBy($("status"))
                .select($("status"), $("age").count().as("age1"));
    }

    /**
     * sum: sum of {@code age} within each {@code status} group.
     *
     * <p>Equivalent SQL, runnable with {@code tEnv.sqlQuery(...)}:
     * {@code select status,sum(age) as age1 from my_users group by status}
     *
     * @param tEnv table environment in which {@code my_users} has been registered
     * @return grouped table with columns {@code status} and {@code age1}
     */
    public static Table sum(StreamTableEnvironment tEnv) {
        return tEnv.from("my_users")
                .groupBy($("status"))
                .select($("status"), $("age").sum().as("age1"));
    }

    /**
     * min: smallest {@code age} within each {@code status} group.
     *
     * <p>Equivalent SQL, runnable with {@code tEnv.sqlQuery(...)}:
     * {@code select status,min(age) as age1 from my_users group by status}
     *
     * @param tEnv table environment in which {@code my_users} has been registered
     * @return grouped table with columns {@code status} and {@code age1}
     */
    public static Table min(StreamTableEnvironment tEnv) {
        return tEnv.from("my_users")
                .groupBy($("status"))
                .select($("status"), $("age").min().as("age1"));
    }

    /**
     * max: largest {@code age} within each {@code status} group, recomputed every time a row
     * arrives.
     *
     * <p>Equivalent SQL, runnable with {@code tEnv.sqlQuery(...)}:
     * {@code select status,max(age) as age1 from my_users group by status}
     *
     * @param tEnv table environment in which {@code my_users} has been registered
     * @return grouped table with columns {@code status} and {@code age1}
     */
    public static Table max(StreamTableEnvironment tEnv) {
        return tEnv.from("my_users")
                .groupBy($("status"))
                .select($("status"), $("age").max().as("age1"));
    }
}
建表 SQL
CREATE TABLE `users` (
`id` bigint(8) NOT NULL AUTO_INCREMENT,
`name` varchar(40) DEFAULT NULL,
`age` int(8) DEFAULT NULL,
`status` tinyint(2) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
打印结果
root
|-- status: INT
|-- age1: INT
0,16
0,18
1,21
1,28
2,31
边栏推荐
- 微服务追踪SQL(支持Isto管控下的gorm查询追踪)
- JVM performance tuning and practical basic theory part II
- leetcode:329. Longest increasing path in matrix
- Opencv interpolation mode
- Error-tf. function-decorated function tried to create variables on non-first call
- After twists and turns, I finally found the method of SRC vulnerability mining [recommended collection]
- 期末琐碎知识点再整理
- The markdown editor uses basic syntax
- It's settled! 2022 Hainan secondary cost engineer examination time is determined! The registration channel has been opened!
- opencv学习笔记六--图像拼接
猜你喜欢

Basic operations of SQL database

Don't want to knock the code? Here comes the chance

JVM第二话 -- JVM内存模型以及垃圾回收

Markdown编辑器使用基本语法

What are the books that have greatly improved the thinking and ability of programming?

111. Minimum depth of binary tree

Salesforce、约翰霍普金斯、哥大 | ProGen2: 探索蛋白语言模型的边界

JVM second conversation -- JVM memory model and garbage collection

The State Administration of Chia Tai market supervision, the national development and Reform Commission and the China Securities Regulatory Commission jointly reminded and warned some iron ores

One of the first steps to redis
随机推荐
Advanced C language
En utilisant le paquet npoi de net Core 6 c #, lisez Excel.. Image dans la cellule xlsx et stockée sur le serveur spécifié
【锁】Redis锁 处理并发 原子性
保证生产安全!广州要求危化品企业“不安全不生产、不变通”
Basic operations of SQL database
[dynamic programming] p1004 grid access (four-dimensional DP template question)
Quelle valeur le pdnp peut - il apporter aux gestionnaires de produits? Vous savez tout?
使用net core 6 c# 的 NPOI 包,读取excel..xlsx单元格内的图片,并存储到指定服务器
Ubuntu 14.04下搭建MySQL主从服务器
241. 为运算表达式设计优先级
qt捕获界面为图片或label显示
IDEA全局搜索快捷键(ctrl+shift+F)失效修复
微服务开发步骤(nacos)
Sqlachemy common operations
[stage life summary] I gave up the postgraduate entrance examination and participated in the work. I have successfully graduated and just received my graduation certificate yesterday
一波三折,终于找到src漏洞挖掘的方法了【建议收藏】
Day-02 database
【LeetCode】16、最接近的三数之和
购物商城6.27待完成
solidty-基础篇-基础语法和定义函数