当前位置:网站首页>Tableapi & SQL and MySQL insert data of Flink
Tableapi & SQL and MySQL insert data of Flink
2022-07-01 15:01:00 【Dragon man who can't fly】
Use Tbale&SQL And Flink JDBC Connector inserts data into MYSQL Database table
Sample environment
java.version: 1.8.x
flink.version: 1.11.1
kafka:2.11
Sample data source ( Project code cloud download )
Flink System examples And Build development environment and data
Sample module (pom.xml)
Flink System examples And TableAPI & SQL And Sample module
InsertToMysql.java
package com.flink.examples.mysql;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @Description Use Tbale&SQL And Flink JDBC Connector inserts data into MYSQL Database table
*/
public class InsertToMysql {
/**
Official reference :https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
*/
//flink-jdbc-1.11.1 How to write it , All attribute names are in JdbcTableSourceSinkFactory The factory class defines
static String table_sql =
"CREATE TABLE my_users (\n" +
" id BIGINT,\n" +
" name STRING,\n" +
" age INT,\n" +
" status INT,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector.type' = 'jdbc',\n" +
" 'connector.url' = 'jdbc:mysql://192.168.110.35:3306/flink?useUnicode=true&characterEncoding=utf-8', -- The database needs to be established \n" +
" 'connector.driver' = 'com.mysql.jdbc.Driver', \n" +
" 'connector.table' = 'users', -- Known tables \n" +
" 'connector.username' = 'root',\n" +
" 'connector.password' = 'password' \n" +
")";
public static void main(String[] args) throws Exception {
// structure StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// structure EnvironmentSettings And designate Blink Planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
// structure StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
// register mysql Data dimension table
tEnv.executeSql(table_sql);
// perform SQL,id=0 It's because of id Fields are self incrementing primary keys , by 0 be mysql Recognition will default to self increment instead of
String sql = "insert into my_users (id,name,age,status) values(0,'tom',31,0)";
// The first way : Direct execution sql
// TableResult tableResult = tEnv.executeSql(sql);
// The second way : Declare a collection of operations to perform sql
StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql(sql);
TableResult tableResult = stmtSet.execute();
tableResult.print();
}
}
Print the results
+-------------------------------------------+
| default_catalog.default_database.my_users |
+-------------------------------------------+
| -1 |
+-------------------------------------------+
1 row in set
边栏推荐
- APK签名原理
- TypeScript: let
- 互联网医院系统源码 医院小程序源码 智慧医院源码 在线问诊系统源码
- Basic operations of SQL database
- Build MySQL master-slave server under Ubuntu 14.04
- It's suitable for people who don't have eloquence. The benefits of joining the China Video partner program are really delicious. One video gets 3 benefits
- Word2vec yyds dry goods inventory
- Opencv Learning Notes 6 -- image mosaic
- 常见健身器材EN ISO 20957认证标准有哪些
- 项目中字符串判空总结
猜你喜欢
What are the books that have greatly improved the thinking and ability of programming?
Microservice development steps (Nacos)
Junda technology - wechat cloud monitoring scheme for multiple precision air conditioners
Official announcement: Apache Doris graduated successfully and became the top project of ASF!
The markdown editor uses basic syntax
微服务追踪SQL(支持Isto管控下的gorm查询追踪)
It's settled! 2022 Hainan secondary cost engineer examination time is determined! The registration channel has been opened!
手把手带你入门 API 开发
Cannot link redis when redis is enabled
[leetcode 324] swing sorting II thinking + sorting
随机推荐
三十之前一定要明白的职场潜规则
这3款在线PS工具,得试试
APK签名原理
The markdown editor uses basic syntax
对于编程思想和能力有重大提升的书有哪些?
[leetcode 324] swing sorting II thinking + sorting
ArrayList 扩容详解,扩容原理[通俗易懂]
What are the books that have greatly improved the thinking and ability of programming?
The first word of JVM -- detailed introduction to JVM and analysis of runtime data area
使用net core 6 c# 的 NPOI 包,读取excel..xlsx单元格内的图片,并存储到指定服务器
MongoDB第二話 -- MongoDB高可用集群實現
tensorflow2-savedmodel convert to pb(frozen_graph)
Filter &(登录拦截)
Basic operations of SQL database
写在Doris毕业后的第一天
leetcode:329. Longest increasing path in matrix
IDEA全局搜索快捷键(ctrl+shift+F)失效修复
Apk signature principle
JVM第二话 -- JVM内存模型以及垃圾回收
The State Administration of Chia Tai market supervision, the national development and Reform Commission and the China Securities Regulatory Commission jointly reminded and warned some iron ores