Flink 1.13 (8) CDC
2022-08-01 03:42:00 【Canon c】
一. Introduction
CDC is short for Change Data Capture. Its core idea is to monitor and capture changes in a database (inserts, updates, and deletes of rows or tables), record those changes completely in the order they occur, and write them to a message middleware for other services to subscribe to and consume.
Types of CDC

Flink-CDC
The Flink community developed the flink-cdc-connectors component, a source connector that can read full snapshot data and incremental change data directly from databases such as MySQL and PostgreSQL. It is open source.
二. The DataStream API approach
1. Enable the MySQL binlog
Edit the /etc/my.cnf file:
sudo vim /etc/my.cnf
server-id = 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=database-name   # name of the database to capture
Change binlog-do-db to the database you actually want to synchronize.
Restart MySQL for the configuration to take effect:
sudo systemctl restart mysqld
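As a quick optional check (assuming the mysql command-line client is installed on the server), you can confirm that the binlog settings took effect:
mysql -u root -p -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format');"
log_bin should report ON and binlog_format should report ROW.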
2. Dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>
3. Write the code
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Set the state backend: it manages state and serves state access; HashMapStateBackend keeps working state in memory
        env.setStateBackend(new HashMapStateBackend());
        // Enable checkpointing every 5 seconds
        env.enableCheckpointing(5000);
        // Set the storage location for checkpoints
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/flink-cdc");
        // Exactly-once checkpointing with a 10-second checkpoint timeout
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(10000L);
        // 2. Build the MySQL source with Flink CDC
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("175.178.154.194")
                .port(3306)
                .username("root")
                .password("xxxx")
                .databaseList("rtdw-flink") // database name
                .tableList("rtdw-flink.base_category1") // table names; there can be several, so use database.table to avoid clashes between tables with the same name in different databases
                .deserializer(new StringDebeziumDeserializationSchema()) // output format (deserializer); use the default for now
                .startupOptions(StartupOptions.initial()) // startup mode: initial() first reads a full snapshot and then the incremental changes; latest() only reads changes made after the job starts
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);
        // 3. Print the data
        streamSource.print();
        // 4. Start the job
        env.execute("CDC TASK ");
    }
}
4. Packaging
The following Maven plugins package the code together with its dependencies into a single fat JAR:
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
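With these plugins in place, build the fat JAR with Maven; the exact artifact name depends on your project's artifactId and version:
mvn clean package
The file under target/ ending in jar-with-dependencies.jar is the one to upload to the Flink web UI in the next step.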
5. Test
Start the Flink cluster:
bin/start-cluster.sh
Open the web UI:
http://hadoop102:8081
Upload the JAR, specify the entry class, and submit the job.

You can see the output:

Copy the job ID:

Run the following command to trigger a savepoint for this job and store it in HDFS (in a directory of your choosing):
bin/flink savepoint 224f7761b07bcee76bcaa74a5248a0e3 hdfs://hadoop102:8020/savepoint

Cancel the job, then go to MySQL and modify the table:

Locate the savepoint file

Restarting the job from that savepoint (see the command below) resumes capture from where it left off, so breakpoint resume is achieved.
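To resume, resubmit the job from the savepoint; the savepoint sub-directory, entry class, and JAR name in this sketch are placeholders for your own values:
bin/flink run -s hdfs://hadoop102:8020/savepoint/<savepoint-dir> -c <entry-class> <your-jar>-jar-with-dependencies.jar
The job then continues from the position recorded in the savepoint and picks up the changes made while it was down.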
三. Custom deserialization
As you can see, the default output format is very verbose and hard to use downstream, so we write a custom deserializer to make the data usable and readable.
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CusDeserialization implements DebeziumDeserializationSchema<String> {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // Create the result JSON object
        JSONObject res = new JSONObject();
        // Get the topic; it contains the database name and table name
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        // 1. Extract the database name and table name
        String database = fields[1];
        String tableName = fields[2];
        // 2. Get the value; it holds the before and after states of the row
        Struct value = (Struct) sourceRecord.value();
        // Get the "before" struct
        Struct beforeStruct = value.getStruct("before");
        // Copy the "before" fields into a JSON object
        JSONObject beforeJSON = new JSONObject();
        // Inserts have no "before" state, so check for null
        if (beforeStruct != null) {
            // Get the schema (metadata)
            Schema beforeSchema = beforeStruct.schema();
            // Get the field names from the schema
            List<Field> fieldList = beforeSchema.fields();
            for (Field field : fieldList) {
                // Get the field value
                Object beforeValue = beforeStruct.get(field);
                // Put it into the JSON object
                beforeJSON.put(field.name(), beforeValue);
            }
        }
        // 3. Get the "after" struct
        Struct after = value.getStruct("after");
        JSONObject afterJSON = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = after.get(field);
                afterJSON.put(field.name(), afterValue);
            }
        }
        // 4. Get the operation type
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            // Debezium reports inserts as "create"; rename to "insert"
            type = "insert";
        }
        // 5. Write all fields into the result JSON
        res.put("database", database);
        res.put("tableName", tableName);
        res.put("before", beforeJSON);
        res.put("after", afterJSON);
        res.put("type", type);
        // 6. Emit the record downstream
        collector.collect(res.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
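With this deserializer, each change record is emitted as one flat JSON string containing the database, tableName, before, after, and type fields. As an illustration only (the column names and values are made up), an insert would come out roughly like:
{"database":"rtdw-flink","tableName":"base_category1","before":{},"after":{"id":1,"name":"test"},"type":"insert"}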
Switch the source to the custom deserializer:
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Build the MySQL source with Flink CDC, this time with the custom deserializer
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("175.178.154.194")
                .port(3306)
                .username("root")
                .password("zks123456")
                .databaseList("rtdw-flink")
                .tableList("rtdw-flink.base_sale_attr")
                .deserializer(new CusDeserialization())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);
        // 3. Print the data
        streamSource.print();
        // 4. Start the job
        env.execute("CDC TASK ");
    }
}
