当前位置:网站首页>Flink 系例 之 TableAPI & SQL 与 MYSQL 数据查询
Flink 系例 之 TableAPI & SQL 与 MYSQL 数据查询
2022-07-01 14:54:00 【不会飞的小龙人】
使用 Tbale&SQL 与 Flink JDBC 连接器从 MYSQL 数据库表中 SELECT 选取数据。
示例环境
java.version: 1.8.x
flink.version: 1.11.1
kafka:2.11
示例数据源 (项目码云下载)
示例模块 (pom.xml)
Flink 系例 之 TableAPI & SQL 与 示例模块
SelectToMysql.java
package com.flink.examples.mysql;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import static org.apache.flink.table.api.Expressions.$;
/**
* @Description 使用Tbale&SQL与Flink JDBC连接器从MYSQL数据库表中SELECT选取数据。
*/
public class SelectToMysql {
/**
官方参考:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
分区扫描
为了加速并行Source任务实例中的数据读取,Flink为JDBC表提供了分区扫描功能。
scan.partition.column:用于对输入进行分区的列名。
scan.partition.num:分区数。
scan.partition.lower-bound:第一个分区的最小值。
scan.partition.upper-bound:最后一个分区的最大值。
*/
//flink-jdbc-1.11.1写法,所有属性名在JdbcTableSourceSinkFactory工厂类中定义
static String table_sql =
"CREATE TABLE my_users (\n" +
" id BIGINT,\n" +
" name STRING,\n" +
" age INT,\n" +
" status INT,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector.type' = 'jdbc',\n" +
" 'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8', \n" +
" 'connector.driver' = 'com.mysql.jdbc.Driver', \n" +
" 'connector.table' = 'users', \n" +
" 'connector.username' = 'root',\n" +
" 'connector.password' = 'password', \n" +
" 'connector.read.fetch-size' = '3' \n" +
")";
public static void main(String[] args) throws Exception {
//构建StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//构建EnvironmentSettings 并指定Blink Planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//构建StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
//注册mysql数据维表
tEnv.executeSql(table_sql);
String sql = "select id,name,age,status from my_users";
//第一种:执行SQL
//Table table = tEnv.sqlQuery(sql);
//第二种:通过方法拼装执行语句
Table table = tEnv.from("my_users").select($("id"), $("name"), $("age"), $("status")).where($("status").isEqual(0));
//打印字段结构
table.printSchema();
//table 转成 dataStream 流
DataStream<Row> behaviorStream = tEnv.toAppendStream(table, Row.class);
behaviorStream.flatMap(new FlatMapFunction<Row, Object>() {
@Override
public void flatMap(Row value, Collector<Object> out) throws Exception {
System.out.println(value.toString());
Thread.sleep(1 * 1000);
}
}).print();
env.execute();
}
}
打印结果
root
|-- id: BIGINT NOT NULL
|-- name: STRING
|-- age: INT
|-- status: INT
1,nike,16,0
2,nike,18,0
边栏推荐
- Take you to API development by hand
- 问题随记 —— Oracle 11g 卸载
- 数据产品经理需要掌握哪些数据能力?
- Detailed explanation of ArrayList expansion, expansion principle [easy to understand]
- The markdown editor uses basic syntax
- DirectX修复工具V4.1公测![通俗易懂]
- cmake 基本使用过程
- It's suitable for people who don't have eloquence. The benefits of joining the China Video partner program are really delicious. One video gets 3 benefits
- 从零开发小程序和公众号【第三期】
- Demand prioritization method based on value quantification
猜你喜欢
互联网医院系统源码 医院小程序源码 智慧医院源码 在线问诊系统源码
问题随记 —— Oracle 11g 卸载
写在Doris毕业后的第一天
What are the books that have greatly improved the thinking and ability of programming?
Chapter 4 of getting started with MySQL: creation, modification and deletion of data tables
Problem note - Oracle 11g uninstall
The first word of JVM -- detailed introduction to JVM and analysis of runtime data area
Task.Run(), Task.Factory.StartNew() 和 New Task() 的行为不一致分析
音乐播放器开发实例(可毕设)
【LeetCode】16、最接近的三数之和
随机推荐
C learning notes (5) class and inheritance
It's settled! 2022 Hainan secondary cost engineer examination time is determined! The registration channel has been opened!
Generate random numbers (4-bit, 6-bit)
What if you are always bullied because you are too honest in the workplace?
[15. Interval consolidation]
Some thoughts on software testing
这3款在线PS工具,得试试
[Verilog quick start of Niuke question series] ~ use functions to realize data size conversion
Use the npoi package of net core 6 C to read excel Pictures in xlsx cells and stored to the specified server
Don't want to knock the code? Here comes the chance
APK签名原理
How to view the state-owned enterprises have unloaded Microsoft office and switched to Kingsoft WPS?
Storage form of in-depth analysis data in memory
C#学习笔记(5)类和继承
idea中新建的XML文件变成普通文件的解决方法.
What value can NPDP bring to product managers? Do you know everything?
In hot summer, please put away this safe gas use guide!
openssl客户端编程:一个不起眼的函数导致的SSL会话失败问题
Solid basic basic grammar and definition function
一波三折,终于找到src漏洞挖掘的方法了【建议收藏】