当前位置:网站首页>Flink 系例 之 TableAPI & SQL 与 MYSQL 插入数据
Flink 系例 之 TableAPI & SQL 与 MYSQL 插入数据
2022-07-01 14:54:00 【不会飞的小龙人】
使用 Tbale&SQL 与 Flink JDBC 连接器将数据插入 MYSQL 数据库表
示例环境
java.version: 1.8.x
flink.version: 1.11.1
kafka:2.11示例数据源 (项目码云下载)
示例模块 (pom.xml)
Flink 系例 之 TableAPI & SQL 与 示例模块
InsertToMysql.java
package com.flink.examples.mysql;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @Description 使用Tbale&SQL与Flink JDBC连接器将数据插入MYSQL数据库表
*/
public class InsertToMysql {
/**
官方参考:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
*/
//flink-jdbc-1.11.1写法,所有属性名在JdbcTableSourceSinkFactory工厂类中定义
static String table_sql =
"CREATE TABLE my_users (\n" +
" id BIGINT,\n" +
" name STRING,\n" +
" age INT,\n" +
" status INT,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector.type' = 'jdbc',\n" +
" 'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8', -- 数据库需建立 \n" +
" 'connector.driver' = 'com.mysql.jdbc.Driver', \n" +
" 'connector.table' = 'users', -- 已知的表 \n" +
" 'connector.username' = 'root',\n" +
" 'connector.password' = 'password' \n" +
")";
public static void main(String[] args) throws Exception {
//构建StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//构建EnvironmentSettings 并指定Blink Planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//构建StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
//注册mysql数据维表
tEnv.executeSql(table_sql);
//执行SQL,id=0是因id字段为自增主键,为0则mysql识别会默认自增值代替
String sql = "insert into my_users (id,name,age,status) values(0,'tom',31,0)";
// 第一种方式:直接执行sql
// TableResult tableResult = tEnv.executeSql(sql);
//第二种方式:声明一个操作集合来执行sql
StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql(sql);
TableResult tableResult = stmtSet.execute();
tableResult.print();
}
}
打印结果
+-------------------------------------------+
| default_catalog.default_database.my_users |
+-------------------------------------------+
| -1 |
+-------------------------------------------+
1 row in set边栏推荐
- 微信网页订阅消息实现
- The markdown editor uses basic syntax
- [14. Interval sum (discretization)]
- It's suitable for people who don't have eloquence. The benefits of joining the China Video partner program are really delicious. One video gets 3 benefits
- Quelle valeur le pdnp peut - il apporter aux gestionnaires de produits? Vous savez tout?
- [15. Interval consolidation]
- Microservice development steps (Nacos)
- 【14. 区间和(离散化)】
- En utilisant le paquet npoi de net Core 6 c #, lisez Excel.. Image dans la cellule xlsx et stockée sur le serveur spécifié
- [Verilog quick start of Niuke question series] ~ use functions to realize data size conversion
猜你喜欢

微服务追踪SQL(支持Isto管控下的gorm查询追踪)

What problems should be considered for outdoor LED display?

Opencv learning notes 5 -- document scanning +ocr character recognition

写在Doris毕业后的第一天

Markdown编辑器使用基本语法

opencv学习笔记五--文件扫描+OCR文字识别

问题随记 —— Oracle 11g 卸载
![[14. Interval sum (discretization)]](/img/e5/8b29aca7068a6385e8ce90c2742c37.png)
[14. Interval sum (discretization)]

【15. 区间合并】

idea中新建的XML文件变成普通文件的解决方法.
随机推荐
After twists and turns, I finally found the method of SRC vulnerability mining [recommended collection]
2022-2-15 learning xiangniuke project - Section 4 business management
炎炎夏日,这份安全用气指南请街坊们收好!
C learning notes (5) class and inheritance
What value can NPDP bring to product managers? Do you know everything?
生成随机数(4位、6位)
购物商城6.27待完成
竣达技术丨室内空气环境监测终端 pm2.5、温湿度TVOC等多参数监测
QT capture interface is displayed as picture or label
保证生产安全!广州要求危化品企业“不安全不生产、不变通”
Internet hospital system source code hospital applet source code smart hospital source code online consultation system source code
241. 为运算表达式设计优先级
Problem note - Oracle 11g uninstall
Develop small programs and official account from zero [phase III]
Solidty智能合约开发-简易入门
[getting started with Django] 13 page Association MySQL "multi" field table (check)
【阶段人生总结】放弃考研,参与到工作中,已经顺利毕业了,昨天刚领毕业证
Cannot link redis when redis is enabled
Day-02 database
Buuctf reinforcement question ezsql