当前位置:网站首页>Flink 系例 之 TableAPI & SQL 与 MYSQL 插入数据
Flink 系例 之 TableAPI & SQL 与 MYSQL 插入数据
2022-07-01 14:54:00 【不会飞的小龙人】
使用 Tbale&SQL 与 Flink JDBC 连接器将数据插入 MYSQL 数据库表
示例环境
java.version: 1.8.x
flink.version: 1.11.1
kafka:2.11示例数据源 (项目码云下载)
示例模块 (pom.xml)
Flink 系例 之 TableAPI & SQL 与 示例模块
InsertToMysql.java
package com.flink.examples.mysql;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @Description 使用Tbale&SQL与Flink JDBC连接器将数据插入MYSQL数据库表
*/
public class InsertToMysql {
/**
官方参考:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
*/
//flink-jdbc-1.11.1写法,所有属性名在JdbcTableSourceSinkFactory工厂类中定义
static String table_sql =
"CREATE TABLE my_users (\n" +
" id BIGINT,\n" +
" name STRING,\n" +
" age INT,\n" +
" status INT,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector.type' = 'jdbc',\n" +
" 'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8', -- 数据库需建立 \n" +
" 'connector.driver' = 'com.mysql.jdbc.Driver', \n" +
" 'connector.table' = 'users', -- 已知的表 \n" +
" 'connector.username' = 'root',\n" +
" 'connector.password' = 'password' \n" +
")";
public static void main(String[] args) throws Exception {
//构建StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//构建EnvironmentSettings 并指定Blink Planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//构建StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
//注册mysql数据维表
tEnv.executeSql(table_sql);
//执行SQL,id=0是因id字段为自增主键,为0则mysql识别会默认自增值代替
String sql = "insert into my_users (id,name,age,status) values(0,'tom',31,0)";
// 第一种方式:直接执行sql
// TableResult tableResult = tEnv.executeSql(sql);
//第二种方式:声明一个操作集合来执行sql
StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql(sql);
TableResult tableResult = stmtSet.execute();
tableResult.print();
}
}
打印结果
+-------------------------------------------+
| default_catalog.default_database.my_users |
+-------------------------------------------+
| -1 |
+-------------------------------------------+
1 row in set边栏推荐
- What are the books that have greatly improved the thinking and ability of programming?
- idea中新建的XML文件变成普通文件的解决方法.
- 生成随机数(4位、6位)
- 基于价值量化的需求优先级排序方法
- Music player development example (can be set up)
- Basic operation of database
- The markdown editor uses basic syntax
- 微信网页订阅消息实现
- TypeScript: let
- [dynamic programming] interval dp:p1005 matrix retrieval
猜你喜欢

JVM第二话 -- JVM内存模型以及垃圾回收

首届技术播客月开播在即

The State Administration of Chia Tai market supervision, the national development and Reform Commission and the China Securities Regulatory Commission jointly reminded and warned some iron ores

音乐播放器开发实例(可毕设)

互联网医院系统源码 医院小程序源码 智慧医院源码 在线问诊系统源码

Task. Run(), Task. Factory. Analysis of behavior inconsistency between startnew() and new task()
![[getting started with Django] 13 page Association MySQL](/img/78/cbf88ae3c3d311edd7d9af8c985749.jpg)
[getting started with Django] 13 page Association MySQL "multi" field table (check)

openssl客户端编程:一个不起眼的函数导致的SSL会话失败问题

Music player development example (can be set up)

微服务追踪SQL(支持Isto管控下的gorm查询追踪)
随机推荐
Demand prioritization method based on value quantification
问题随记 —— Oracle 11g 卸载
cmake 基本使用过程
Hidden rules of the workplace that must be understood before 30
微服务开发步骤(nacos)
定了!2022海南二级造价工程师考试时间确定!报名通道已开启!
购物商城6.27待完成
241. Design priorities for operational expressions
leetcode:329. Longest increasing path in matrix
对于编程思想和能力有重大提升的书有哪些?
These three online PS tools should be tried
opencv学习笔记五--文件扫描+OCR文字识别
TypeScript:const
APK签名原理
DirectX repair tool v4.1 public beta! [easy to understand]
It's settled! 2022 Hainan secondary cost engineer examination time is determined! The registration channel has been opened!
What if you are always bullied because you are too honest in the workplace?
What value can NPDP bring to product managers? Do you know everything?
IDEA全局搜索快捷键(ctrl+shift+F)失效修复
MongoDB第二话 -- MongoDB高可用集群实现