当前位置:网站首页>Flink 系例 之 TableAPI & SQL 与 MYSQL 插入数据
Flink 系例 之 TableAPI & SQL 与 MYSQL 插入数据
2022-07-01 14:54:00 【不会飞的小龙人】
使用 Tbale&SQL 与 Flink JDBC 连接器将数据插入 MYSQL 数据库表
示例环境
java.version: 1.8.x
flink.version: 1.11.1
kafka:2.11示例数据源 (项目码云下载)
示例模块 (pom.xml)
Flink 系例 之 TableAPI & SQL 与 示例模块
InsertToMysql.java
package com.flink.examples.mysql;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @Description 使用Tbale&SQL与Flink JDBC连接器将数据插入MYSQL数据库表
*/
public class InsertToMysql {
/**
官方参考:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
*/
//flink-jdbc-1.11.1写法,所有属性名在JdbcTableSourceSinkFactory工厂类中定义
static String table_sql =
"CREATE TABLE my_users (\n" +
" id BIGINT,\n" +
" name STRING,\n" +
" age INT,\n" +
" status INT,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector.type' = 'jdbc',\n" +
" 'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8', -- 数据库需建立 \n" +
" 'connector.driver' = 'com.mysql.jdbc.Driver', \n" +
" 'connector.table' = 'users', -- 已知的表 \n" +
" 'connector.username' = 'root',\n" +
" 'connector.password' = 'password' \n" +
")";
public static void main(String[] args) throws Exception {
//构建StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//构建EnvironmentSettings 并指定Blink Planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//构建StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
//注册mysql数据维表
tEnv.executeSql(table_sql);
//执行SQL,id=0是因id字段为自增主键,为0则mysql识别会默认自增值代替
String sql = "insert into my_users (id,name,age,status) values(0,'tom',31,0)";
// 第一种方式:直接执行sql
// TableResult tableResult = tEnv.executeSql(sql);
//第二种方式:声明一个操作集合来执行sql
StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql(sql);
TableResult tableResult = stmtSet.execute();
tableResult.print();
}
}
打印结果
+-------------------------------------------+
| default_catalog.default_database.my_users |
+-------------------------------------------+
| -1 |
+-------------------------------------------+
1 row in set边栏推荐
- Salesforce, Johns Hopkins, Columbia | progen2: exploring the boundaries of protein language models
- The first technology podcast month will be broadcast soon
- Some thoughts on software testing
- TypeScript: let
- leetcode:329. 矩阵中的最长递增路径
- Pat 1065 a+b and C (64bit) (20 points) (16 points)
- [dynamic programming] interval dp:p1005 matrix retrieval
- Problem note - Oracle 11g uninstall
- Task. Run(), Task. Factory. Analysis of behavior inconsistency between startnew() and new task()
- 【阶段人生总结】放弃考研,参与到工作中,已经顺利毕业了,昨天刚领毕业证
猜你喜欢

leetcode:329. Longest increasing path in matrix

MIT team used graph neural network to accelerate the screening of amorphous polymer electrolytes and promote the development of next-generation lithium battery technology

The first technology podcast month will be broadcast soon

竣达技术丨室内空气环境监测终端 pm2.5、温湿度TVOC等多参数监测
![[getting started with Django] 13 page Association MySQL](/img/78/cbf88ae3c3d311edd7d9af8c985749.jpg)
[getting started with Django] 13 page Association MySQL "multi" field table (check)

JVM第二话 -- JVM内存模型以及垃圾回收

写在Doris毕业后的第一天

One of the first steps to redis

Official announcement: Apache Doris graduated successfully and became the top project of ASF!

JVM performance tuning and practical basic theory part II
随机推荐
Written on the first day after Doris graduated
微信网页订阅消息实现
241. 为运算表达式设计优先级
榨汁机UL982测试项目有哪些
Solidty智能合约开发-简易入门
【15. 区间合并】
官宣:Apache Doris 顺利毕业,成为 ASF 顶级项目!
[零基础学IoT Pwn] 复现Netgear WNAP320 RCE
What is the relationship between network speed, broadband, bandwidth and traffic?
深度分析数据在内存中的存储形式
JVM第二话 -- JVM内存模型以及垃圾回收
Blog recommendation | in depth study of message segmentation in pulsar
DirectX repair tool v4.1 public beta! [easy to understand]
leetcode:329. Longest increasing path in matrix
JVM第一话 -- JVM入门详解以及运行时数据区分析
关于软件测试的一些思考
Rearrangement of overloaded operators
Buuctf reinforcement question ezsql
Quelle valeur le pdnp peut - il apporter aux gestionnaires de produits? Vous savez tout?
Task. Run(), Task. Factory. Analysis of behavior inconsistency between startnew() and new task()