Flink Table API & SQL: querying data from MySQL
2022-07-01 15:01:00 【Dragon man who can't fly】
Use Table API & SQL with the Flink JDBC connector to SELECT data from a MySQL database table.
Sample environment
java.version: 1.8.x
flink.version: 1.11.1
kafka: 2.11
Sample data source (project code available for download from Gitee)
Flink series examples: building the development environment and data
Sample module (pom.xml)
Flink series examples: Table API & SQL sample module
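The example reads from a users table in the flink database on MySQL. The original post does not include the source-table DDL; the following is a minimal sketch, assuming column types that match the Flink DDL below and the two sample rows shown in the printed results (table name, MySQL types, and values are assumptions for illustration only):
CREATE TABLE `users` (
  `id`     BIGINT NOT NULL,
  `name`   VARCHAR(50),
  `age`    INT,
  `status` INT,
  PRIMARY KEY (`id`)
);
-- sample rows matching the printed output
INSERT INTO `users` (`id`, `name`, `age`, `status`)
VALUES (1, 'nike', 16, 0), (2, 'nike', 18, 0);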
SelectToMysql.java
package com.flink.examples.mysql;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @Description Use Table API & SQL with the Flink JDBC connector to SELECT data from a MySQL database table.
 */
public class SelectToMysql {

    /**
     Official reference: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
     Partition scan
     To speed up reading in parallel Source task instances, Flink provides a partition scan feature for JDBC tables
     (see the DDL sketch after the printed results below):
     scan.partition.column: the column name used to partition the input.
     scan.partition.num: the number of partitions.
     scan.partition.lower-bound: the smallest value of the first partition.
     scan.partition.upper-bound: the largest value of the last partition.
     */

    // flink-jdbc-1.11.1 property style; all property names are defined in the JdbcTableSourceSinkFactory factory class
    static String table_sql =
            "CREATE TABLE my_users (\n" +
            " id BIGINT,\n" +
            " name STRING,\n" +
            " age INT,\n" +
            " status INT,\n" +
            " PRIMARY KEY (id) NOT ENFORCED\n" +
            ") WITH (\n" +
            " 'connector.type' = 'jdbc',\n" +
            " 'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8', \n" +
            " 'connector.driver' = 'com.mysql.jdbc.Driver', \n" +
            " 'connector.table' = 'users', \n" +
            " 'connector.username' = 'root',\n" +
            " 'connector.password' = 'password', \n" +
            " 'connector.read.fetch-size' = '3' \n" +
            ")";

    public static void main(String[] args) throws Exception {
        // Build the StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Build the EnvironmentSettings and specify the Blink planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // Build the StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);

        // Register the MySQL table
        tEnv.executeSql(table_sql);
        String sql = "select id,name,age,status from my_users";
        // Option 1: execute the SQL query directly
        //Table table = tEnv.sqlQuery(sql);
        // Option 2: assemble the query with Table API method calls
        Table table = tEnv.from("my_users").select($("id"), $("name"), $("age"), $("status")).where($("status").isEqual(0));
        // Print the table schema
        table.printSchema();

        // Convert the Table into an append-only DataStream
        DataStream<Row> behaviorStream = tEnv.toAppendStream(table, Row.class);
        behaviorStream.flatMap(new FlatMapFunction<Row, Object>() {
            @Override
            public void flatMap(Row value, Collector<Object> out) throws Exception {
                System.out.println(value.toString());
                Thread.sleep(1 * 1000);
            }
        }).print();
        env.execute();
    }
}
Printed results
root
|-- id: BIGINT NOT NULL
|-- name: STRING
|-- age: INT
|-- status: INT
1,nike,16,0
2,nike,18,0
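The scan.partition.* options listed in the Javadoc above belong to the new-style connector options ('connector' = 'jdbc') documented for Flink 1.11, not to the legacy 'connector.type' properties used in table_sql. As a sketch only (the partition column, number of partitions, and bounds are assumed values; the URL and credentials are reused from the example), the same table could be declared with partitioned scanning like this:
CREATE TABLE my_users (
  id BIGINT,
  name STRING,
  age INT,
  status INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8',
  'driver' = 'com.mysql.jdbc.Driver',
  'table-name' = 'users',
  'username' = 'root',
  'password' = 'password',
  'scan.partition.column' = 'id',
  'scan.partition.num' = '2',
  'scan.partition.lower-bound' = '1',
  'scan.partition.upper-bound' = '100'
);
With this declaration each parallel source task reads only its own id range, instead of a single task scanning the whole table.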