Flink 1.11: a test case for writing to MySQL via JDBC
2022-07-27 01:05:00 【Me fan】
This demo computes a windowed wordCount and then writes the result to MySQL.
```java
// Sink the data to MySQL via JDBC
windowCounts.addSink(JdbcSink.sink(
        "replace into flink_test(words,nums) values(?,?)",
        new JdbcStatementBuilder<Tuple2<String, Integer>>() {
            @Override
            public void accept(PreparedStatement ps, Tuple2<String, Integer> t) throws SQLException {
                ps.setString(1, t.f0);
                ps.setInt(2, t.f1);
                System.out.println("The data is " + t.f0 + ":" + t.f1);
            }
        },
        JdbcExecutionOptions.builder()
                .withBatchSize(3) // note: the default batchSize is 5000
                // .withBatchIntervalMs(3)
                .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://localhost:3306/superset?serverTimezone=UTC")
                .withDriverName("com.mysql.jdbc.Driver")
                .withUsername("root")
                .withPassword("123456")
                .build()))
        .setParallelism(1); // also needs attention: with parallelism 2 and only
                            // three records, nothing may be written to MySQL,
                            // because batchSize is counted per subtask — a single
                            // subtask's buffer must reach 3 before it flushes
```

There are two things to note when sinking to MySQL via JDBC:
1. The default batchSize is 5000. If you do not set it, your data may never be written to MySQL:
```java
JdbcExecutionOptions.builder()
        .withBatchSize(3) // note: the default batchSize is 5000
        // .withBatchIntervalMs(3)
        .build(),
```

The relevant source code is as follows:
```java
/** JDBC sink batch options. */
@PublicEvolving
public class JdbcExecutionOptions implements Serializable {
    public static final int DEFAULT_MAX_RETRY_TIMES = 3;
    private static final int DEFAULT_INTERVAL_MILLIS = 0;
    public static final int DEFAULT_SIZE = 5000;

    private final long batchIntervalMs;
    private final int batchSize;
    private final int maxRetries;
    // ...
}
```
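Why unset defaults can silently swallow data can be shown with a plain-Java sketch (a simplified model for illustration, not Flink's actual implementation): records accumulate in a buffer and are flushed only when the buffer reaches batchSize, and time-based flushing never kicks in because the default batchIntervalMs is 0.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the JDBC sink's batching behavior: records accumulate
// in a buffer and are "written" only when the buffer reaches batchSize.
// Time-based flushing is ignored here, matching the default batchIntervalMs = 0.
public class BatchDemo {
    static class BatchBuffer {
        final int batchSize;
        final List<String> buffer = new ArrayList<>();
        int flushedCount = 0; // how many records have reached the database

        BatchBuffer(int batchSize) { this.batchSize = batchSize; }

        void add(String record) {
            buffer.add(record);
            if (buffer.size() >= batchSize) {
                flushedCount += buffer.size(); // stand-in for the batched INSERT
                buffer.clear();
            }
        }
    }

    public static void main(String[] args) {
        // With the default batchSize of 5000, three records never trigger a flush:
        BatchBuffer big = new BatchBuffer(5000);
        big.add("a"); big.add("b"); big.add("c");
        System.out.println("flushed with batchSize=5000: " + big.flushedCount); // 0

        // With batchSize = 3, the third record triggers the flush:
        BatchBuffer small = new BatchBuffer(3);
        small.add("a"); small.add("b"); small.add("c");
        System.out.println("flushed with batchSize=3: " + small.flushedCount); // 3
    }
}
```

In the real sink, setting `withBatchIntervalMs` would also flush on a timer, which is another way to avoid records sitting in the buffer forever.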
2. The sink's parallelism setting
The sink's batchSize interacts with parallelism: a batch is written only once a single subtask's buffer reaches 3 records, so set the parallelism according to your business needs (for example, when writing a topN result, set the parallelism accordingly).
Here I wanted 3 records written to MySQL, so when I tested locally without setting the parallelism, the data was never written.
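The parallelism pitfall follows from the same per-subtask counting (again a simplified sketch, not Flink's internals): each parallel sink subtask keeps its own buffer, so with parallelism 2 and batchSize 3, three records split across two buffers and neither buffer reaches the threshold.

```java
import java.util.ArrayList;
import java.util.List;

// Each parallel sink subtask keeps its own buffer, so batchSize is counted
// per subtask, not globally. This sketch distributes records round-robin
// across subtasks and flushes a buffer only when it reaches batchSize.
public class ParallelSinkDemo {
    static int flushedAcrossSubtasks(int parallelism, int batchSize, int records) {
        List<List<Integer>> buffers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) buffers.add(new ArrayList<>());
        int flushed = 0;
        for (int r = 0; r < records; r++) {
            List<Integer> buf = buffers.get(r % parallelism); // round-robin
            buf.add(r);
            if (buf.size() >= batchSize) {
                flushed += buf.size(); // stand-in for the batched INSERT
                buf.clear();
            }
        }
        return flushed;
    }

    public static void main(String[] args) {
        // parallelism 1: all 3 records land in one buffer and are flushed
        System.out.println(flushedAcrossSubtasks(1, 3, 3)); // prints 3
        // parallelism 2: records split 2/1, neither buffer reaches 3
        System.out.println(flushedAcrossSubtasks(2, 3, 3)); // prints 0
    }
}
```

This is why the demo pins `setParallelism(1)` on the sink: with only three test records, any higher parallelism leaves every subtask's buffer below the batch threshold.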