当前位置:网站首页>Flink 系例 之 TableAPI & SQL 与 MYSQL 数据查询
Flink 系例 之 TableAPI & SQL 与 MYSQL 数据查询
2022-07-01 14:54:00 【不会飞的小龙人】
使用 Tbale&SQL 与 Flink JDBC 连接器从 MYSQL 数据库表中 SELECT 选取数据。
示例环境
java.version: 1.8.x
flink.version: 1.11.1
kafka:2.11示例数据源 (项目码云下载)
示例模块 (pom.xml)
Flink 系例 之 TableAPI & SQL 与 示例模块
SelectToMysql.java
package com.flink.examples.mysql;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import static org.apache.flink.table.api.Expressions.$;
/**
* @Description 使用Tbale&SQL与Flink JDBC连接器从MYSQL数据库表中SELECT选取数据。
*/
public class SelectToMysql {
/**
官方参考:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
分区扫描
为了加速并行Source任务实例中的数据读取,Flink为JDBC表提供了分区扫描功能。
scan.partition.column:用于对输入进行分区的列名。
scan.partition.num:分区数。
scan.partition.lower-bound:第一个分区的最小值。
scan.partition.upper-bound:最后一个分区的最大值。
*/
//flink-jdbc-1.11.1写法,所有属性名在JdbcTableSourceSinkFactory工厂类中定义
static String table_sql =
"CREATE TABLE my_users (\n" +
" id BIGINT,\n" +
" name STRING,\n" +
" age INT,\n" +
" status INT,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector.type' = 'jdbc',\n" +
" 'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8', \n" +
" 'connector.driver' = 'com.mysql.jdbc.Driver', \n" +
" 'connector.table' = 'users', \n" +
" 'connector.username' = 'root',\n" +
" 'connector.password' = 'password', \n" +
" 'connector.read.fetch-size' = '3' \n" +
")";
public static void main(String[] args) throws Exception {
//构建StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//构建EnvironmentSettings 并指定Blink Planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//构建StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
//注册mysql数据维表
tEnv.executeSql(table_sql);
String sql = "select id,name,age,status from my_users";
//第一种:执行SQL
//Table table = tEnv.sqlQuery(sql);
//第二种:通过方法拼装执行语句
Table table = tEnv.from("my_users").select($("id"), $("name"), $("age"), $("status")).where($("status").isEqual(0));
//打印字段结构
table.printSchema();
//table 转成 dataStream 流
DataStream<Row> behaviorStream = tEnv.toAppendStream(table, Row.class);
behaviorStream.flatMap(new FlatMapFunction<Row, Object>() {
@Override
public void flatMap(Row value, Collector<Object> out) throws Exception {
System.out.println(value.toString());
Thread.sleep(1 * 1000);
}
}).print();
env.execute();
}
}
打印结果
root
|-- id: BIGINT NOT NULL
|-- name: STRING
|-- age: INT
|-- status: INT
1,nike,16,0
2,nike,18,0边栏推荐
猜你喜欢

Don't want to knock the code? Here comes the chance

Yyds dry goods inventory hcie security day13: firewall dual machine hot standby experiment (I) firewall direct deployment, uplink and downlink connection switches

Microservice development steps (Nacos)
![[leetcode 324] 摆动排序 II 思维+排序](/img/cb/26d89e1a1f548b75a5ef9f29eebeee.png)
[leetcode 324] 摆动排序 II 思维+排序

It's settled! 2022 Hainan secondary cost engineer examination time is determined! The registration channel has been opened!

JVM performance tuning and practical basic theory part II

Salesforce、约翰霍普金斯、哥大 | ProGen2: 探索蛋白语言模型的边界

Problem note - Oracle 11g uninstall

JVM performance tuning and practical basic theory part II
![[15. Interval consolidation]](/img/6c/afc46a0e0d14127d2c234ed9a9d03b.png)
[15. Interval consolidation]
随机推荐
Apk signature principle
Guess lantern riddles, not programmers still can't understand?
Zabbix API与PHP的配置
Take you to API development by hand
Vnctf2022 open web gocalc0
Demand prioritization method based on value quantification
Ubuntu 14.04下搭建MySQL主从服务器
网速、宽带、带宽、流量三者之间的关系是什么?
定了!2022海南二级造价工程师考试时间确定!报名通道已开启!
Opencv mat class
基于价值量化的需求优先级排序方法
微服务追踪SQL(支持Isto管控下的gorm查询追踪)
openssl客户端编程:一个不起眼的函数导致的SSL会话失败问题
[dynamic programming] interval dp:p1005 matrix retrieval
APK签名原理
[leetcode 324] swing sorting II thinking + sorting
IDEA全局搜索快捷键(ctrl+shift+F)失效修复
对于编程思想和能力有重大提升的书有哪些?
opencv学习笔记五--文件扫描+OCR文字识别
DirectX repair tool v4.1 public beta! [easy to understand]