当前位置:网站首页>Flink 系例 之 TableAPI & SQL 与 MYSQL 插入数据
Flink 系例 之 TableAPI & SQL 与 MYSQL 插入数据
2022-07-01 14:54:00 【不会飞的小龙人】
使用 Tbale&SQL 与 Flink JDBC 连接器将数据插入 MYSQL 数据库表
示例环境
java.version: 1.8.x
flink.version: 1.11.1
kafka:2.11示例数据源 (项目码云下载)
示例模块 (pom.xml)
Flink 系例 之 TableAPI & SQL 与 示例模块
InsertToMysql.java
package com.flink.examples.mysql;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @Description 使用Tbale&SQL与Flink JDBC连接器将数据插入MYSQL数据库表
*/
public class InsertToMysql {
/**
官方参考:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
*/
//flink-jdbc-1.11.1写法,所有属性名在JdbcTableSourceSinkFactory工厂类中定义
static String table_sql =
"CREATE TABLE my_users (\n" +
" id BIGINT,\n" +
" name STRING,\n" +
" age INT,\n" +
" status INT,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector.type' = 'jdbc',\n" +
" 'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8', -- 数据库需建立 \n" +
" 'connector.driver' = 'com.mysql.jdbc.Driver', \n" +
" 'connector.table' = 'users', -- 已知的表 \n" +
" 'connector.username' = 'root',\n" +
" 'connector.password' = 'password' \n" +
")";
public static void main(String[] args) throws Exception {
//构建StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//构建EnvironmentSettings 并指定Blink Planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//构建StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
//注册mysql数据维表
tEnv.executeSql(table_sql);
//执行SQL,id=0是因id字段为自增主键,为0则mysql识别会默认自增值代替
String sql = "insert into my_users (id,name,age,status) values(0,'tom',31,0)";
// 第一种方式:直接执行sql
// TableResult tableResult = tEnv.executeSql(sql);
//第二种方式:声明一个操作集合来执行sql
StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql(sql);
TableResult tableResult = stmtSet.execute();
tableResult.print();
}
}
打印结果
+-------------------------------------------+
| default_catalog.default_database.my_users |
+-------------------------------------------+
| -1 |
+-------------------------------------------+
1 row in set边栏推荐
- Vnctf2022 open web gocalc0
- 一波三折,终于找到src漏洞挖掘的方法了【建议收藏】
- solidty-基础篇-结构体和数组,私有 / 公共函数,函数的返回值和修饰符,事件
- [Verilog quick start of Niuke question series] ~ use functions to realize data size conversion
- 定了!2022海南二级造价工程师考试时间确定!报名通道已开启!
- 微信网页订阅消息实现
- 对于编程思想和能力有重大提升的书有哪些?
- 写在Doris毕业后的第一天
- 【15. 区间合并】
- The first word of JVM -- detailed introduction to JVM and analysis of runtime data area
猜你喜欢

【15. 区间合并】

Chapter 4 of getting started with MySQL: creation, modification and deletion of data tables

How to view the state-owned enterprises have unloaded Microsoft office and switched to Kingsoft WPS?

C learning notes (5) class and inheritance

Basic operations of SQL database

opencv学习笔记六--图像拼接
![[零基础学IoT Pwn] 复现Netgear WNAP320 RCE](/img/f7/d683df1d4b1b032164a529d3d94615.png)
[零基础学IoT Pwn] 复现Netgear WNAP320 RCE
![[Verilog quick start of Niuke series] ~ multi function data processor, calculate the difference between two numbers, use generate... For statement to simplify the code, and use sub modules to realize](/img/30/aea4ae24f418eb971bca77a1d46bef.png)
[Verilog quick start of Niuke series] ~ multi function data processor, calculate the difference between two numbers, use generate... For statement to simplify the code, and use sub modules to realize

It's settled! 2022 Hainan secondary cost engineer examination time is determined! The registration channel has been opened!

微服务追踪SQL(支持Isto管控下的gorm查询追踪)
随机推荐
Error-tf. function-decorated function tried to create variables on non-first call
[零基础学IoT Pwn] 复现Netgear WNAP320 RCE
2022-2-15 learning xiangniuke project - Section 4 business management
qt捕获界面为图片或label显示
cmake 基本使用过程
榨汁机UL982测试项目有哪些
[14. Interval sum (discretization)]
微服务追踪SQL(支持Isto管控下的gorm查询追踪)
购物商城6.27待完成
定了!2022海南二级造价工程师考试时间确定!报名通道已开启!
项目中字符串判空总结
Opencv learning notes 5 -- document scanning +ocr character recognition
一波三折,终于找到src漏洞挖掘的方法了【建议收藏】
Basic operation of database
网速、宽带、带宽、流量三者之间的关系是什么?
What problems should be considered for outdoor LED display?
Summary of empty string judgment in the project
写在Doris毕业后的第一天
241. Design priorities for operational expressions
It's suitable for people who don't have eloquence. The benefits of joining the China Video partner program are really delicious. One video gets 3 benefits