Write the changed data in MySQL to Kafka through flinkcdc (datastream mode)
2022-07-27 00:59:00 【A photographer who can't play is not a good programmer】
Preface
CDC is short for Change Data Capture. The core idea of Flink CDC is to monitor and capture changes in a database (inserts, updates, and deletes on data and data tables), record those changes completely in the order they occur, and write them to message middleware for other services to subscribe to and consume.
I. Types of CDC
CDC mainly falls into two categories, query-based and binlog-based. The differences in brief:

| | Query-based CDC | Binlog-based CDC |
|---|---|---|
| Common components | Sqoop | Maxwell, Canal, Debezium |
| Approach | Batch | Streaming |
| Latency | High | Low |
| Captures all data changes | No | Yes |
The Flink community has developed the flink-cdc-connectors component, a source connector that can read both full snapshot data and incremental change data directly from databases such as MySQL and PostgreSQL.
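The connector ships as a Maven dependency. Since the code below imports the legacy `com.alibaba.ververica` packages, the matching artifact would look roughly like this (the version number is an assumption; 1.x releases used these coordinates before the project moved to the `com.ververica` group):

```xml
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <!-- Version is an assumption; pick the release matching your Flink version. -->
    <version>1.4.0</version>
</dependency>
```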
II. Importing data from MySQL into Kafka with Flink CDC
1. Core code
```java
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.atguigu.app.function.CustomerDeserialization;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {

    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Build the SourceFunction with Flink CDC
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("hadoop101")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall-flink")
                .tableList("gmall-flink.base_trademark") // must be qualified as db.table
                .deserializer(new CustomerDeserialization())
                .startupOptions(StartupOptions.latest())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        // 3. Print the data and write it to Kafka
        streamSource.print();
        String sinkTopic = "ods_base_db";
        streamSource.addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));

        // 4. Start the job
        env.execute("FlinkCDC");
    }
}
```
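The `CustomerDeserialization` class referenced above is not shown in the post; its job is to turn Debezium change records into JSON strings before they reach Kafka. As a hedged sketch, built with plain Java so it runs without Flink dependencies, the envelope such a deserializer typically emits looks like this (the field names `database`, `tableName`, `before`, `after`, `type` are assumptions modeled on common Flink CDC tutorials, not taken from the original code):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class ChangeEventEnvelope {

    // Build the JSON string a custom Debezium deserializer commonly emits:
    // source database, source table, before-image, after-image, operation type.
    public static String toJson(String database, String table,
                                Map<String, Object> before,
                                Map<String, Object> after,
                                String type) {
        return "{"
                + "\"database\":\"" + database + "\","
                + "\"tableName\":\"" + table + "\","
                + "\"before\":" + mapToJson(before) + ","
                + "\"after\":" + mapToJson(after) + ","
                + "\"type\":\"" + type + "\""
                + "}";
    }

    // Naive map-to-JSON helper (all values rendered as strings; fine for a sketch).
    private static String mapToJson(Map<String, Object> fields) {
        return fields.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\":\"" + e.getValue() + "\"")
                .collect(Collectors.joining(",", "{", "}"));
    }

    public static void main(String[] args) {
        Map<String, Object> after = new LinkedHashMap<>();
        after.put("id", 1);
        after.put("tm_name", "Apple");
        // An insert has an empty before-image and a populated after-image.
        System.out.println(toJson("gmall-flink", "base_trademark",
                new LinkedHashMap<>(), after, "insert"));
    }
}
```

A real implementation would instead implement Flink CDC's `DebeziumDeserializationSchema<String>` and extract these fields from the incoming `SourceRecord`; the sketch only shows the output shape.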
2. Tool class
```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class MyKafkaUtil {

    public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
        // Broker address, target topic, and a plain-string serializer
        return new FlinkKafkaProducer<String>("192.168.2.101:9092", topic, new SimpleStringSchema());
    }
}
```
III. Results
Every change I make to the MySQL database shows up both in the IDEA console and in Kafka:
IDEA:
Kafka:
3. Possible errors and solutions
org.apache.kafka.common.errors.TimeoutException: Topic ods_base_database not present in metadata after 60000 ms
Solution:
1. vi kafka/config/server.properties
Modify the three relevant settings, and make sure to use the IP address; using the hostname hadoop101 kept causing errors.
2. Restart Kafka and restart ZooKeeper; problem solved!
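The post does not name the three settings it changed. For this particular `TimeoutException`, the settings that usually matter are the broker's listener addresses, so a hypothetical reconstruction of the relevant part of `server.properties` might look like this (IPs taken from the tool class above; the exact lines are an assumption, not the author's actual config):

```properties
# Hypothetical reconstruction: the original post does not show these lines.
# For "Topic not present in metadata" timeouts, the address the broker
# advertises must be reachable by the client, hence an IP rather than a hostname.
listeners=PLAINTEXT://192.168.2.101:9092
advertised.listeners=PLAINTEXT://192.168.2.101:9092
zookeeper.connect=192.168.2.101:2181
```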