Flink 1.13 (Part 8): CDC
2022-07-31 22:39:00 【卡农c】
I. Introduction
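CDC is short for Change Data Capture. The core idea is to monitor and capture changes to a database (inserts, updates, and deletes of rows or tables), record every change completely and in the order it happened, and write these changes to message middleware so that other services can subscribe to and consume them.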
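To make the "record changes in order, let consumers replay them" idea concrete, here is a minimal sketch of a downstream consumer that rebuilds a copy of a table by applying captured changes in log order. The ChangeEvent and ChangeLogReplay classes are hypothetical illustrations, not part of Flink CDC or Debezium.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** One captured change (hypothetical model): operation, primary key, row image after the change (null for deletes). */
final class ChangeEvent {
    enum Op { INSERT, UPDATE, DELETE }

    final Op op;
    final int key;
    final String after;

    ChangeEvent(Op op, int key, String after) {
        this.op = op;
        this.key = key;
        this.after = after;
    }
}

/** Hypothetical consumer that rebuilds a table copy by applying change events strictly in log order. */
final class ChangeLogReplay {
    static Map<Integer, String> replay(List<ChangeEvent> log) {
        Map<Integer, String> table = new LinkedHashMap<>();
        for (ChangeEvent e : log) {
            switch (e.op) {
                case INSERT:
                case UPDATE:
                    // Both inserts and updates leave the "after" image as the current row
                    table.put(e.key, e.after);
                    break;
                case DELETE:
                    table.remove(e.key);
                    break;
            }
        }
        return table;
    }
}
```

Replaying the same events in a different order (for example the delete of key 2 before its insert) would leave a different table, which is why CDC keeps changes in the order they happened.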
Types of CDC
Flink-CDC
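The Flink community developed the flink-cdc-connectors component, a source connector that reads both the full existing data and the incremental change data directly from databases such as MySQL and PostgreSQL. It is open source.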
II. Using the DataStream API
1. Enable the MySQL binlog
Edit /etc/my.cnf and add the following under the [mysqld] section:
sudo vim /etc/my.cnf
server-id = 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=database-name  # name of the database to capture
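Set binlog-do-db to match your setup: it names the database whose changes should be written to the binlog.
Restart MySQL so the configuration takes effect: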
sudo systemctl restart mysqld
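After the restart it is worth confirming that the settings are actually active before starting a CDC job. The sketch below queries the relevant server variables over JDBC and checks them; the BinlogCheck class and its method names are hypothetical, and at runtime it assumes the mysql-connector-java driver from the dependency list is on the classpath.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: reads and checks the MySQL variables that binlog-based CDC depends on. */
final class BinlogCheck {
    /** Reads log_bin, binlog_format and server_id from the server (driver must be on the classpath). */
    static Map<String, String> readVariables(String jdbcUrl, String user, String password) throws SQLException {
        Map<String, String> vars = new HashMap<>();
        String sql = "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'server_id')";
        try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                // Column 1 is Variable_name, column 2 is Value
                vars.put(rs.getString(1).toLowerCase(), rs.getString(2));
            }
        }
        return vars;
    }

    /** Returns the problems found; an empty list means the settings look ready for CDC. */
    static List<String> problems(Map<String, String> vars) {
        List<String> problems = new ArrayList<>();
        if (!"ON".equalsIgnoreCase(vars.get("log_bin"))) {
            problems.add("log_bin is not ON");
        }
        if (!"ROW".equalsIgnoreCase(vars.get("binlog_format"))) {
            problems.add("binlog_format is not ROW");
        }
        String serverId = vars.get("server_id");
        if (serverId == null || "0".equals(serverId)) {
            problems.add("server_id is not set");
        }
        return problems;
    }
}
```

Usage (URL and credentials are placeholders): BinlogCheck.problems(BinlogCheck.readVariables("jdbc:mysql://<host>:3306/", "root", "xxxx")) should return an empty list. Running SHOW VARIABLES LIKE 'binlog_format'; in the mysql client gives the same information by hand.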
2. Dependencies
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
</dependencies>
3. Write the code
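import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // State backend: manages state and serves reads/writes of it; HashMapStateBackend keeps working state in memory (JVM heap)
        env.setStateBackend(new HashMapStateBackend());
        // Enable checkpointing every 5 seconds (the CDC source saves its binlog offset in each checkpoint)
        env.enableCheckpointing(5000);
        // Where checkpoints are stored
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/flink-cdc");
        // Exactly-once checkpointing
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Discard a checkpoint that has not completed within 10 seconds
        env.getCheckpointConfig().setCheckpointTimeout(10000L);
        // 2. Build the MySQL source with Flink CDC
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("175.178.154.194")
                .port(3306)
                .username("root")
                .password("xxxx")
                .databaseList("rtdw-flink") // database name
                // Table names; several may be listed. Each is written as database.table so that
                // same-named tables in different databases are not confused
                .tableList("rtdw-flink.base_category1")
                // Output format (deserializer); the built-in string format for now
                .deserializer(new StringDebeziumDeserializationSchema())
                // initial(): snapshot the existing rows first, then keep reading the binlog;
                // latest(): read only the changes made after the job starts
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);
        // 3. Print the data
        streamSource.print();
        // 4. Start the job
        env.execute("CDC TASK");
    }
}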
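The difference between the two startup modes can be modelled in a few lines of plain Java. This is a toy illustration, not the flink-cdc implementation: StartupModeSketch is a hypothetical class, and the "read:" prefix mirrors the fact that Debezium tags snapshot rows with the READ operation, while rows inserted later arrive as CREATE.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the initial() and latest() startup modes (hypothetical, for illustration only). */
final class StartupModeSketch {
    enum Mode { INITIAL, LATEST }

    /**
     * existingRows: rows already in the table when the job starts (what a snapshot would read).
     * newChanges:   binlog events written after the job starts.
     * Returns the records the source would emit, in order.
     */
    static List<String> emitted(Mode mode, List<String> existingRows, List<String> newChanges) {
        List<String> out = new ArrayList<>();
        if (mode == Mode.INITIAL) {
            // initial(): first emit every existing row as a snapshot "read" record
            for (String row : existingRows) {
                out.add("read:" + row);
            }
        }
        // In both modes the source then keeps following the binlog
        out.addAll(newChanges);
        return out;
    }
}
```

With two existing rows and one later insert, INITIAL yields [read:1, read:2, create:3] while LATEST yields only [create:3].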
4. Packaging
The following build plugins package the project together with all of its dependencies into a single jar:
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
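5. Testing
Start the Flink cluster:
bin/start-cluster.sh
Open the web UI:
http://hadoop102:8081
Upload the jar.
Specify the entry class (FlinkCDC) and submit the job.
The printed records appear in the TaskManager's stdout.
Copy the job ID.
Run the following command to trigger a savepoint for this job and store it in HDFS (any directory you choose):
bin/flink savepoint 224f7761b07bcee76bcaa74a5248a0e3 hdfs://hadoop102:8020/savepoint
Cancel the job, then modify the table in MySQL.
Find the savepoint path under hdfs://hadoop102:8020/savepoint and resubmit the job from it, for example with bin/flink run -s <savepoint-path> -c FlinkCDC <jar-file>.
The restarted job picks up the changes made while it was stopped without re-reading the whole table: it resumes from where it left off.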
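Why the resume works can be sketched with a toy reader whose only state is its position in the change log. In the real source the saved state is a binlog offset stored in operator state; here it is simplified to a list index, and ResumableReader is a hypothetical class used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy binlog reader (hypothetical) illustrating checkpoint/savepoint-based resume. */
final class ResumableReader {
    private final List<String> binlog; // the change log; list index stands in for the binlog position
    private int position;              // next position to read; this is what a checkpoint would store

    ResumableReader(List<String> binlog, int restoredPosition) {
        this.binlog = binlog;
        this.position = restoredPosition;
    }

    /** Reads everything currently in the log, starting at the current position. */
    List<String> poll() {
        List<String> out = new ArrayList<>();
        while (position < binlog.size()) {
            out.add(binlog.get(position));
            position++;
        }
        return out;
    }

    /** The state that a checkpoint or savepoint would persist. */
    int snapshotState() {
        return position;
    }
}
```

A reader restored with the saved position skips what was already emitted and returns only the events appended while the job was down.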
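III. Custom deserialization
As the output shows, the default format is complex and inconvenient to work with later on, so we write a custom deserializer that turns each change record into readable, easy-to-use JSON.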
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class CusDeserialization implements DebeziumDeserializationSchema<String> {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // Result JSON object
        JSONObject res = new JSONObject();
        // The topic contains the database and table name: <serverName>.<database>.<table>
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        // 1. Get the database and table name
        String database = fields[1];
        String tableName = fields[2];
        // 2. The value holds "before" (present for updates and deletes) and "after"
        //    (present for snapshot reads, inserts and updates)
        Struct value = (Struct) sourceRecord.value();
        JSONObject beforeJSON = structToJson(value.getStruct("before"));
        // 3. Get "after"
        JSONObject afterJSON = structToJson(value.getStruct("after"));
        // 4. Get the operation type: READ (snapshot), CREATE, UPDATE or DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            // Rename Debezium's "create" to the usual CRUD name "insert"
            type = "insert";
        }
        // 5. Write the fields into the JSON object
        res.put("database", database);
        res.put("tableName", tableName);
        res.put("before", beforeJSON);
        res.put("after", afterJSON);
        res.put("type", type);
        // 6. Send the record downstream
        collector.collect(res.toJSONString());
    }

    // Copies every column of a row image into a JSON object; a missing image (null) yields an empty object
    private static JSONObject structToJson(Struct struct) {
        JSONObject json = new JSONObject();
        if (struct != null) {
            // Column names come from the row's schema (its metadata)
            for (Field field : struct.schema().fields()) {
                json.put(field.name(), struct.get(field));
            }
        }
        return json;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
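The topic parsing and type mapping in CusDeserialization can be checked without any Flink or Kafka Connect classes. The sketch below extracts just those two steps; RecordMeta is a hypothetical class, and the example topic prefix mysql_binlog_source is an assumption about the server name flink-cdc 1.x uses by default.

```java
import java.util.Locale;

/** Hypothetical plain-Java copy of two steps from CusDeserialization. */
final class RecordMeta {
    final String database;
    final String table;

    private RecordMeta(String database, String table) {
        this.database = database;
        this.table = table;
    }

    /** Debezium MySQL topics have the form <serverName>.<database>.<table>. */
    static RecordMeta fromTopic(String topic) {
        String[] parts = topic.split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException("unexpected topic: " + topic);
        }
        return new RecordMeta(parts[1], parts[2]);
    }

    /** Same rule as the deserializer: lower-case the operation name and rename "create" to "insert". */
    static String typeFor(String operationName) {
        String type = operationName.toLowerCase(Locale.ROOT);
        return "create".equals(type) ? "insert" : type;
    }
}
```

One consequence worth noting: with StartupOptions.initial(), rows from the initial snapshot keep the type "read", so a downstream consumer that only looks for "insert" would miss them unless it treats "read" the same way.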
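Switch the job to the custom deserializer:
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Build the MySQL source with Flink CDC
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("175.178.154.194")
                .port(3306)
                .username("root")
                .password("xxxx")
                .databaseList("rtdw-flink")
                .tableList("rtdw-flink.base_sale_attr")
                // Use the custom deserializer defined above
                .deserializer(new CusDeserialization())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);
        // 3. Print the data
        streamSource.print();
        // 4. Start the job
        env.execute("CDC TASK");
    }
}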