当前位置:网站首页>Flink 系例 之 TableAPI & SQL 与 MYSQL 数据查询
Flink 系例 之 TableAPI & SQL 与 MYSQL 数据查询
2022-07-01 14:54:00 【不会飞的小龙人】
使用 Tbale&SQL 与 Flink JDBC 连接器从 MYSQL 数据库表中 SELECT 选取数据。
示例环境
java.version: 1.8.x
flink.version: 1.11.1
kafka:2.11示例数据源 (项目码云下载)
示例模块 (pom.xml)
Flink 系例 之 TableAPI & SQL 与 示例模块
SelectToMysql.java
package com.flink.examples.mysql;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import static org.apache.flink.table.api.Expressions.$;
/**
* @Description 使用Tbale&SQL与Flink JDBC连接器从MYSQL数据库表中SELECT选取数据。
*/
public class SelectToMysql {
/**
官方参考:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
分区扫描
为了加速并行Source任务实例中的数据读取,Flink为JDBC表提供了分区扫描功能。
scan.partition.column:用于对输入进行分区的列名。
scan.partition.num:分区数。
scan.partition.lower-bound:第一个分区的最小值。
scan.partition.upper-bound:最后一个分区的最大值。
*/
//flink-jdbc-1.11.1写法,所有属性名在JdbcTableSourceSinkFactory工厂类中定义
static String table_sql =
"CREATE TABLE my_users (\n" +
" id BIGINT,\n" +
" name STRING,\n" +
" age INT,\n" +
" status INT,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector.type' = 'jdbc',\n" +
" 'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8', \n" +
" 'connector.driver' = 'com.mysql.jdbc.Driver', \n" +
" 'connector.table' = 'users', \n" +
" 'connector.username' = 'root',\n" +
" 'connector.password' = 'password', \n" +
" 'connector.read.fetch-size' = '3' \n" +
")";
public static void main(String[] args) throws Exception {
//构建StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//构建EnvironmentSettings 并指定Blink Planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//构建StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
//注册mysql数据维表
tEnv.executeSql(table_sql);
String sql = "select id,name,age,status from my_users";
//第一种:执行SQL
//Table table = tEnv.sqlQuery(sql);
//第二种:通过方法拼装执行语句
Table table = tEnv.from("my_users").select($("id"), $("name"), $("age"), $("status")).where($("status").isEqual(0));
//打印字段结构
table.printSchema();
//table 转成 dataStream 流
DataStream<Row> behaviorStream = tEnv.toAppendStream(table, Row.class);
behaviorStream.flatMap(new FlatMapFunction<Row, Object>() {
@Override
public void flatMap(Row value, Collector<Object> out) throws Exception {
System.out.println(value.toString());
Thread.sleep(1 * 1000);
}
}).print();
env.execute();
}
}
打印结果
root
|-- id: BIGINT NOT NULL
|-- name: STRING
|-- age: INT
|-- status: INT
1,nike,16,0
2,nike,18,0边栏推荐
- Guess lantern riddles, not programmers still can't understand?
- The first word of JVM -- detailed introduction to JVM and analysis of runtime data area
- What if you are always bullied because you are too honest in the workplace?
- Basic operation of database
- Configuration of ZABBIX API and PHP
- DirectX repair tool v4.1 public beta! [easy to understand]
- opencv学习笔记五--文件扫描+OCR文字识别
- [零基础学IoT Pwn] 复现Netgear WNAP320 RCE
- What value can NPDP bring to product managers? Do you know everything?
- DirectX修复工具V4.1公测![通俗易懂]
猜你喜欢

竣达技术丨室内空气环境监测终端 pm2.5、温湿度TVOC等多参数监测
![[Verilog quick start of Niuke question series] ~ use functions to realize data size conversion](/img/e1/d35e1d382e0e945849010941b219d3.png)
[Verilog quick start of Niuke question series] ~ use functions to realize data size conversion

One of the first steps to redis

Salesforce, Johns Hopkins, Columbia | progen2: exploring the boundaries of protein language models
![[Verilog quick start of Niuke series] ~ multi function data processor, calculate the difference between two numbers, use generate... For statement to simplify the code, and use sub modules to realize](/img/30/aea4ae24f418eb971bca77a1d46bef.png)
[Verilog quick start of Niuke series] ~ multi function data processor, calculate the difference between two numbers, use generate... For statement to simplify the code, and use sub modules to realize

Problem note - Oracle 11g uninstall

Official announcement: Apache Doris graduated successfully and became the top project of ASF!

【14. 区间和(离散化)】
![[zero basic IOT pwn] reproduce Netgear wnap320 rce](/img/f7/d683df1d4b1b032164a529d3d94615.png)
[zero basic IOT pwn] reproduce Netgear wnap320 rce

Chapter 4 of getting started with MySQL: creation, modification and deletion of data tables
随机推荐
Word2vec yyds dry goods inventory
Error-tf. function-decorated function tried to create variables on non-first call
Solidty智能合约开发-简易入门
手把手带你入门 API 开发
The data in the database table recursively forms a closed-loop data. How can we get these data
写在Doris毕业后的第一天
What if you are always bullied because you are too honest in the workplace?
数字化转型:数据可视化赋能销售管理
定了!2022海南二级造价工程师考试时间确定!报名通道已开启!
ArrayList 扩容详解,扩容原理[通俗易懂]
Quelle valeur le pdnp peut - il apporter aux gestionnaires de produits? Vous savez tout?
竣达技术丨多台精密空调微信云监控方案
The State Administration of Chia Tai market supervision, the national development and Reform Commission and the China Securities Regulatory Commission jointly reminded and warned some iron ores
Minimum spanning tree and bipartite graph in graph theory (acwing template)
Problem note - Oracle 11g uninstall
[leetcode 324] 摆动排序 II 思维+排序
竣达技术丨室内空气环境监测终端 pm2.5、温湿度TVOC等多参数监测
DirectX修复工具V4.1公测![通俗易懂]
2022-2-15 learning xiangniuke project - Section 4 business management
Solid smart contract development - easy to get started