Flink Table API & SQL: inserting data into MySQL
2022-07-01 15:01:00 【Dragon man who can't fly】
Use the Table API & SQL together with the Flink JDBC connector to insert data into a MySQL database table.
Sample environment
java.version: 1.8.x
flink.version: 1.11.1
kafka: 2.11
Sample data source (project download from Gitee)
Flink example series: setting up the development environment and data
Sample module (pom.xml)
Flink example series: Table API & SQL and the sample module
InsertToMysql.java
package com.flink.examples.mysql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Description Use the Table API & SQL with the Flink JDBC connector to insert data into a MySQL table
 */
public class InsertToMysql {

    /**
     * Official reference: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
     */
    // Syntax for flink-jdbc 1.11.1; all property names are defined in the JdbcTableSourceSinkFactory factory class
    static String table_sql =
            "CREATE TABLE my_users (\n" +
            "  id BIGINT,\n" +
            "  name STRING,\n" +
            "  age INT,\n" +
            "  status INT,\n" +
            "  PRIMARY KEY (id) NOT ENFORCED\n" +
            ") WITH (\n" +
            "  'connector.type' = 'jdbc',\n" +
            "  'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8', -- the database must already exist\n" +
            "  'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
            "  'connector.table' = 'users', -- an existing table\n" +
            "  'connector.username' = 'root',\n" +
            "  'connector.password' = 'password'\n" +
            ")";

    public static void main(String[] args) throws Exception {
        // Build the StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Build the EnvironmentSettings and select the Blink planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // Build the StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
        // Register the MySQL-backed table
        tEnv.executeSql(table_sql);
        // id is 0 because the id column is an auto-increment primary key:
        // by default MySQL treats 0 as a request to generate the next value
        String sql = "insert into my_users (id,name,age,status) values(0,'tom',31,0)";

        // Option 1: execute the SQL directly
        // TableResult tableResult = tEnv.executeSql(sql);

        // Option 2: add the SQL to a statement set and execute the set
        StatementSet stmtSet = tEnv.createStatementSet();
        stmtSet.addInsertSql(sql);
        TableResult tableResult = stmtSet.execute();
        tableResult.print();
    }
}
Print the results
+-------------------------------------------+
| default_catalog.default_database.my_users |
+-------------------------------------------+
| -1 |
+-------------------------------------------+
1 row in set
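The table definition in InsertToMysql.java uses the legacy `connector.*` property names. The official Flink 1.11 JDBC connector page referenced in the code lists a newer set of option keys (`connector`, `url`, `table-name`, `driver`, `username`, `password`). The following is a minimal sketch, not the article's own code, of how the same DDL might be assembled with those keys. The class name `JdbcDdlSketch` and the method `buildDdl` are hypothetical helpers introduced only for illustration, and the sketch only builds the string; it does not connect to Flink or MySQL.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/**
 * Sketch only: renders the my_users DDL using the option keys listed in the
 * Flink 1.11 JDBC connector documentation. Class and method names are hypothetical.
 */
public class JdbcDdlSketch {

    /** Renders a CREATE TABLE statement for the JDBC connector. */
    public static String buildDdl(String url, String table, String username, String password) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "jdbc");
        options.put("url", url);
        options.put("table-name", table);
        options.put("driver", "com.mysql.jdbc.Driver");
        options.put("username", username);
        options.put("password", password);

        StringJoiner with = new StringJoiner(",\n", ") WITH (\n", "\n)");
        for (Map.Entry<String, String> e : options.entrySet()) {
            // A single quote inside a SQL string literal is escaped by doubling it.
            with.add("  '" + e.getKey() + "' = '" + e.getValue().replace("'", "''") + "'");
        }
        return "CREATE TABLE my_users (\n"
                + "  id BIGINT,\n"
                + "  name STRING,\n"
                + "  age INT,\n"
                + "  status INT,\n"
                + "  PRIMARY KEY (id) NOT ENFORCED\n"
                + with;
    }

    public static void main(String[] args) {
        String ddl = buildDdl(
                "jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8",
                "users", "root", "password");
        // In InsertToMysql this string would be passed to tEnv.executeSql(ddl).
        System.out.println(ddl);
    }
}
```

Building the options from a map keeps the connection settings in one place, so the URL or credentials could be read from configuration instead of being hard-coded in the SQL string.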