Writing changed data from MySQL to Kafka with Flink CDC (DataStream mode)
2022-07-27 00:59:00 【A photographer who can't play is not a good programmer】
Preface
CDC is short for Change Data Capture. The core idea of Flink CDC is to monitor and capture changes in a database (inserts, updates, and deletes of data or of the tables themselves), record those changes completely and in the order they occur, and write them to message middleware for other services to subscribe to and consume.
One. Types of CDC
CDC is mainly implemented in two ways, query-based and binlog-based. The differences between the two are summarized below:

| | Query-based CDC | Binlog-based CDC |
|---|---|---|
| Common components | Sqoop | Maxwell, Canal, Debezium |
| Approach | Batch | Streaming |
| Latency | High | Low |
| Captures every data change | No | Yes |
The Flink community has developed the flink-cdc-connectors component, a source connector that can read full snapshot data as well as incremental change data directly from databases such as MySQL and PostgreSQL.
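To use the connector from the DataStream API, the flink-connector-mysql-cdc artifact has to be on the project's classpath. The post does not show its build file, so the Maven coordinates below are an assumption based on the com.alibaba.ververica imports used in the code later on; adjust the version to whatever matches your Flink setup.

```xml
<!-- Assumed dependency for the 1.x (Ververica) MySQL CDC connector -->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.2.0</version>
</dependency>
```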
Two. Importing data from MySQL into Kafka with Flink CDC
1. Core code
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.atguigu.app.function.CustomerDeserialization;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {

    public static void main(String[] args) throws Exception {
        //1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2. Build the Flink CDC SourceFunction
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("hadoop101")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall-flink")
                .tableList("gmall-flink.base_trademark")
                .deserializer(new CustomerDeserialization())
                .startupOptions(StartupOptions.latest())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        //3. Print the data and write it to Kafka
        streamSource.print();
        String sinkTopic = "ods_base_db";
        streamSource.addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));

        //4. Start the job
        env.execute("FlinkCDC");
    }
}
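The CustomerDeserialization class referenced above is a custom DebeziumDeserializationSchema whose source is not shown in this post. The sketch below illustrates what such a deserializer typically looks like; the output layout (database, tableName, before, after, type) and the use of fastjson are my assumptions, so adapt it to whatever format your downstream jobs expect.

```java
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

// Assumed sketch of a custom deserializer; not the original author's implementation.
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        JSONObject result = new JSONObject();

        // The record topic has the form <server-name>.<database>.<table>
        String[] topicParts = sourceRecord.topic().split("\\.");
        result.put("database", topicParts[1]);
        result.put("tableName", topicParts[2]);

        Struct value = (Struct) sourceRecord.value();

        // Row image before the change (null for inserts)
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            for (Field field : before.schema().fields()) {
                beforeJson.put(field.name(), before.get(field));
            }
        }
        result.put("before", beforeJson);

        // Row image after the change (null for deletes)
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            for (Field field : after.schema().fields()) {
                afterJson.put(field.name(), after.get(field));
            }
        }
        result.put("after", afterJson);

        // Operation type: create / update / delete / read (read = snapshot phase)
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        result.put("type", operation.toString().toLowerCase());

        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
```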
2. Utility class
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class MyKafkaUtil {

    public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
        return new FlinkKafkaProducer<String>("192.168.2.101:9092", topic, new SimpleStringSchema());
    }
}
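The broker address is hardcoded in the constructor above. If you would rather pass it through producer properties (which also makes it easier to keep host names and IPs consistent, see the troubleshooting note further down), the utility could equivalently be written as follows; this variant is my own sketch, not from the original post.

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class MyKafkaUtil {

    private static final String BROKER_LIST = "192.168.2.101:9092";

    public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
        // Configure the producer explicitly instead of relying on the broker-list constructor
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        return new FlinkKafkaProducer<String>(topic, new SimpleStringSchema(), props);
    }
}
```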
Three. Result display
Every piece of data I change in the MySQL database can be observed both in IDEA and in Kafka:
IDEA:
Kafka:
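A quick way to confirm the records on the Kafka side is the console consumer that ships with Kafka, pointed at the sink topic with the same broker address used in MyKafkaUtil (the exact command depends on your installation path):

```bash
bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.101:9092 --topic ods_base_db --from-beginning
```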
3. Possible errors and solutions
org.apache.kafka.common.errors.TimeoutException: Topic ods_base_database not present in metadata after 60000 ms
Solution:
1. vi kafka/config/server.properties
Modify these three places, and make sure to use the IP address; using the hostname hadoop101 kept causing errors (a sketch of the assumed settings follows after this list).
2. Restart Kafka and restart ZooKeeper, and the problem is solved.
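The post does not spell out which three settings were changed. For this TimeoutException, the entries that usually need to carry the broker's IP address are the listener settings in server.properties; the snippet below is an assumption about what was modified, reusing the IP from the utility class above.

```properties
# Assumed server.properties changes -- replace the IP with your broker's address
listeners=PLAINTEXT://192.168.2.101:9092
advertised.listeners=PLAINTEXT://192.168.2.101:9092
zookeeper.connect=192.168.2.101:2181
```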