当前位置:网站首页>Tableapi & SQL and MySQL insert data of Flink
Tableapi & SQL and MySQL insert data of Flink
2022-07-01 15:01:00 【Dragon man who can't fly】
Use Tbale&SQL And Flink JDBC Connector inserts data into MYSQL Database table
Sample environment
java.version: 1.8.x
flink.version: 1.11.1
kafka:2.11
Sample data source ( Project code cloud download )
Flink System examples And Build development environment and data
Sample module (pom.xml)
Flink System examples And TableAPI & SQL And Sample module
InsertToMysql.java
package com.flink.examples.mysql;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @Description Use Tbale&SQL And Flink JDBC Connector inserts data into MYSQL Database table
*/
public class InsertToMysql {
/**
Official reference :https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
*/
//flink-jdbc-1.11.1 How to write it , All attribute names are in JdbcTableSourceSinkFactory The factory class defines
static String table_sql =
"CREATE TABLE my_users (\n" +
" id BIGINT,\n" +
" name STRING,\n" +
" age INT,\n" +
" status INT,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector.type' = 'jdbc',\n" +
" 'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8', -- The database needs to be established \n" +
" 'connector.driver' = 'com.mysql.jdbc.Driver', \n" +
" 'connector.table' = 'users', -- Known tables \n" +
" 'connector.username' = 'root',\n" +
" 'connector.password' = 'password' \n" +
")";
public static void main(String[] args) throws Exception {
// structure StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// structure EnvironmentSettings And designate Blink Planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
// structure StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
// register mysql Data dimension table
tEnv.executeSql(table_sql);
// perform SQL,id=0 It's because of id Fields are self incrementing primary keys , by 0 be mysql Recognition will default to self increment instead of
String sql = "insert into my_users (id,name,age,status) values(0,'tom',31,0)";
// The first way : Direct execution sql
// TableResult tableResult = tEnv.executeSql(sql);
// The second way : Declare a collection of operations to perform sql
StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql(sql);
TableResult tableResult = stmtSet.execute();
tableResult.print();
}
}
Print the results
+-------------------------------------------+
| default_catalog.default_database.my_users |
+-------------------------------------------+
| -1 |
+-------------------------------------------+
1 row in set
边栏推荐
- These three online PS tools should be tried
- 智能运维实战:银行业务流程及单笔交易追踪
- Basic operations of SQL database
- opencv学习笔记四--银行卡号识别
- Take you to API development by hand
- Detailed explanation of ArrayList expansion, expansion principle [easy to understand]
- [zero basic IOT pwn] reproduce Netgear wnap320 rce
- Ubuntu 14.04下搭建MySQL主从服务器
- 三十之前一定要明白的职场潜规则
- Mongodb second talk - - mongodb High available Cluster Implementation
猜你喜欢
Introduction to MySQL audit plug-in
微信网页订阅消息实现
opencv学习笔记五--文件扫描+OCR文字识别
Chapter 4 of getting started with MySQL: creation, modification and deletion of data tables
opencv学习笔记六--图像拼接
Opencv Learning Notes 6 -- image mosaic
Markdown编辑器使用基本语法
MIT team used graph neural network to accelerate the screening of amorphous polymer electrolytes and promote the development of next-generation lithium battery technology
如何实现时钟信号分频?
leetcode:329. Longest increasing path in matrix
随机推荐
Semiconductor foundation of binary realization principle
Flink 系例 之 TableAPI & SQL 与 MYSQL 数据查询
数字化转型:数据可视化赋能销售管理
深度分析数据在内存中的存储形式
写在Doris毕业后的第一天
TypeScript:const
MySQL审计插件介绍
Solidty智能合约开发-简易入门
【天线】【3】CST一些快捷键
Task. Run(), Task. Factory. Analysis of behavior inconsistency between startnew() and new task()
Solid basic basic grammar and definition function
微信网页订阅消息实现
One of the first steps to redis
官宣:Apache Doris 顺利毕业,成为 ASF 顶级项目!
一波三折,终于找到src漏洞挖掘的方法了【建议收藏】
【14. 区间和(离散化)】
Pat 1121 damn single (25 points) set
[lock] redis lock handles concurrency atomicity
网速、宽带、带宽、流量三者之间的关系是什么?
Detailed explanation of ArrayList expansion, expansion principle [easy to understand]