Flink CDC 2.0: Collecting MySQL Data with Flink SQL
2022-07-25 07:08:00 【Big data Institute】
1. Dependency management
Place the following dependency JARs into FLINK_HOME/lib:
flink-sql-connector-mysql-cdc-2.2.0.jar
flink-connector-jdbc_2.11-1.14.3.jar
flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
2. Flink global configuration
Modify the flink-conf.yaml file:
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 10min
state.backend: filesystem
state.checkpoints.dir: hdfs://mycluster/flinkcdc-checkpoints
3. sql-client job submission modes
1. Standalone mode
Start sql-client: bin/sql-client.sh embedded
Note: standalone mode requires a running Flink standalone cluster, which is started as follows:
bin/start-cluster.sh
2. yarn-session mode (the mode used in this article)
First start a Flink yarn-session cluster: bin/yarn-session.sh -s 1 -jm 1024 -tm 1024
Then start sql-client: bin/sql-client.sh embedded -s yarn-session
4. Checkpoint configuration
Official documentation: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#checkpointing
# Set checkpoint parameters in sql-client
SET 'execution.checkpointing.interval' = '10s';
SET 'parallelism.default' = '3';
5. Create the source table
CREATE TABLE `cars`(
`id` BIGINT,
`owerId` BIGINT,
`carCode` STRING,
`carColor` STRING,
`type` BIGINT,
`remark` STRING,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'hadoop1',
'port' = '3306',
'username' = 'hive',
'password' = 'hive',
'database-name' = 'sca',
'table-name' = 'cars',
'connect.timeout' = '60s'
);
6. Create the sink table
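If the MySQL server's time zone differs from the Flink cluster's, the mysql-cdc connector usually also needs a server-time-zone option to decode TIMESTAMP values correctly. A minimal sketch of the extra setting (the time-zone value here is an assumption; use your MySQL server's actual zone):

```sql
-- Optional mysql-cdc setting, added inside the WITH (...) clause of the
-- source table `cars` (value is an assumption; match your MySQL server):
'server-time-zone' = 'Asia/Shanghai'
```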
CREATE TABLE `cars_copy`(
`id` BIGINT,
`owerId` BIGINT,
`carCode` STRING,
`carColor` STRING,
`type` BIGINT,
`remark` STRING,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://hadoop1:3306/sca?useUnicode=true&characterEncoding=utf8',
'username' = 'hive',
'password' = 'hive',
'table-name' = 'cars_copy',
'sink.parallelism' = '2'
);
7. Write from the source table to the sink table
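The jdbc sink buffers writes before flushing them to MySQL; the flush behavior can be tuned. A hedged sketch of the commonly used buffering options from the Flink JDBC connector (the values are illustrative, not tuned for this workload):

```sql
-- Optional jdbc sink buffering options, added to the WITH (...) clause
-- of the sink table `cars_copy` (illustrative values):
'sink.buffer-flush.max-rows' = '500',   -- flush after this many buffered rows
'sink.buffer-flush.interval' = '1s',    -- or after this interval elapses
'sink.max-retries' = '3'                -- retries when a write fails
```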
Write the captured data into MySQL:
insert into cars_copy SELECT * FROM cars;
Query the number of records in the result table:
select count(*) from cars_copy;
Insert new test data (then check the result table again):
insert into `cars` (`id`, `owerId`, `carCode`, `carColor`, `type`, `remark`) values('10096006','10244815','harbor T·7RONE','Red','1',NULL);
insert into `cars` (`id`, `owerId`, `carCode`, `carColor`, `type`, `remark`) values('10096007','10244816','harbor T·7RONE','yellow','1',NULL);
Note: if you manually cancel the job, all existing rows in the table will be collected again the next time the job is started.
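The full re-collection mentioned in the note comes from the connector's default startup mode, which takes a snapshot of the whole table before switching to the binlog. If only new changes are wanted on a fresh start, mysql-cdc 2.x exposes a startup option; a sketch (verify the option against your connector version):

```sql
-- In the source table's WITH (...) clause:
'scan.startup.mode' = 'initial'           -- default: full snapshot, then binlog
-- 'scan.startup.mode' = 'latest-offset'  -- skip the snapshot, read only new changes
```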
8. Cancel the job while taking a savepoint
bin/flink stop --savepointPath hdfs://mycluster/flinkcdc-savepoints -Dyarn.application.id=application_1658045078748_0001 79ce915e39fc1d18a194b6a464d7c3fd
Note: -Dyarn.application.id takes the YARN application ID, and the final argument is the Flink job ID.
9. Resume the job after cancellation
# Make the job start processing from the last savepoint position
SET 'execution.checkpointing.interval' = '10s';
SET 'parallelism.default' = '3';
SET 'execution.savepoint.path' = 'hdfs://mycluster/flinkcdc-savepoints/savepoint-79ce91-92206bcaaad2';
Note: the value of this parameter is the savepoint path.
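Note that SET 'execution.savepoint.path' stays in effect for every later submission in the same sql-client session, so each new INSERT job would also try to restore from that savepoint. In recent Flink versions the sql-client supports clearing a single option; a sketch (check that RESET with a key is available in your Flink version):

```sql
-- After the resumed job has been submitted, clear the savepoint path
-- so later jobs in this session start fresh:
RESET 'execution.savepoint.path';
```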
# Execute the Flink SQL job
insert into cars_copy SELECT * FROM cars;
# Insert new test data
insert into `cars` (`id`, `owerId`, `carCode`, `carColor`, `type`, `remark`) values('10096008','10244815',' harbor T·7RONE',' Red ','1',NULL);
insert into `cars` (`id`, `owerId`, `carCode`, `carColor`, `type`, `remark`) values('10096009','10244816','harbor T·7RONE','yellow','1',NULL);
# Query the record count in the result table again
select count(*) from cars_copy;
Under normal conditions, only the newly inserted rows are collected this time; historical data is not re-collected.
Note: collecting MySQL data with Flink SQL is easy to use, but only single-table collection is supported.