Flink Series: TableAPI & SQL and MySQL Data Query
2022-07-01 15:01:00 【Dragon man who can't fly】
Use Table & SQL and the Flink JDBC connector to SELECT data from a MySQL database table.
Sample environment
java.version: 1.8.x
flink.version: 1.11.1
kafka: 2.11
Sample data source (project download from Gitee)
Flink Series: Building the Development Environment and Data
Sample module (pom.xml)
Flink Series: TableAPI & SQL and the Sample Module
SelectToMysql.java
package com.flink.examples.mysql;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import static org.apache.flink.table.api.Expressions.$;
/**
 * @Description Use Table & SQL and the Flink JDBC connector to SELECT data from a MySQL database table.
 */
public class SelectToMysql {
    /**
     Official reference: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
     Partition scan
     To speed up reading in parallel source task instances, Flink provides a partitioned scan feature for JDBC tables.
     scan.partition.column: the column name used to partition the input.
     scan.partition.num: the number of partitions.
     scan.partition.lower-bound: the smallest value of the first partition.
     scan.partition.upper-bound: the largest value of the last partition.
     These are the option names of the newer 'connector' = 'jdbc' syntax. With the legacy 'connector.type' keys
     used below, the equivalents are connector.read.partition.column, connector.read.partition.num,
     connector.read.partition.lower-bound and connector.read.partition.upper-bound.
     */
    // Legacy flink-jdbc 1.11.1 syntax; all property names are defined in the JdbcTableSourceSinkFactory factory class.
    static String table_sql =
            "CREATE TABLE my_users (\n" +
            "  id BIGINT,\n" +
            "  name STRING,\n" +
            "  age INT,\n" +
            "  status INT,\n" +
            "  PRIMARY KEY (id) NOT ENFORCED\n" +
            ") WITH (\n" +
            "  'connector.type' = 'jdbc',\n" +
            "  'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8', \n" +
            "  'connector.driver' = 'com.mysql.jdbc.Driver', \n" +
            "  'connector.table' = 'users', \n" +
            "  'connector.username' = 'root',\n" +
            "  'connector.password' = 'password', \n" +
            "  'connector.read.fetch-size' = '3' \n" +
            ")";
    public static void main(String[] args) throws Exception {
        // Build the StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Build the EnvironmentSettings and select the Blink planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // Build the StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
        // Register the MySQL table
        tEnv.executeSql(table_sql);
        String sql = "select id,name,age,status from my_users";
        // Option 1: run the SQL statement (note that this query has no status filter)
        //Table table = tEnv.sqlQuery(sql);
        // Option 2: assemble the query with Table API method calls
        Table table = tEnv.from("my_users").select($("id"), $("name"), $("age"), $("status")).where($("status").isEqual(0));
        // Print the table schema
        table.printSchema();
        // Convert the Table into a DataStream
        DataStream<Row> behaviorStream = tEnv.toAppendStream(table, Row.class);
        behaviorStream.flatMap(new FlatMapFunction<Row, Object>() {
            @Override
            public void flatMap(Row value, Collector<Object> out) throws Exception {
                // Each row is printed here; nothing is collected into out, so the print() sink below emits nothing
                System.out.println(value.toString());
                Thread.sleep(1 * 1000);
            }
        }).print();
        env.execute();
    }
}
Print the results
root
|-- id: BIGINT NOT NULL
|-- name: STRING
|-- age: INT
|-- status: INT
1,nike,16,0
2,nike,18,0
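The partition scan options described in the comment at the top of SelectToMysql are not used by the sample itself. The sketch below shows one way they might look in the legacy 'connector.*' spelling, together with a rough picture of how a numeric key range could be divided into one BETWEEN query per partition. The class PartitionScanSketch, its method names, the bounds 1 and 1000, the partition count 4 and the even-split arithmetic are all assumptions made for illustration; Flink's own splitting logic may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: the names and the arithmetic here are assumptions, not Flink internals.
class PartitionScanSketch {

    /** Splits the inclusive range [lower, upper] into at most num contiguous sub-ranges. */
    static List<long[]> splitRange(long lower, long upper, int num) {
        if (num < 1 || lower > upper) {
            throw new IllegalArgumentException("need num >= 1 and lower <= upper");
        }
        long total = upper - lower + 1;      // number of key values to cover
        long parts = Math.min(num, total);   // never create empty partitions
        long base = total / parts;           // minimum size of each sub-range
        long extra = total % parts;          // the first 'extra' sub-ranges get one more value
        List<long[]> ranges = new ArrayList<>();
        long start = lower;
        for (long i = 0; i < parts; i++) {
            long end = start + base + (i < extra ? 1 : 0) - 1;
            ranges.add(new long[] {start, end});
            start = end + 1;
        }
        return ranges;
    }

    /** Builds the partition scan lines of a WITH clause using the legacy property keys. */
    static String partitionOptions(String column, int num, long lower, long upper) {
        return "  'connector.read.partition.column' = '" + column + "',\n"
             + "  'connector.read.partition.num' = '" + num + "',\n"
             + "  'connector.read.partition.lower-bound' = '" + lower + "',\n"
             + "  'connector.read.partition.upper-bound' = '" + upper + "'";
    }

    public static void main(String[] args) {
        // Lines that could be added to the WITH clause of my_users (values are hypothetical)
        System.out.println(partitionOptions("id", 4, 1, 1000));
        // One query per partition, each covering a disjoint slice of the id range
        for (long[] r : splitRange(1, 1000, 4)) {
            System.out.println("SELECT id, name, age, status FROM users WHERE id BETWEEN "
                    + r[0] + " AND " + r[1]);
        }
    }
}
```

Partitioning on the primary key id keeps the slices disjoint, so each parallel source instance reads its own rows. The bounds only decide how the range is sliced, so they should roughly match the real minimum and maximum of the column for the slices to be balanced.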