Flink Series: Table API & SQL with MySQL Grouped Statistics
2022-07-01 14:54:00 【不会飞的小龙人】
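Use the Table API & SQL together with the Flink JDBC connector to read data from MySQL, and use a GROUP BY statement to group the result set by one or more columns.
Example environment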
java.version: 1.8.x
flink.version: 1.11.1
kafka: 2.11
Example data source (download the project from Gitee)
Example module (pom.xml)
Flink Series: Table API & SQL and the example module
GroupToMysql.java
package com.flink.examples.mysql;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import static org.apache.flink.table.api.Expressions.$;
/**
* @Description Uses the Table API & SQL with the Flink JDBC connector to read MySQL data, and groups the result set by one or more columns with GROUP BY.
*/
public class GroupToMysql {
/**
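Official reference: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
Partitioned scan
To speed up reading in parallel source task instances, Flink provides a partitioned scan feature for JDBC tables.
scan.partition.column: the name of the column used to partition the input.
scan.partition.num: the number of partitions.
scan.partition.lower-bound: the smallest value of the first partition.
scan.partition.upper-bound: the largest value of the last partition.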
*/
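//flink-jdbc 1.11.1 syntax; all property names are defined in the JdbcTableSourceSinkFactory factory class.
//These are the legacy 'connector.*' keys; the scan.partition.* names listed above belong to the newer 'connector' = 'jdbc' option style.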
static String table_sql =
"CREATE TABLE my_users (\n" +
" id BIGINT,\n" +
" name STRING,\n" +
" age INT,\n" +
" status INT,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector.type' = 'jdbc',\n" +
" 'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8', \n" +
" 'connector.driver' = 'com.mysql.jdbc.Driver', \n" +
" 'connector.table' = 'users', \n" +
" 'connector.username' = 'root',\n" +
" 'connector.password' = 'password' \n" +
// " 'connector.read.fetch-size' = '10' \n" +
")";
public static void main(String[] args) throws Exception {
//Create the StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Set the parallelism
env.setParallelism(1);
//Build the EnvironmentSettings and select the Blink planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//Create the StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
//Register the MySQL table
tEnv.executeSql(table_sql);
//Table table = avg(tEnv);
//Table table = count(tEnv);
//Table table = min(tEnv);
Table table = max(tEnv);
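//Print the table schema
table.printSchema();
//Use toAppendStream for plain queries
//tEnv.toAppendStream(table, Row.class).print();
//Use toRetractStream for group-by queries
//tEnv.toRetractStream(table, Row.class).print();
//Convert the table to a DataStream. The first field of Tuple2 is a flag: true marks an add message (a new record), false marks a retract message (a previously emitted record being withdrawn)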
DataStream<Tuple2<Boolean, Row>> behaviorStream = tEnv.toRetractStream(table, Row.class);
behaviorStream.flatMap(new FlatMapFunction<Tuple2<Boolean, Row>, Object>() {
@Override
public void flatMap(Tuple2<Boolean, Row> value, Collector<Object> out) throws Exception {
if (value.f0) {
System.out.println(value.f1.toString());
}
}
}).print();
env.execute();
}
/**
* avg: returns the average of the numeric values in each group of the data stream
* @param tEnv
* @return
*/
public static Table avg(StreamTableEnvironment tEnv){
//Option 1: run a SQL statement
String sql = "select status,avg(age) as age1 from my_users group by status";
//Table table = tEnv.sqlQuery(sql);
//Option 2: build the query with Table API method calls
Table table = tEnv.from("my_users").groupBy($("status")).select($("status"),$("age").avg().as("age1"));
return table;
}
/**
* count: returns the running row count of each group in the data stream
* @param tEnv
* @return
*/
public static Table count(StreamTableEnvironment tEnv){
//Option 1: run a SQL statement
String sql = "select status,count(age) as age1 from my_users group by status";
//Table table = tEnv.sqlQuery(sql);
//Option 2: build the query with Table API method calls
Table table = tEnv.from("my_users").groupBy($("status")).select($("status"),$("age").count().as("age1"));
return table;
}
/**
* sum: returns the running sum of the numeric values in each group of the data stream
* @param tEnv
* @return
*/
public static Table sum(StreamTableEnvironment tEnv){
//Option 1: run a SQL statement
String sql = "select status,sum(age) as age1 from my_users group by status";
//Table table = tEnv.sqlQuery(sql);
//Option 2: build the query with Table API method calls
Table table = tEnv.from("my_users").groupBy($("status")).select($("status"),$("age").sum().as("age1"));
return table;
}
/**
* min: returns the minimum value in each group of the data stream
* @param tEnv
* @return
*/
public static Table min(StreamTableEnvironment tEnv){
//Option 1: run a SQL statement
String sql = "select status,min(age) as age1 from my_users group by status";
//Table table = tEnv.sqlQuery(sql);
//Option 2: build the query with Table API method calls
Table table = tEnv.from("my_users").groupBy($("status")).select($("status"),$("age").min().as("age1"));
return table;
}
/**
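* max: returns the maximum value in each group of the data stream; the result is recomputed every time a record arrives
* @param tEnv
* @return
*/
public static Table max(StreamTableEnvironment tEnv){
//Option 1: run a SQL statement
String sql = "select status,max(age) as age1 from my_users group by status";
//Table table = tEnv.sqlQuery(sql);
//Option 2: build the query with Table API method calls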
Table table = tEnv.from("my_users").groupBy($("status")).select($("status"),$("age").max().as("age1"));
return table;
}
}
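The class comment describes the partitioned scan options, but the DDL in the listing does not use them. The following is a minimal sketch of how the same table might be declared with the newer option style documented for the Flink 1.11 JDBC connector ('connector' = 'jdbc'). The partition column (id), the partition count (4) and the bounds (1 to 1000) are illustrative assumptions, not values from the original project, and the class name PartitionedScanDdl is hypothetical.

```java
class PartitionedScanDdl {
    // New-style option keys from the Flink 1.11 JDBC connector documentation.
    // The partition column, partition count and bounds are assumed values for illustration.
    static final String TABLE_SQL =
            "CREATE TABLE my_users (\n" +
            "  id BIGINT,\n" +
            "  name STRING,\n" +
            "  age INT,\n" +
            "  status INT,\n" +
            "  PRIMARY KEY (id) NOT ENFORCED\n" +
            ") WITH (\n" +
            "  'connector' = 'jdbc',\n" +
            "  'url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8',\n" +
            "  'driver' = 'com.mysql.jdbc.Driver',\n" +
            "  'table-name' = 'users',\n" +
            "  'username' = 'root',\n" +
            "  'password' = 'password',\n" +
            "  'scan.partition.column' = 'id',\n" +
            "  'scan.partition.num' = '4',\n" +
            "  'scan.partition.lower-bound' = '1',\n" +
            "  'scan.partition.upper-bound' = '1000'\n" +
            ")";

    public static void main(String[] args) {
        // Print the statement; in the article's program it would be passed to tEnv.executeSql(...)
        System.out.println(TABLE_SQL);
    }
}
```

The four scan.partition.* options are meant to be set together. With env.setParallelism(1), as in the listing above, the partitions would still be read by a single source task, so a speed-up should only be expected at higher parallelism.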
Table creation SQL
CREATE TABLE `users` (
`id` bigint(8) NOT NULL AUTO_INCREMENT,
`name` varchar(40) DEFAULT NULL,
`age` int(8) DEFAULT NULL,
`status` tinyint(2) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
Printed result
root
|-- status: INT
|-- age1: INT
0,16
0,18
1,21
1,28
2,31
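The printed result shows two lines for status 0 and two for status 1 because max is recomputed as each record arrives, and the program prints only the add messages of the retract stream. The sketch below imitates that behaviour in plain Java, without Flink, to make the sequence visible. It is a simplified model under stated assumptions: the input rows and their arrival order are hypothetical (chosen so that the output matches the result above), and the class and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RetractMaxSketch {

    /** One retract-stream message: flag=true adds a result row, flag=false withdraws an earlier one. */
    static final class Change {
        final boolean flag;
        final int status;
        final int maxAge;

        Change(boolean flag, int status, int maxAge) {
            this.flag = flag;
            this.status = status;
            this.maxAge = maxAge;
        }
    }

    /** Models "select status, max(age) from my_users group by status" over rows arriving one by one. */
    static List<Change> maxByStatus(int[][] rows) {
        Map<Integer, Integer> currentMax = new LinkedHashMap<>();
        List<Change> changes = new ArrayList<>();
        for (int[] row : rows) {
            int status = row[0];
            int age = row[1];
            Integer old = currentMax.get(status);
            if (old == null) {
                // First row of this group: emit an add message only
                currentMax.put(status, age);
                changes.add(new Change(true, status, age));
            } else if (age > old) {
                // New maximum: withdraw the old result, then add the new one
                changes.add(new Change(false, status, old));
                changes.add(new Change(true, status, age));
                currentMax.put(status, age);
            }
            // Otherwise the maximum is unchanged and this model emits nothing
        }
        return changes;
    }

    /** Keeps only the add messages, like the if (value.f0) check in the article's flatMap. */
    static List<String> addsOnly(List<Change> changes) {
        List<String> lines = new ArrayList<>();
        for (Change c : changes) {
            if (c.flag) {
                lines.add(c.status + "," + c.maxAge);
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        // Hypothetical (status, age) rows; the real table contents are not shown in the article
        int[][] rows = {{0, 16}, {0, 18}, {1, 21}, {1, 28}, {2, 31}};
        for (String line : addsOnly(maxByStatus(rows))) {
            System.out.println(line);
        }
    }
}
```

With these assumed rows the model produces seven messages in total, two of them retractions, and printing only the add messages yields the same five lines as the result above. To see only the final value per group, one option would be to keep the latest add message for each status instead of printing every one.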