Flink 1.13 (8) CDC
2022-08-01 03:42:00 【Canon c】
I. Introduction
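CDC is short for Change Data Capture. The core idea is to monitor and capture changes in a database, such as inserts, updates and deletes of data or tables. These changes are recorded completely, in the order in which they occur, and written to message middleware so that other services can subscribe to and consume them.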
Types of CDC
Flink-CDC
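The Flink community developed the flink-cdc-connectors component. It is a source component that reads both full (snapshot) data and incremental change data directly from databases such as MySQL and PostgreSQL. The component is open source.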
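The "full data plus incremental changes" idea can be shown with a small plain-Java simulation. This is not the flink-cdc-connectors API. The ChangeEvent class and the replay method are hypothetical. The op codes "r", "c", "u" and "d" follow Debezium's convention for snapshot reads, inserts, updates and deletes. The consumer first applies the snapshot rows and then applies the captured changes in order. The result is an up-to-date copy of the table.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Conceptual simulation only: hypothetical classes, not the flink-cdc-connectors API.
class CdcReplaySketch {

    // One captured change. op is "r" (snapshot read), "c" (insert), "u" (update) or "d" (delete).
    static final class ChangeEvent {
        final String op;
        final int id;       // primary key
        final String name;  // row content after the change; null for deletes

        ChangeEvent(String op, int id, String name) {
            this.op = op;
            this.id = id;
            this.name = name;
        }
    }

    // Applies the events strictly in order to an in-memory replica keyed by primary key.
    static Map<Integer, String> replay(List<ChangeEvent> events) {
        Map<Integer, String> replica = new LinkedHashMap<>();
        for (ChangeEvent e : events) {
            switch (e.op) {
                case "r":
                case "c":
                case "u":
                    replica.put(e.id, e.name);
                    break;
                case "d":
                    replica.remove(e.id);
                    break;
                default:
                    throw new IllegalArgumentException("unknown op: " + e.op);
            }
        }
        return replica;
    }

    public static void main(String[] args) {
        List<ChangeEvent> events = Arrays.asList(
                new ChangeEvent("r", 1, "books"),          // full snapshot
                new ChangeEvent("r", 2, "phones"),
                new ChangeEvent("c", 3, "clothing"),       // incremental changes from the binlog
                new ChangeEvent("u", 2, "mobile phones"),
                new ChangeEvent("d", 1, null));
        System.out.println(replay(events)); // {2=mobile phones, 3=clothing}
    }
}
```

The order of the events matters. Suppose an update to a row is replayed after a later delete of that same row. The update would bring the row back. This is why CDC records changes in the order in which they happened.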
II. DataStream API
1. Enable the MySQL binlog
Edit /etc/my.cnf and add the following under the [mysqld] section:
sudo vim /etc/my.cnf
server-id = 1
log-bin=mysql-bin
binlog_format=row
# name of the database to capture
binlog-do-db=database-name
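Change binlog-do-db to match your own setup. It specifies which database's changes are written to the binlog.
Restart MySQL so that the configuration takes effect: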
sudo systemctl restart mysqld
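You may want to check the settings that CDC depends on before restarting. There are three: a server id, an enabled binlog, and the ROW format, which logs the changed rows rather than the SQL statements. Below is a minimal sketch of such a check. BinlogConfigCheck is a hypothetical helper. It reads the [mysqld] section of a my.cnf text and, like MySQL, treats '-' and '_' in option names as equivalent. It only inspects the file as written. On MySQL 8.0 the binlog is on by default, so a missing log-bin line is not necessarily an error there.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of MySQL or Flink): a simplified check of the [mysqld] section.
class BinlogConfigCheck {

    // Returns a list of problems; an empty list means the three settings look right.
    static List<String> check(String cnfText) {
        Map<String, String> mysqld = new HashMap<>();
        String section = "";
        for (String raw : cnfText.split("\\r?\\n")) {
            // Drop '#' comments and surrounding whitespace.
            int hash = raw.indexOf('#');
            String line = (hash >= 0 ? raw.substring(0, hash) : raw).trim();
            if (line.isEmpty()) {
                continue;
            }
            if (line.startsWith("[") && line.endsWith("]")) {
                section = line.substring(1, line.length() - 1).trim();
                continue;
            }
            if (!"mysqld".equals(section)) {
                continue;
            }
            // Option names may use '-' or '_'; normalize to '_'.
            int eq = line.indexOf('=');
            String key = (eq >= 0 ? line.substring(0, eq) : line).trim().replace('-', '_');
            String value = eq >= 0 ? line.substring(eq + 1).trim() : "";
            mysqld.put(key, value);
        }

        List<String> problems = new ArrayList<>();
        String serverId = mysqld.get("server_id");
        if (serverId == null || serverId.isEmpty() || "0".equals(serverId)) {
            problems.add("server-id should be set to a non-zero value");
        }
        if (!mysqld.containsKey("log_bin")) {
            problems.add("log-bin is not set in [mysqld]");
        }
        if (!"row".equalsIgnoreCase(mysqld.get("binlog_format"))) {
            problems.add("binlog_format must be ROW");
        }
        return problems;
    }
}
```

After the restart, you can confirm the live values in the MySQL client with SHOW VARIABLES LIKE 'log_bin'; and SHOW VARIABLES LIKE 'binlog_format';.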
2. Dependencies
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
</dependencies>
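3. Write the code
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // State backend: HashMapStateBackend manages state and serves state access; working state is kept in memory (JVM heap)
        env.setStateBackend(new HashMapStateBackend());
        // Enable checkpointing every 5 seconds
        env.enableCheckpointing(5000);
        // Where checkpoints are stored
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/flink-cdc");
        // Exactly-once checkpointing mode
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // A checkpoint that does not complete within 10 seconds is aborted
        env.getCheckpointConfig().setCheckpointTimeout(10000L);
        // 2. Build the MySQL source with Flink CDC
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("175.178.154.194")
                .port(3306)
                .username("root")
                .password("xxxx")
                .databaseList("rtdw-flink") // database name(s)
                // Table name(s); several are allowed. Use the database.table form so tables with
                // the same name in different databases are not confused.
                .tableList("rtdw-flink.base_category1")
                // Output format (deserializer); the default string deserializer is used for now
                .deserializer(new StringDebeziumDeserializationSchema())
                // Startup mode: initial() first reads a full snapshot, then continues with the binlog;
                // latest() only reads changes made after the CDC job has started
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);
        // 3. Print the data
        streamSource.print();
        // 4. Start the job
        env.execute("CDC TASK");
    }
}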
4. Packaging
The following build plugins bundle the dependencies into the jar (jar-with-dependencies):
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
5. Testing
Start the Flink cluster:
bin/start-cluster.sh
Open the web UI:
http://hadoop102:8081
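Upload the jar.
Specify the entry class and submit the job.
The change records appear in the TaskManager's stdout output.
Copy the job ID.
Run the following command to trigger a savepoint for this job and write it to HDFS (choose any directory):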
bin/flink savepoint 224f7761b07bcee76bcaa74a5248a0e3 hdfs://hadoop102:8020/savepoint
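Cancel the job, then modify the table in MySQL.
Find the savepoint directory on HDFS and resubmit the job from it. In the web UI, enter the path in the Savepoint Path field. On the command line, pass the path with bin/flink run -s.
The restarted job reads the changes made while it was stopped, so reading resumes from where it left off.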
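The restored job picks up exactly the missed changes because the source keeps its binlog offset in Flink state, and the savepoint captures that state. The sketch below simulates this in plain Java under simplified assumptions. The hypothetical BinlogReader uses a single integer position. The real source stores the Debezium offset instead (roughly, the binlog file name and position).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Conceptual simulation only: BinlogReader is hypothetical, not the flink-cdc source function.
class ResumeFromOffsetSketch {

    static final class BinlogReader {
        // Next binlog position to read; stands in for the offset the real source keeps in state.
        private int nextPosition;

        BinlogReader(int restoredPosition) {
            this.nextPosition = restoredPosition;
        }

        // Emits every event from nextPosition up to the current end of the binlog.
        List<String> poll(List<String> binlog) {
            List<String> emitted = new ArrayList<>();
            while (nextPosition < binlog.size()) {
                emitted.add(binlog.get(nextPosition));
                nextPosition++;
            }
            return emitted;
        }

        // The value a checkpoint or savepoint would capture for this source.
        int snapshotState() {
            return nextPosition;
        }
    }

    public static void main(String[] args) {
        List<String> binlog = new ArrayList<>(Arrays.asList("insert id=1", "insert id=2"));

        BinlogReader firstRun = new BinlogReader(0);
        System.out.println(firstRun.poll(binlog));   // [insert id=1, insert id=2]
        int savepoint = firstRun.snapshotState();    // like taking a savepoint, then cancelling

        // While the job is cancelled, the table is modified in MySQL.
        binlog.add("update id=1");
        binlog.add("delete id=2");

        BinlogReader restored = new BinlogReader(savepoint); // resubmit from the savepoint
        System.out.println(restored.poll(binlog));           // [update id=1, delete id=2]
    }
}
```

If the job were resubmitted without the savepoint, the reader would start from its initial position. With StartupOptions.initial() that means taking a new snapshot rather than continuing where the old job stopped.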
III. Custom Deserializer
The data in the default output format is complex and awkward to use in later processing. A custom deserializer can produce readable, easy-to-use JSON instead.
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CusDeserialization implements DebeziumDeserializationSchema<String> {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // Create the JSON object for the result
        JSONObject res = new JSONObject();
        // The topic has the form <server-name>.<database>.<table>
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        // 1. Get the database name and table name
        String database = fields[1];
        String tableName = fields[2];
        // 2. Get the value; it holds "before" (the row before the change) and "after" (the row after it)
        Struct value = (Struct) sourceRecord.value();
        // Get the "before" struct
        Struct beforeStruct = value.getStruct("before");
        // Copy the "before" fields into a JSON object
        JSONObject beforeJSON = new JSONObject();
        // Inserts and snapshot reads have no "before" image, so check for null
        if (beforeStruct != null) {
            // Get the schema (metadata)
            Schema beforeSchema = beforeStruct.schema();
            // Get the field list from the schema
            List<Field> fieldList = beforeSchema.fields();
            for (Field field : fieldList) {
                // Get the field value
                Object beforeValue = beforeStruct.get(field);
                // Put it into the JSON object
                beforeJSON.put(field.name(), beforeValue);
            }
        }
        // 3. Get "after" (null for deletes)
        Struct after = value.getStruct("after");
        JSONObject afterJSON = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> fieldList = afterSchema.fields();
            for (Field field : fieldList) {
                Object afterValue = after.get(field);
                afterJSON.put(field.name(), afterValue);
            }
        }
        // 4. Get the operation type
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            // Report Debezium's "create" as the more familiar "insert"
            type = "insert";
        }
        // 5. Write the fields into the JSON object
        res.put("database", database);
        res.put("tableName", tableName);
        res.put("before", beforeJSON);
        res.put("after", afterJSON);
        res.put("type", type);
        // 6. Send the record downstream
        collector.collect(res.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
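The topic parsing and type mapping in CusDeserialization do not depend on Flink or Kafka. They can therefore be moved into plain helpers and unit-tested on their own. CdcRecordHelpers and its method names are hypothetical. The topic format server.database.table is the same assumption the deserializer makes, and the topic in the usage example is illustrative. Rows read during the initial snapshot arrive with Debezium's READ operation. They are therefore typed "read" rather than "insert".

```java
import java.util.Locale;

// Hypothetical helpers extracted from the deserializer logic; not part of any library.
class CdcRecordHelpers {

    // Splits "<server>.<database>.<table>" into {database, table}.
    static String[] databaseAndTable(String topic) {
        // limit 3 keeps everything after the second dot as the table name
        String[] parts = topic.split("\\.", 3);
        if (parts.length < 3) {
            throw new IllegalArgumentException("unexpected topic: " + topic);
        }
        return new String[] {parts[1], parts[2]};
    }

    // Maps a Debezium operation name (CREATE, UPDATE, DELETE, READ) to the "type" field.
    static String typeFor(String operationName) {
        String type = operationName.toLowerCase(Locale.ROOT);
        return "create".equals(type) ? "insert" : type;
    }

    public static void main(String[] args) {
        String[] dt = databaseAndTable("mysql_binlog_source.rtdw-flink.base_sale_attr");
        System.out.println(dt[0] + " / " + dt[1]); // rtdw-flink / base_sale_attr
        System.out.println(typeFor("CREATE"));     // insert
    }
}
```

Inside deserialize, these would be called as databaseAndTable(sourceRecord.topic()) and typeFor(operation.toString()). Passing Locale.ROOT keeps the lowercasing independent of the JVM's default locale.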
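Then switch the source to the custom deserializer:
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Build the MySQL source with Flink CDC
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("175.178.154.194")
                .port(3306)
                .username("root")
                .password("xxxx")
                .databaseList("rtdw-flink")
                .tableList("rtdw-flink.base_sale_attr")
                .deserializer(new CusDeserialization()) // the custom deserializer defined above
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);
        // 3. Print the data
        streamSource.print();
        // 4. Start the job
        env.execute("CDC TASK");
    }
}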