Practice notes on canal, Alibaba's data synchronization component
2022-08-02 05:45:00 【ZWZhangYu】
Contents
- Introduction
- Basic principles
- Deployment reference
- Comparison of related products
- Practical experience
- Client example
- Example message queue messages
- Batch-inserting 10,000 rows to check data integrity
- Testing structure changes to a monitored table
- Data compression and table structure design
- Does a rolled-back transaction still produce data?
- Messages produced by a single multi-row INSERT
- Low-probability data loss
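Introduction
canal [kə'næl], meaning waterway, pipe or channel, is mainly used to parse MySQL incremental logs (the binlog) and provide subscription to and consumption of incremental data.
Businesses built on log-based incremental subscription and consumption include:
【1】Database mirroring
【2】Real-time database backup
【3】Index building and real-time maintenance (split heterogeneous indexes, inverted indexes, etc.)
【4】Business cache refresh
【5】Incremental data processing with business logic
canal currently supports source MySQL versions 5.1.x, 5.5.x, 5.6.x, 5.7.x and 8.0.x.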
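Basic principles
How MySQL master/slave replication works
【1】The MySQL master writes data changes to the binary log (its records are called binary log events and can be viewed with show binlog events).
【2】The MySQL slave copies the master's binary log events into its relay log.
【3】The MySQL slave replays the events in the relay log, applying the changes to its own data.
How canal works
【1】canal emulates the MySQL slave interaction protocol, presents itself as a MySQL slave, and sends a dump request to the MySQL master.
【2】The MySQL master receives the dump request and starts pushing the binary log to the slave (that is, canal).
【3】canal parses the binary log objects (originally a byte stream).
In short, canal parses MySQL's incremental logs and provides subscription to and consumption of incremental data. Put simply, it synchronizes MySQL changes in real time, and the data can be delivered to stores such as MySQL, Elasticsearch and HBase.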
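To make the dump flow above concrete, here is a toy in-memory model in Java. It is not MySQL or canal code, and the class names ToyMaster and ToyReplica are hypothetical: the master appends events to an ordered log, and a replica (or canal posing as one) asks for everything from its current position, stores it, and advances its cursor.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the master/slave dump flow. Not MySQL or canal code;
// all names here are hypothetical and exist only for illustration.
final class ToyMaster {
    // The "binary log": an append-only list of change events.
    private final List<String> binlog = new ArrayList<>();

    // Step 1: every change is appended to the binary log.
    void write(String event) {
        binlog.add(event);
    }

    // Step 2: a dump request returns every event from the given position onward.
    List<String> dumpFrom(int position) {
        return new ArrayList<>(binlog.subList(position, binlog.size()));
    }
}

final class ToyReplica {
    private final List<String> relayLog = new ArrayList<>();
    private int position = 0; // next binlog position to request

    // Pull new events, copy them into the relay log, and advance the cursor.
    List<String> sync(ToyMaster master) {
        List<String> events = master.dumpFrom(position);
        relayLog.addAll(events);
        position += events.size();
        // Step 3: a real slave replays these events; canal parses them instead.
        return events;
    }

    int position() {
        return position;
    }
}
```

The position is what makes resuming possible: a reader that remembers where it stopped only receives what it has not yet seen, which is the same idea behind the binlog file name and position configured for canal.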
Deployment reference
For detailed deployment steps, see:
https://github.com/alibaba/canal/wiki/QuickStart
Here canal is deployed as a Docker container, using the canal-server image from Docker Hub, version v1.1.5.
canal/canal-server:v1.1.5
The container exposes port 11111, which is mapped to port 30000 on the host.
The data volumes are host-mapped volumes, and two configuration files need to be mounted: instance.properties and canal.properties.
instance.properties configuration file
The instance.properties file mainly configures the properties of the MySQL instance being monitored.
The position info section mainly configures the MySQL address, the binlog file name, the binlog position and a timestamp. The two values in the middle (file name and position) can be obtained in MySQL with the show master status command.
The timestamp can be based on the current time, in milliseconds.
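A small sketch of assembling the position info entries from the output of show master status, assuming the key names used in canal's sample instance.properties (canal.instance.master.address, canal.instance.master.journal.name, canal.instance.master.position, canal.instance.master.timestamp). The PositionConfig helper is hypothetical, and the file name and position in the usage line are made-up values; substitute your own.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Builds the "position info" entries of instance.properties from the values
// reported by SHOW MASTER STATUS. Sketch only; the helper name is hypothetical.
final class PositionConfig {
    static Map<String, String> build(String address, String binlogFile, long binlogPosition, long timestampMillis) {
        Map<String, String> entries = new LinkedHashMap<>(); // keep a stable, readable order
        entries.put("canal.instance.master.address", address);                          // MySQL host:port
        entries.put("canal.instance.master.journal.name", binlogFile);                  // "File" column
        entries.put("canal.instance.master.position", Long.toString(binlogPosition));   // "Position" column
        entries.put("canal.instance.master.timestamp", Long.toString(timestampMillis)); // epoch millis
        return entries;
    }

    // Render as key=value lines by hand: java.util.Properties.store would escape
    // the ':' in host:port and prepend a date comment.
    static String render(Map<String, String> entries) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : entries.entrySet()) {
            sb.append(e.getKey()).append('=').append(e.getValue()).append('\n');
        }
        return sb.toString();
    }
}
```

For example, PositionConfig.render(PositionConfig.build("127.0.0.1:3306", "mysql-bin.000003", 154L, System.currentTimeMillis())) produces four key=value lines that can be pasted into instance.properties.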
The canal.instance.XXX entries specify the account, password and other information for connecting to the database; they correspond to the account-preparation step in the official documentation.
canal.instance.filter.regex is the filter rule for table names, i.e. which tables of the current database instance should be monitored. It can be set in the configuration file or in client code. Here it must be set in the configuration file, because the setup used later consumes through a message queue rather than a direct client connection.
Examples: all tables in all databases: .*\\..*
All tables in one database: test\\..* matches all tables in database test
Single table: test.user matches table user in database test
Multiple rules combined: test\\..*,test2.user1 matches all tables in database test and table user1 in database test2
Modifying the client code (Java)
Change the filter regex passed to connector.subscribe in the Java program.
All tables in all databases
connector.subscribe(".*\\..*");
All tables in one database
connector.subscribe("test\\..*");
Single table
connector.subscribe("test.user");
Multiple rules combined
connector.subscribe("test\\..*,test2.user1,test3.user2");
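To check which tables a rule set selects before deploying it, the following sketch approximates the matching with java.util.regex: the filter string is split on commas, and a table is accepted if any rule matches its whole schema.table name. This is an assumption-based approximation, not canal's own filter implementation, whose details (for example case handling) may differ; the TableFilter class is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Approximates how a comma-separated filter such as "test\\..*,test2.user1"
// selects tables: each rule is a regex matched against "schema.table".
// Uses java.util.regex; this is NOT canal's own filter implementation.
final class TableFilter {
    private final List<Pattern> rules = new ArrayList<>();

    TableFilter(String filter) {
        for (String rule : filter.split(",")) {
            String trimmed = rule.trim();
            if (!trimmed.isEmpty()) {
                rules.add(Pattern.compile(trimmed));
            }
        }
    }

    // True if the fully qualified table name matches any rule in full.
    boolean accepts(String schema, String table) {
        String name = schema + "." + table;
        for (Pattern p : rules) {
            if (p.matcher(name).matches()) {
                return true;
            }
        }
        return false;
    }
}
```

The Java literal "test\\..*" is the regex test\..*, the same text that ends up in the properties file as test\\..* after escaping, so the same rule string can be tried out here before it goes into either place.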
canal.properties configuration file
canal.properties mostly describes the canal service itself: the open ports, the connected MQ, and the admin configuration (the admin component is not used this time).
Configuring the message queue
At first the client connected directly to the canal server to consume data, but this has a problem: data is lost when the client is unavailable because of a restart or other reasons. For data consistency and integrity, RabbitMQ was added as messaging middleware.
To configure it, edit conf/canal.properties and add the MQ settings; canal supports Kafka, RocketMQ and RabbitMQ. Since the project already uses RabbitMQ heavily, RabbitMQ was chosen.
The exchange and queue need to be created in advance, and the host/username/password settings filled in according to your MQ.
After configuring, restart the container, execute a few DML statements in the database, and check whether messages arrive in the MQ.
The message details can be viewed with Get messages in the RabbitMQ management console.
Comparison of related products
Similar components that listen for database changes include Maxwell and canal. I had used canal before and am more familiar with it, so I chose it.
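Practical experience
Client example
canal uses a dedicated client-server design; the interaction protocol is protobuf 3.0, so clients can be written in different languages to implement different consumption logic.
Java client reference: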
https://github.com/alibaba/canal/wiki/ClientExample
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
</dependency>
Below is a simple Spring Boot example:
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
@Component
public class CanalClient implements InitializingBean {
    private static final int BATCH_SIZE = 1000;

    @Override
    public void afterPropertiesSet() {
        // Poll on a dedicated thread: an endless loop here would block Spring startup forever
        new Thread(this::consume, "canal-client").start();
    }

    private void consume() {
        // Create the connection
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.208.31", 11111), "example", "canal", "canal");
        try {
            // Open the connection
            connector.connect();
            // Subscribe to all tables in all databases
            connector.subscribe(".*\\..*");
            // Roll back to the last un-acked position so the next fetch resumes from there
            connector.rollback();
            while (!Thread.currentThread().isInterrupted()) {
                // Fetch up to BATCH_SIZE entries without acknowledging them
                Message message = connector.getWithoutAck(BATCH_SIZE);
                // Batch ID
                long batchId = message.getId();
                // Number of entries in the batch
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // No data: wait 2 seconds before polling again
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the loop exits
                        Thread.currentThread().interrupt();
                    }
                } else {
                    try {
                        // Process the data
                        printEntry(message.getEntries());
                    } catch (RuntimeException e) {
                        // Processing failed: roll back so this batch is fetched again
                        connector.rollback(batchId);
                        continue;
                    }
                }
                // Acknowledge the batch; every Message with an id <= batchId is confirmed
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    /** Print the entries that canal server parsed from the binlog */
    private static void printEntry(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                // Transaction begin/end markers carry no row data; skip them
                continue;
            }
            // RowChange holds everything about a change, e.g. isDdl (whether it is DDL),
            // sql (the DDL statement), beforeColumns/afterColumns (column values before/after)
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            // Operation type: INSERT/UPDATE/DELETE, etc.
            CanalEntry.EventType eventType = rowChange.getEventType();
            // Print header information
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            // Is it a DDL statement?
            if (rowChange.getIsDdl()) {
                System.out.println("================> isDdl: true, sql:" + rowChange.getSql());
            }
            // Print every row contained in the RowChange
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    // DELETE: print the row as it was before deletion
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    // INSERT: print the newly inserted row
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    // UPDATE: print the data before and after the change
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
Example message queue messages
The following shows the content of messages received from the message queue, using a test table as an example: the data received on insert, update and delete.
CREATE TABLE `test` (
`id` int(11) NOT NULL,
`name` varchar(255) DEFAULT NULL,
`phone` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Insert message
{
"data": [{
"id": "1",
"name": "测试",
"phone": "12345678"
}],
"database": "db",
"es": 1658225090000,
"id": 7326,
"isDdl": false,
"mysqlType": {
"id": "int",
"name": "varchar(255)",
"phone": "varchar(255)"
},
"old": null,
"pkNames": ["id"],
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"phone": 12
},
"table": "test",
"ts": 1658225090766,
"type": "INSERT"
}
Update message
{
"data": [{
"id": "1",
"name": "测试更新",
"phone": "12345678"
}],
"database": "db",
"es": 1658225185000,
"id": 7333,
"isDdl": false,
"mysqlType": {
"id": "int",
"name": "varchar(255)",
"phone": "varchar(255)"
},
"old": [{
"name": "测试"
}],
"pkNames": ["id"],
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"phone": 12
},
"table": "test",
"ts": 1658225185768,
"type": "UPDATE"
}
Delete message
{
"data": [{
"id": "1",
"name": "测试更新",
"phone": "12345678"
}],
"database": "db",
"es": 1658225230000,
"id": 7337,
"isDdl": false,
"mysqlType": {
"id": "int",
"name": "varchar(255)",
"phone": "varchar(255)"
},
"old": null,
"pkNames": ["id"],
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"phone": 12
},
"table": "test",
"ts": 1658225230416,
"type": "DELETE"
}
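Because the UPDATE message carries the full new row in data but only the changed columns (with their previous values) in old, a consumer that needs the complete before-image can overlay old onto data. A minimal sketch, assuming old[i] pairs with data[i] and that rows have already been parsed into maps by whatever JSON library you use; the FlatMessageRows class is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Rebuilds the pre-update row from a flat UPDATE message: "data" holds the full
// row after the change, "old" holds only the columns that changed, with their
// previous values. Class name is hypothetical.
final class FlatMessageRows {
    static Map<String, String> beforeImage(Map<String, String> data, Map<String, String> old) {
        Map<String, String> before = new HashMap<>(data); // start from a copy of the new row
        if (old != null) {
            before.putAll(old); // overwrite changed columns with their old values
        }
        return before;
    }
}
```

With the update sample above, overlaying {"name": "测试"} onto the new row restores the original name while id and phone are taken unchanged from data.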
Batch-inserting 10,000 rows to check data integrity
To check data integrity, 10,000 rows were inserted in quick succession to see whether all of them are captured.
CREATE DEFINER=`root`@`%` PROCEDURE `batchInsertUserTest`()
BEGIN
declare i int;
set i=1;
while i<=10000 do
INSERT INTO -- test INSERT statement
set i=i+1;
end while;
END
Execute the stored procedure:
call batchInsertUserTest();
Result: the data was processed normally. The table checked is the business data table into which the data captured by canal is written.
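The integrity check can also be automated on the consumer side. This sketch assumes the procedure produces ids 1 to 10000 (the actual INSERT statement is elided above) and that the consumer records every id it receives; IntegrityCheck is a hypothetical helper, not part of canal. It reports ids that never arrived and tolerates duplicate deliveries.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.List;

// Reports which ids in 1..expected were never received. Duplicates are harmless,
// and ids outside the range are ignored. Hypothetical helper, not part of canal.
final class IntegrityCheck {
    static List<Integer> missingIds(Collection<Integer> receivedIds, int expected) {
        BitSet seen = new BitSet(expected + 1);
        for (int id : receivedIds) {
            if (id >= 1 && id <= expected) {
                seen.set(id);
            }
        }
        List<Integer> missing = new ArrayList<>();
        for (int id = 1; id <= expected; id++) {
            if (!seen.get(id)) {
                missing.add(id);
            }
        }
        return missing;
    }
}
```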
Testing structure changes to a monitored table
ALTER TABLE `tableName`
ADD COLUMN `temp` varchar(255) NULL AFTER `status`;
Testing shows that the current version does pick up DDL changes. Note that older versions had problems with this: the number of columns in the local cache could differ from the table's current number of columns. When a new table is created, the log shows that canal puts the table structure into its local cache.
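If your consumer caches table structures itself (for example to map values to column names), the same stale-column problem can appear on the client side. One way to guard against it, sketched below with hypothetical names, is to key the cache by schema.table and evict the entry whenever a DDL message (isDdl true) arrives for that table. The loader function stands in for whatever metadata query you use.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Consumer-side column cache keyed by "schema.table". A DDL event for a table
// evicts its entry so the next lookup reloads the current columns.
// Hypothetical helper; separate from canal's own server-side table cache.
final class ColumnCache {
    private final Map<String, List<String>> cache = new HashMap<>();
    private final Function<String, List<String>> loader; // e.g. a metadata query

    ColumnCache(Function<String, List<String>> loader) {
        this.loader = loader;
    }

    List<String> columns(String schema, String table) {
        return cache.computeIfAbsent(schema + "." + table, loader);
    }

    // Call this when a message with isDdl == true arrives for the table.
    void onDdl(String schema, String table) {
        cache.remove(schema + "." + table);
    }
}
```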
Data compression and table structure design
If canal is used to record change information, note that MySQL change logs are large, so filter and discard data according to the situation. At first I overlooked this, and millions of records accumulated within a few days; some of them contained large JSON text fields, and the data backup program failed for lack of space.
1: Filter by data and by table as appropriate, and record only the data you need.
2: If the table is only used for queries, large text fields can be compressed, or the tablespace itself compressed, to reduce the space used.
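For point 2, one application-level option is to GZIP-compress large JSON text and store it Base64-encoded, decompressing on read. This is a sketch using only the JDK, not necessarily the approach the author used, and TextCompressor is a hypothetical name. A compressed field can no longer be searched or filtered in SQL, which is why it suits tables that are only read back.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// GZIP + Base64 round trip for large text fields, so the result still fits
// in a text column. Hypothetical helper using only the JDK.
final class TextCompressor {
    static String compress(String text) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Closing the GZIP stream (end of try) flushes the trailer before we read the bytes.
        try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
            gzip.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    static String decompress(String encoded) {
        byte[] compressed = Base64.getDecoder().decode(encoded);
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return new String(gzip.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```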
Does a rolled-back transaction still produce data?
Since business code may roll back a transaction after an exception, I opened a transaction manually and rolled it back as follows; the test showed that a rolled-back transaction does not trigger any message.
START TRANSACTION;
INSERT INTO ...
ROLLBACK ;
If records are added inside a transaction that is rolled back after an exception, the MySQL binlog does not record them, and no deletion record is written either.
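This behaviour follows from how the binlog is written for transactional engines such as InnoDB: a transaction's changes are buffered per session and appended to the binary log only at COMMIT. The toy model below (hypothetical names, heavily simplified) shows why a ROLLBACK leaves nothing for canal to read; the BEGIN/COMMIT pair roughly corresponds to the TRANSACTIONBEGIN/TRANSACTIONEND entries that the client example skips.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: changes of an open transaction sit in a per-session cache and are
// appended to the shared binary log only on commit. Simplified; names hypothetical.
final class ToyBinlogSession {
    private final List<String> binlog;
    private final List<String> txCache = new ArrayList<>();

    ToyBinlogSession(List<String> sharedBinlog) {
        this.binlog = sharedBinlog;
    }

    void execute(String statement) {
        txCache.add(statement); // buffered, not yet visible to slaves or canal
    }

    void commit() {
        binlog.add("BEGIN");
        binlog.addAll(txCache);
        binlog.add("COMMIT");
        txCache.clear();
    }

    void rollback() {
        txCache.clear(); // nothing is written, so there is no event to consume
    }
}
```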
Messages produced by a single multi-row INSERT
In canal 1.1.5, a MySQL batch update or batch insert produces only one message:
insert into exam.canal_test(name) values ('232323232'),('ttttttt'),('ooooooo');
In this case the message's data field holds a collection of rows, so the consuming code must process every element of that collection to avoid problems.
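A minimal sketch of the safe pattern: treat data as a list and handle every row, here by deriving one cache key per row for a business-cache refresh. The handler name, the key format and the ids in the test are hypothetical, and rows are assumed to be already parsed into maps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// One INSERT with three VALUES tuples arrives as ONE message whose "data" array
// has three rows, so the handler loops over all of them instead of reading data[0].
// Hypothetical handler: derives one cache key per row, e.g. for cache invalidation.
final class MultiRowHandler {
    static List<String> cacheKeysToEvict(String table, List<Map<String, String>> data, String pkName) {
        List<String> keys = new ArrayList<>();
        for (Map<String, String> row : data) { // every row, not just the first
            keys.add(table + ":" + row.get(pkName));
        }
        return keys;
    }
}
```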
Low-probability data loss
While browsing canal's GitHub issues, I found that some users reported canal occasionally missing data. I have not run into this in practice, but the possibility of low-probability data loss should be kept in mind, and core business processing needs a compensation mechanism.
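A common form of compensation is a periodic reconciliation job that compares the source table with the synchronized copy and re-syncs the differences. The sketch below assumes each row has a primary key and a version marker such as an update-time column, both hypothetical here, already loaded into maps; Reconciler is a hypothetical name. It returns the keys that are missing, stale, or deleted at the source but still present in the target.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Periodic compensation sketch: compare a version marker per primary key between
// the source table and the synced copy; return the keys that need re-syncing.
// Column and class names are hypothetical.
final class Reconciler {
    static Set<String> idsToResync(Map<String, String> sourceVersions, Map<String, String> targetVersions) {
        Set<String> diff = new TreeSet<>();
        for (Map.Entry<String, String> e : sourceVersions.entrySet()) {
            // Missing in the target, or present with a different version.
            if (!Objects.equals(e.getValue(), targetVersions.get(e.getKey()))) {
                diff.add(e.getKey());
            }
        }
        for (String id : targetVersions.keySet()) {
            if (!sourceVersions.containsKey(id)) {
                diff.add(id); // deleted at the source but still present in the target
            }
        }
        return diff;
    }
}
```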