Flink CDC practice (including practical steps and screenshots)
2022-07-03 09:35:00 【Did Xiao Hu get stronger today】
This article follows a Flink video course on Bilibili (B站), working through the examples hands-on and recording the key points as study notes for getting started quickly.
Flink CDC
1. Introduction to CDC
1.1 What is CDC
CDC is short for Change Data Capture. The core idea is to monitor and capture changes in a database (inserts, updates, and deletes on data or tables), record those changes completely and in the order they occur, and write them to message middleware for other services to subscribe to and consume.
1.2 Types of CDC
CDC implementations fall into two main categories: query-based and binlog-based.
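To make the difference between the two categories concrete, here is a minimal, self-contained sketch of the query-based approach. It is a toy in-memory model, not code from the course: the `Row` class, the `updatedAt` column, and the sample names are all hypothetical. It assumes the poller repeatedly asks for rows whose `updatedAt` is newer than the last value it saw, much as a periodic `SELECT ... WHERE updated_at > ?` would.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of query-based CDC: poll for rows newer than the last one seen. */
final class QueryBasedCdcSketch {

    /** A table row with a hypothetical, monotonically increasing updatedAt column. */
    static final class Row {
        final String id;
        final String name;
        final long updatedAt;

        Row(String id, String name, long updatedAt) {
            this.id = id;
            this.name = name;
            this.updatedAt = updatedAt;
        }
    }

    private long lastSeen = 0L;

    /** Returns the rows changed since the previous poll and advances the watermark. */
    List<Row> poll(Map<String, Row> table) {
        List<Row> changed = new ArrayList<>();
        long newest = lastSeen;
        for (Row row : table.values()) {
            if (row.updatedAt > lastSeen) {
                changed.add(row);
                newest = Math.max(newest, row.updatedAt);
            }
        }
        lastSeen = newest;
        return changed;
    }

    public static void main(String[] args) {
        Map<String, Row> table = new LinkedHashMap<>();
        QueryBasedCdcSketch poller = new QueryBasedCdcSketch();

        table.put("1001", new Row("1001", "zhangsan", 1));
        System.out.println(poller.poll(table).size()); // 1: the insert is seen

        table.put("1001", new Row("1001", "lisi", 2));
        table.put("1001", new Row("1001", "wangwu", 3)); // two updates between polls
        System.out.println(poller.poll(table).size()); // 1: only the final state is seen

        table.remove("1001");
        System.out.println(poller.poll(table).size()); // 0: the delete is invisible
    }
}
```

In this model, intermediate updates and deletes are lost between polls, whereas a binlog-based reader sees every change in order.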
1.3 Flink-CDC
The Flink community has developed the flink-cdc-connectors component, a source component that can read both full snapshot data and incremental change data directly from databases such as MySQL and PostgreSQL.
Open-source repository: https://github.com/ververica/flink-cdc-connectors
2. Flink CDC in practice
2.1 Using the DataStream API
2.1.1 Add dependencies
<!-- Bundles the dependencies into the jar -->
2.1.2 Write the code
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * FlinkCDC
 *
 * @author hutianyi
 * @date 2022/5/30
 **/
public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        //1. Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2. Build the SourceFunction with Flink CDC
        // (connection settings are assumed to match the Flink SQL example in section 2.2)
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("cdc_test") // database to monitor
                .tableList("cdc_test.user_info") // table to monitor, written as database.table
                .deserializer(new StringDebeziumDeserializationSchema()) // deserializer
                .startupOptions(StartupOptions.initial()) // assumed: snapshot first, then read the binlog
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
        //3. Print the data
        dataStreamSource.print();
        //4. Start the job
        env.execute("FlinkCDC");
    }
}
Enable the MySQL binlog:
sudo vim /etc/my.cnf
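The edited file is not reproduced here, so the following is only a sketch of the settings typically placed under `[mysqld]` to enable a row-format binlog for the `cdc_test` database. The `server-id` value and the `mysql-bin` file prefix are assumptions, not values taken from this article.

```ini
[mysqld]
# Any ID that is unique within the replication topology (assumed value)
server-id = 1
# Enable the binary log; "mysql-bin" is an assumed file-name prefix
log-bin = mysql-bin
# Row-based logging records the before/after row images that CDC relies on
binlog_format = row
# Only log changes for the database used in this article
binlog-do-db = cdc_test
```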
Restart MySQL:
sudo systemctl restart mysqld
Switch to the root user and check that the binlog was enabled:
cd /var/lib/mysql
Create a new database and table, then insert some data:
Check the binlog file again:
Its size has grown from 154 to 926, which shows that the binlog is enabled and recording changes.
Start the project:
Insert a row:
The console shows that the new row was captured:
Update the second row:
The console shows the captured change:
Delete the second row:
The captured record contains only the before data.
Note that op takes different values:
r: snapshot read (query); c: insert; u: update; d: delete
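As a small illustration of the op values listed above, the sketch below maps each code to a readable name and notes which row images a record of that type is expected to carry. The class and method names are hypothetical helpers, not part of Flink CDC. The delete case (before only) follows the observation above; the other cases assume the usual Debezium behavior.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that interprets the single-letter op codes of a change record. */
final class OpCodes {
    private static final Map<String, String> NAMES = new HashMap<>();

    static {
        NAMES.put("r", "read (snapshot)");
        NAMES.put("c", "insert");
        NAMES.put("u", "update");
        NAMES.put("d", "delete");
    }

    /** Returns a readable name for the op code, or a marker for unknown codes. */
    static String describe(String op) {
        String name = NAMES.get(op);
        return name != null ? name : "unknown(" + op + ")";
    }

    /** Updates and deletes carry the row as it was before the change. */
    static boolean hasBefore(String op) {
        return "u".equals(op) || "d".equals(op);
    }

    /** Snapshot reads, inserts, and updates carry the row as it is after the change. */
    static boolean hasAfter(String op) {
        return "r".equals(op) || "c".equals(op) || "u".equals(op);
    }

    public static void main(String[] args) {
        for (String op : new String[] {"r", "c", "u", "d"}) {
            System.out.println(op + " -> " + describe(op)
                    + ", before=" + hasBefore(op) + ", after=" + hasAfter(op));
        }
    }
}
```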
2.1.3 Submit the job to the cluster
Enable checkpointing in the code:
//1.1 Enable checkpointing
env.enableCheckpointing(5000); // every 5 seconds
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));
Package the project:
Start the Flink cluster:
Upload the packaged jar to the cluster:
Submit the job:
bin/flink run -m hadoop102:8081 -c com.tianyi.FlinkCDC ./flink-cdc-1.0-SNAPSHOT-jar-with-dependencies.jar
View the job in the Flink web UI on port 8081:
Check the logs:
2.1.4 Resuming from a savepoint
Create a savepoint for the running Flink job:
bin/flink savepoint JobId hdfs://hadoop102:8020/flink/save
After stopping the job, restart it from the savepoint:
bin/flink run -s hdfs://hadoop102:8020/flink/save/... -c <fully qualified class name> flink-1.0-SNAPSHOT-jar-with-dependencies.jar
2.2 Using Flink SQL
2.2.1 Code implementation
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class FlinkSQLCDC {
    public static void main(String[] args) throws Exception {
        //1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        //2. Create the Flink MySQL CDC source table
        // (Flink does not own the data, so the primary key is declared NOT ENFORCED)
        tableEnv.executeSql("CREATE TABLE user_info (" +
                " id STRING primary key not enforced," +
                " name STRING," +
                " sex STRING" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'scan.startup.mode' = 'latest-offset'," +
                " 'hostname' = 'hadoop102'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = '123456'," +
                " 'database-name' = 'cdc_test'," +
                " 'table-name' = 'user_info'" +
                ")");
        //3. Query the data and convert it to a stream for output
        Table table = tableEnv.sqlQuery("select * from user_info");
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
        retractStream.print();
        //4. Start the job
        env.execute("FlinkSQLCDC");
    }
}
Start the project:
2.2.2 Test
Insert a row:
The console captures the change:
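Each element printed from the retract stream is a (flag, row) pair: true adds the row to the result and false retracts a previously emitted row, so an update arrives as a retraction of the old row followed by an addition of the new one. The sketch below replays such messages in plain Java to show how they fold into the current table contents. `Map.Entry<Boolean, String>` stands in for Flink's `Tuple2<Boolean, Row>`, and the sample rows are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Replays retract-stream style messages into the current contents of a table. */
final class RetractStreamSketch {

    /** Stand-in for Tuple2<Boolean, Row>: key is add (true) or retract (false). */
    static Map.Entry<Boolean, String> msg(boolean add, String row) {
        return new SimpleEntry<>(add, row);
    }

    /** Applies the messages in order and returns the rows that remain. */
    static List<String> apply(List<Map.Entry<Boolean, String>> messages) {
        List<String> rows = new ArrayList<>();
        for (Map.Entry<Boolean, String> m : messages) {
            if (m.getKey()) {
                rows.add(m.getValue());
            } else {
                rows.remove(m.getValue()); // removes one matching row, if present
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Map.Entry<Boolean, String>> messages = new ArrayList<>();
        messages.add(msg(true, "1001,zhangsan,male"));  // insert
        messages.add(msg(false, "1001,zhangsan,male")); // update: retract the old row
        messages.add(msg(true, "1001,lisi,male"));      // update: add the new row
        System.out.println(apply(messages)); // prints [1001,lisi,male]
    }
}
```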
2.3 Custom deserializer
Code implementation :
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
public class CustomerDeserializationSchema implements DebeziumDeserializationSchema<String> {
    /**
     * {
     * "db":"",
     * "tableName":"",
     * "before":{"id":"1001","name":""...},
     * "after":{"id":"1001","name":""...},
     * "op":""
     * }
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // Create a JSON object to hold the result
        JSONObject result = new JSONObject();
        // Get the database and table names from the topic
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        result.put("db", fields[1]);
        result.put("tableName", fields[2]);
        // Get the before data
        Struct value = (Struct) sourceRecord.value();
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            // Get the column information
            Schema schema = before.schema();
            List<Field> fieldList = schema.fields();
            for (Field field : fieldList) {
                beforeJson.put(field.name(), before.get(field));
            }
        }
        result.put("before", beforeJson);
        // Get the after data
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            // Get the column information
            Schema schema = after.schema();
            List<Field> fieldList = schema.fields();
            for (Field field : fieldList) {
                afterJson.put(field.name(), after.get(field));
            }
        }
        result.put("after", afterJson);
        // Get the operation type
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        result.put("op", operation);
        // Emit the record as a JSON string
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
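The deserializer above takes the database and table names from the second and third dot-separated parts of the record topic. The standalone sketch below isolates that step so it can be tried without a running job. The `TopicParser` class is a hypothetical helper, and the sample topic `mysql_binlog_source.cdc_test.user_info` is an assumed example of the `<server>.<database>.<table>` form that the indexing implies.

```java
/** Hypothetical helper: splits a topic of the assumed form "<server>.<database>.<table>". */
final class TopicParser {

    /** Returns {database, table}, or throws if the topic does not have three parts. */
    static String[] parse(String topic) {
        String[] fields = topic.split("\\.");
        if (fields.length != 3) {
            throw new IllegalArgumentException("Unexpected topic format: " + topic);
        }
        return new String[] {fields[1], fields[2]};
    }

    public static void main(String[] args) {
        String[] parts = parse("mysql_binlog_source.cdc_test.user_info");
        System.out.println("db=" + parts[0] + ", tableName=" + parts[1]);
    }
}
```

The length check is an addition for illustration; the deserializer itself indexes the array directly, which would fail on a topic with fewer than three parts.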
Use the custom deserializer in the job:
import com.tianyi.func.CustomerDeserializationSchema;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FlinkCDC2 {
    public static void main(String[] args) throws Exception {
        //1. Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.1 Enable checkpointing
        // env.enableCheckpointing(5000);
        // env.getCheckpointConfig().setCheckpointTimeout(10000);
        // env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));
        //2. Build the SourceFunction with Flink CDC
        // (connection settings are assumed to match the Flink SQL example in section 2.2)
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("cdc_test")
                // .tableList("cdc_test.user_info")
                // Use the custom deserializer
                .deserializer(new CustomerDeserializationSchema())
                .startupOptions(StartupOptions.initial()) // assumed: snapshot first, then read the binlog
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
        //3. Print the data
        dataStreamSource.print();
        //4. Start the job
        env.execute("FlinkCDC2");
    }
}
2.4 Comparing the DataStream and Flink SQL approaches
The DataStream approach works on both Flink 1.12 and 1.13, whereas the Flink SQL approach works only on Flink 1.13.
The DataStream approach can monitor multiple databases and tables at the same time, whereas the Flink SQL approach can monitor only a single table.
This article introduced the concept of Flink CDC, walked through both the DataStream and Flink SQL approaches in practice to show what Flink CDC can do, and compared the two approaches.