Installing canal and MySQL with Docker for a simple test, and keeping Redis and MySQL cache data consistent
2022-07-02 03:07:00 【Xiao Wang with hair loss】
I. Introduction
canal [kə'næl], meaning waterway, conduit, or ditch, is mainly used to parse MySQL incremental logs and to provide incremental data subscription and consumption.
In its early days, Alibaba ran data centers in both Hangzhou and the United States, so it needed to synchronize data across data centers. The first implementation obtained incremental changes mainly through business triggers. Starting in 2010, the business gradually moved to obtaining incremental changes by parsing database logs, and a large number of incremental subscription and consumption services grew out of that.
Canal is middleware written in Java that parses database incremental logs and provides incremental data subscription and consumption.
At present, Canal mainly supports parsing the MySQL Binlog. Once parsing is complete, a Canal client processes the resulting data. (Database synchronization requires Alibaba's Otter middleware, which is built on Canal.)
Canal currently supports source-side MySQL versions 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x.
II. MySQL Binlog
1. Introduction to Binlog
MySQL's binary log is arguably its most important log. It records every DDL and DML statement (data query statements excepted) as an event, together with the time each statement took to execute. The MySQL binary log is transaction-safe.
Enabling it generally costs about 1% in performance. The binary log has two main use cases:
- MySQL replication: enable Binlog on the master, which passes its binary log to the slaves so that master and slave data stay consistent. This is the familiar master-slave replication.
- Data recovery: recover data with the MySQL Binlog tools. Turn Binlog on in production; otherwise you really will have to delete the database and run away.
2. Binlog formats
MySQL Binlog has three formats: STATEMENT, MIXED, and ROW. Choose one in the configuration file with binlog_format=statement|mixed|row.
- statement: statement level. The binlog records each statement that performs a write, for example
update user set create_date=now()
Advantage: saves space.
Disadvantage: may cause data inconsistency.
- row: row level. The binlog records the change to every row after each operation.
Advantage: keeps the data fully consistent.
Disadvantage: takes up a lot of space.
- mixed: an upgraded statement mode that resolves some of the inconsistencies of statement mode. It defaults to statement and switches to row for statements that would produce inconsistencies.
Overall comparison:
Since canal is used for monitoring and analysis, the row format is the more appropriate choice.
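To make the statement/row difference concrete, here is a toy sketch in Java. It is not MySQL code: `BinlogFormatDemo`, `replayStatement`, and `replayRow` are hypothetical names, and the counter-backed `Supplier` merely stands in for any expression whose result changes each time it is evaluated.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/** Toy model, not MySQL code: replaying a statement versus replaying a row image. */
class BinlogFormatDemo {

    /** STATEMENT style: the replica evaluates the expression again by itself. */
    static String replayStatement(Supplier<String> expression) {
        return expression.get();
    }

    /** ROW style: the replica copies the value the master already computed. */
    static String replayRow(String masterValue) {
        return masterValue;
    }

    public static void main(String[] args) {
        AtomicLong counter = new AtomicLong();
        // Hypothetical stand-in for an expression that yields a new value on every evaluation.
        Supplier<String> nonDeterministic = () -> "value-" + counter.incrementAndGet();

        String master = nonDeterministic.get();
        System.out.println("master    : " + master);
        System.out.println("statement : " + replayStatement(nonDeterministic));
        System.out.println("row       : " + replayRow(master));
    }
}
```

In this model the row replay always equals the master's value, while the statement replay does not, which is the reason row format suits a consumer that needs the exact changed data.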
III. How it works
1. How MySQL master-slave replication works
- The MySQL master writes data changes to its binary log (the records are called binary log events and can be viewed with show binlog events)
- The MySQL slave copies the master's binary log events to its relay log
- The MySQL slave replays the events in the relay log, applying the changes to its own data
2. How canal works
- canal emulates the MySQL slave interaction protocol, poses as a MySQL slave, and sends a dump request to the MySQL master
- The MySQL master receives the dump request and starts pushing the binary log to the slave (that is, canal)
- canal parses the binary log objects (originally a byte stream)
Summary:
We can think of canal as a slave: it receives the data and then processes it further, for example by synchronizing it to Redis. There is no need for delayed double deletion to keep MySQL and Redis consistent, and the problems that come with that approach are avoided!
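The idea in the summary can be sketched without canal or Redis at all. In the following minimal Java sketch a `Map` stands in for Redis and `ChangeEvent` for a parsed row event; both, along with the class name `CacheSyncSketch`, are hypothetical and exist only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model: a Map stands in for Redis, ChangeEvent for a parsed binlog row event. */
class CacheSyncSketch {

    enum Type { INSERT, UPDATE, DELETE }

    static final class ChangeEvent {
        final Type type;
        final String id;   // primary key of the changed row
        final String row;  // row contents after the change; unused for DELETE

        ChangeEvent(Type type, String id, String row) {
            this.type = type;
            this.id = id;
            this.row = row;
        }
    }

    /** Applies one change to the cache: deletes remove the key, inserts and updates overwrite it. */
    static void apply(Map<String, String> cache, ChangeEvent event) {
        if (event.type == Type.DELETE) {
            cache.remove(event.id);
        } else {
            cache.put(event.id, event.row);
        }
    }

    public static void main(String[] args) {
        Map<String, String> cache = new LinkedHashMap<>();
        apply(cache, new ChangeEvent(Type.INSERT, "1", "{\"name\":\"a\"}"));
        apply(cache, new ChangeEvent(Type.UPDATE, "1", "{\"name\":\"b\"}"));
        System.out.println(cache);
        apply(cache, new ChangeEvent(Type.DELETE, "1", null));
        System.out.println(cache);
    }
}
```

Because the cache is driven only by the change stream, the application code that writes to MySQL does not need to touch the cache itself.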
IV. canal use cases
Scenario 1: as part of Alibaba's Otter middleware
Otter is the framework Alibaba uses to synchronize databases across sites, and Canal is one of its components.
See the Otter GitHub repository.
Scenario 2: keeping the cache and the database consistent (what we test today)
Scenario 3: real-time data analysis
Capture newly changed data from business tables and use it to produce real-time statistics
V. Install MySQL and Redis
1. Install MySQL
sudo docker run -p 3306:3306 --name mysql \
-v /mydata/mysql/log:/var/log/mysql \
-v /mydata/mysql/data:/var/lib/mysql \
-v /mydata/mysql/conf:/etc/mysql \
-e MYSQL_ROOT_PASSWORD=root \
-d mysql:5.7
2. Configure MySQL in Docker
vim /mydata/mysql/conf/my.cnf # create the file and open it for editing
Add the following configuration:
[client]
default-character-set=utf8
[mysql]
default-character-set=utf8
[mysqld]
init_connect='SET collation_connection = utf8_unicode_ci'
init_connect='SET NAMES utf8'
character-set-server=utf8
collation-server=utf8_unicode_ci
skip-character-set-client-handshake
skip-name-resolve
# Enable the binlog; the path is a directory inside the Docker container
log-bin=/var/lib/mysql/mysql-bin
# server_id must be unique and must not be the same as canal's slaveId
server-id=123456
binlog_format=row
# Enable the binlog for the test database only; if unset, it is enabled for all databases
binlog-do-db=test
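Before starting canal it can help to sanity-check the three settings it relies on here (log-bin, server-id, and binlog_format=row). The sketch below parses my.cnf-style text and lists what is missing. `BinlogConfigCheck` and its methods are hypothetical helper names, and the parser is deliberately simplistic: it only understands `[section]` headers, `#` comments, and `key=value` lines.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: checks my.cnf-style text for the binlog settings used in this article. */
class BinlogConfigCheck {

    /** Collects options from the [mysqld] section; '-' and '_' in option names are treated alike. */
    static Map<String, String> mysqldOptions(String cnf) {
        Map<String, String> options = new HashMap<>();
        String section = "";
        for (String raw : cnf.split("\n")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            if (line.startsWith("[") && line.endsWith("]")) {
                section = line.substring(1, line.length() - 1);
                continue;
            }
            if (!section.equals("mysqld")) {
                continue;
            }
            int eq = line.indexOf('=');
            String key = (eq < 0 ? line : line.substring(0, eq)).trim().replace('-', '_');
            String value = eq < 0 ? "" : line.substring(eq + 1).trim();
            options.put(key, value);
        }
        return options;
    }

    /** Returns a human-readable list of missing prerequisites; empty means the text looks fine. */
    static List<String> missing(String cnf) {
        Map<String, String> o = mysqldOptions(cnf);
        List<String> problems = new ArrayList<>();
        if (!o.containsKey("log_bin")) problems.add("log-bin is not set");
        if (!o.containsKey("server_id")) problems.add("server-id is not set");
        if (!"row".equalsIgnoreCase(o.get("binlog_format"))) problems.add("binlog_format is not row");
        return problems;
    }

    public static void main(String[] args) {
        String cnf = "[mysqld]\n"
                + "log-bin=/var/lib/mysql/mysql-bin\n"
                + "server-id=123456\n"
                + "binlog_format=row\n";
        System.out.println("problems: " + missing(cnf));
    }
}
```

This only inspects the file text; the settings take effect only after MySQL is restarted.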
3. Restart mysql
docker restart mysql
4. Create a user and grant privileges
Find the MySQL container ID:
docker ps
Enter the container:
docker exec -it 7d /bin/bash
Connect to mysql:
mysql -u root -p
Create the user and grant it privileges:
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ;
Flush the privileges:
flush privileges;
5. Connect to MySQL from Windows 10 and create the user table
CREATE TABLE `user` (
`id` int(10) NOT NULL AUTO_INCREMENT,
`name` varchar(25) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`sex` varchar(1) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 8 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
6. Create the Redis container
docker run -p 6379:6379 --name redis \
-v /mydata/redis/data:/data \
-v /mydata/redis/conf/redis.conf:/etc/redis/redis.conf \
-d redis redis-server /etc/redis/redis.conf
VI. Install canal
1. Start the container
docker run -it --name canal -p 11111:11111 -d canal/canal-server:v1.1.5
Check that all three containers are running:
docker ps
2. Configure canal
Enter the container:
docker exec -it 56 /bin/bash
Change directory:
cd canal-server/conf/example
Edit instance.properties and change two settings:
The first is the MySQL address. The second is the name of the database we created (you can also keep the default, which collects the binlog of every database).
canal.instance.master.address=192.168.84.138:3306
canal.instance.filter.regex=test\..*
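To see which tables a filter such as `test\..*` is meant to select, the expression can be tried with plain `java.util.regex`. This sketch assumes the filter is a comma-separated list of regular expressions matched against `schema.table` names; `FilterRegexDemo` is a hypothetical class, and canal's own matcher may differ in details such as case handling.

```java
import java.util.regex.Pattern;

/** Hypothetical helper: tries a filter expression against schema.table names. */
class FilterRegexDemo {

    /** Returns true if any comma-separated pattern matches the full "schema.table" name. */
    static boolean matches(String filter, String schema, String table) {
        String fullName = schema + "." + table;
        for (String part : filter.split(",")) {
            if (Pattern.matches(part.trim(), fullName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String filter = "test\\..*"; // the regex test\..* written as a Java string literal
        String[][] names = { {"test", "user"}, {"test", "orders"}, {"testing", "user"}, {"mall", "user"} };
        for (String[] n : names) {
            System.out.println(n[0] + "." + n[1] + " -> " + matches(filter, n[0], n[1]));
        }
    }
}
```

The escaped dot matters: it requires a literal `.` after `test`, so a schema such as `testing` is not selected.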
3. Check the log
Let's look at canal's log to see whether it started successfully!
First enter the container:
docker exec -it 56 /bin/bash
Change directory:
cd canal-server/logs/example/
Check the log :
cat example.log
No errors are reported, and the newly created table is detected here as well!
4. View canal.properties
cd /canal-server/conf
cat canal.properties
We can see that there are several modes: the binlog collected by canal can be sent to one of three MQs, or served over tcp.
This time we test with tcp. If you need to, you can send the data to MQ instead; the corresponding configuration is further down in the file!
VII. A simple test
1. Create a Spring Boot project and import the dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    <exclusions>
        <exclusion>
            <groupId>org.junit.vintage</groupId>
            <artifactId>junit-vintage-engine</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.8.6</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.module</groupId>
    <artifactId>jackson-module-jaxb-annotations</artifactId>
    <version>2.8.6</version>
</dependency>
2. Write the test class
Taken from the official example:
I removed the static keyword to make integration with Redis easier
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author wangzhenjun
 * @date 2022/6/29 9:31
 */
@Configuration
public class SimpleCanalClientExample {

    // private static String REDIS_DATABASE = "mall";
    // private static String REDIS_KEY_ADMIN = "ums:admin";

    // Declared as a @Bean so that the polling loop runs when the application starts.
    @Bean
    public void canalSync() {
        // Create the connection: the first argument is canal's IP, the second is canal's port,
        // the third is the canal instance name; the last two are the username and password
        // (here the "canal" account created earlier).
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.84.138", 11111), "example", "canal", "canal");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            // Matches the configuration above: subscribe only to the binlog of the test database
            connector.subscribe("test\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // ignored in this example
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // confirm the batch
                // connector.rollback(batchId); // on processing failure, roll the batch back
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            // Skip transaction begin/end markers; only row changes are of interest
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
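The listing above acknowledges every batch after printing it and leaves the rollback call commented out. One way to wire the two together is sketched below: confirm a batch only when its handler finishes, and roll it back when the handler throws. `Acker` is a hypothetical stand-in for the `ack(batchId)` / `rollback(batchId)` pair of the connector, so the sketch runs without a canal server.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Sketch of "ack on success, rollback on failure" around a batch handler. */
class AckOrRollbackSketch {

    /** Hypothetical stand-in for the ack/rollback part of a connector. */
    interface Acker {
        void ack(long batchId);
        void rollback(long batchId);
    }

    /** Handles one batch; confirms it only if the handler did not throw. Returns true on success. */
    static <T> boolean handle(long batchId, List<T> entries, Consumer<List<T>> handler, Acker acker) {
        try {
            handler.accept(entries);
            acker.ack(batchId);
            return true;
        } catch (RuntimeException e) {
            // Processing failed: ask for the batch to be delivered again later
            acker.rollback(batchId);
            return false;
        }
    }

    /** Records the calls it receives so the behaviour can be inspected. */
    static final class RecordingAcker implements Acker {
        final List<String> calls = new ArrayList<>();
        @Override public void ack(long batchId) { calls.add("ack:" + batchId); }
        @Override public void rollback(long batchId) { calls.add("rollback:" + batchId); }
    }

    public static void main(String[] args) {
        RecordingAcker acker = new RecordingAcker();
        handle(1L, Arrays.asList("row-a"), rows -> System.out.println("processed " + rows), acker);
        handle(2L, Arrays.asList("row-b"), rows -> { throw new IllegalStateException("boom"); }, acker);
        System.out.println(acker.calls);
    }
}
```

With a rollback the same batch is fetched again, so whatever the handler does (for example a Redis write) should be safe to repeat.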
3. Start the project
4. Insert a row
INSERT INTO user VALUES (1,'Xiaohong','F');
Summary:
We can now receive the binlog events. Next comes the real work: implementing Redis cache synchronization.
VIII. Hands-on: synchronizing the Redis cache
1. Write the Redis serialization configuration class
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * @author wangzhenjun
 * @date 2022/6/30 9:24
 */
@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringRedisSerializer);
        redisTemplate.setHashKeySerializer(stringRedisSerializer);
        Jackson2JsonRedisSerializer<?> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}
2. Add the Redis insert, update, and delete methods
The main addition is two methods that synchronize changes to Redis. Listening stops after 120 consecutive empty polls, which is about 2 minutes with the one-second sleep; adjust this to suit your needs:
int totalEmptyCount = 120;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;

/**
 * @author wangzhenjun
 * @date 2022/6/29 9:31
 */
@Configuration
public class SimpleCanalClientExample {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // Redis hash that holds one field per user row
    private static final String KEY = "user:info";

    // Declared as a @Bean so that the polling loop runs when the application starts.
    @Bean
    public void canalSync() {
        // Create the connection: the first argument is canal's IP, the second is canal's port,
        // the third is the canal instance name; the last two are the username and password
        // (here the "canal" account created earlier).
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.84.138", 11111), "example", "canal", "canal");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            // Matches the configuration above: subscribe only to the binlog of the test database
            connector.subscribe("test\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // ignored in this example
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // confirm the batch
                // connector.rollback(batchId); // on processing failure, roll the batch back
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            // Skip transaction begin/end markers; only row changes are of interest
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                    // Synchronize to Redis
                    delete(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                    // Synchronize to Redis
                    insertOrUpdate(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                    // Synchronize to Redis
                    insertOrUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }

    /**
     * Synchronizes an inserted or updated row to Redis.
     * The first column is used as the hash field, so it is assumed to be the primary key.
     * @param columns the row's columns after the change
     */
    private void insertOrUpdate(List<Column> columns) {
        if (columns.size() > 0) {
            JSONObject json = new JSONObject();
            for (Column column : columns) {
                json.put(column.getName(), column.getValue());
            }
            redisTemplate.opsForHash().put(KEY, columns.get(0).getValue(), json.toJSONString());
        }
    }

    /**
     * Synchronizes a deleted row to Redis.
     * @param columns the row's columns before the deletion
     */
    private void delete(List<Column> columns) {
        if (columns.size() > 0) {
            redisTemplate.opsForHash().delete(KEY, columns.get(0).getValue());
        }
    }
}
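The two Redis methods take `columns.get(0)` as the hash field, which works for the `user` table because `id` is its first column. A slightly more defensive variant picks the column flagged as the primary key instead. The sketch below uses a hypothetical `Col` class so that it runs on its own; canal's `Column` is believed to expose a comparable flag through `getIsKey()`, but treat that as an assumption and check it against your client version.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Sketch: choose the cache field from the column flagged as primary key, not from position 0. */
class KeyColumnSketch {

    /** Hypothetical stand-in for a binlog column: name, value, and a primary-key flag. */
    static final class Col {
        final String name;
        final String value;
        final boolean isKey;

        Col(String name, String value, boolean isKey) {
            this.name = name;
            this.value = value;
            this.isKey = isKey;
        }
    }

    /** Returns the value of the first column flagged as key, or empty if the row has none. */
    static Optional<String> keyOf(List<Col> columns) {
        for (Col column : columns) {
            if (column.isKey) {
                return Optional.of(column.value);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // The key column is deliberately not the first one here
        List<Col> row = Arrays.asList(
                new Col("name", "Xiaohong", false),
                new Col("id", "7", true));
        System.out.println("hash field: " + keyOf(row).orElse("<no key column>"));
    }
}
```

Returning an `Optional` forces the caller to decide what to do with tables that have no primary key, rather than silently caching under an arbitrary column.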
3. Test insert
Insert a row into the database:
insert into user values (1,'test add','M');
The console captures the event:
We can see that Redis already contains the data, so synchronization succeeded!
4. Test update
Let's modify the row we just added:
update user set name = 'Revised' where id = 1;
The console captures the update event:
Redis is updated in sync as well!
5. Test delete
Let's add a few more rows first:
Delete the row with id 1:
delete from user where id = 1;
The console captures the delete event:
Redis deletes it in sync as well!
IX. Summary
That completes one canal application scenario. Of course, you can also send the binlog data to MQ!
It took me a whole day to put this together. If you have read this far, please give me a follow. Thank you for your support!!