Hands-on: connecting Canal to MySQL to monitor data write operations
2022-07-28 15:08:00 [Huawei Cloud]
Author: On knowledge, CSDN contracted lecturer, CSDN Force author, high-quality creator in the back-end field who loves to share and create
Official account: On knowledge
Areas of expertise: full-stack engineering, web crawlers, ACM algorithms
Contact (vx): zsqtcc
This guide to using Canal is divided into two parts:
- Part one covers connecting Canal to MySQL to monitor database operations.
- Part two covers connecting Canal to a message queue and Redis, and Canal high availability (HA).
This article covers part one.
Environment: Linux
Database host (192.168.31.230)
Canal server host (192.168.31.231)
Install MySQL 8.0.28 (192.168.31.230)
- Open port 3306 in the firewall
firewall-cmd --zone=public --add-port=3306/tcp --permanent
firewall-cmd --reload
- Install the official MySQL repository package (note: this installs the MySQL repository, not MySQL itself)
cd /home
wget --no-check-certificate https://manongbiji.oss-cn-beijing.aliyuncs.com/ittailkshow/canal/download/world.sql
wget --no-check-certificate https://repo.mysql.com/mysql80-community-release-el7-5.noarch.rpm
yum localinstall -y mysql80-community-release-el7-5.noarch.rpm
- Install MySQL
# Install MySQL 8.0.28 automatically
yum install -y mysql-community-server
- Adjust the configuration file
sudo cat >> /etc/my.cnf <<-'EOF'
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=world
EOF
systemctl start mysqld
- Get the initial password
grep 'temporary password' /var/log/mysqld.log
2022-03-31T04:20:25.133810Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: Jby&XTOc.7iN
# Quote the password so the shell does not interpret the & character
mysql -uroot -p'Jby&XTOc.7iN'
- Run the following commands after connecting to MySQL
# Change the root password
ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY 'asAS123456!';
# Lower MySQL's password strength requirements
set global validate_password.policy=0;
set global validate_password.length=4;
# Create the canal synchronization account
CREATE USER canal@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
# Grant the canal user the privileges it needs to connect remotely and read the binlog as a replica
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
CREATE USER remote@'%' IDENTIFIED WITH mysql_native_password BY 'remote';
grant all privileges on *.* to remote@'%';
# Initialize the database
source /home/world.sql
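Canal reads changes from the binlog, so the options appended to /etc/my.cnf above are what make the rest of the setup work: binary logging turned on, a server ID, and row format. A minimal sketch, assuming a hypothetical helper class `BinlogConfigCheck` (not part of Canal or MySQL), that parses a my.cnf fragment and lists which of these options are not stated explicitly or have a non-row format. MySQL 8.0 defaults may already satisfy some of them, so a "missing" entry only means the file does not set the option itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for illustration; not part of Canal or MySQL. */
public class BinlogConfigCheck {

    /** Parses option lines; '-' and '_' in option names are treated alike. */
    public static Map<String, String> parse(String cnf) {
        Map<String, String> options = new LinkedHashMap<>();
        for (String raw : cnf.split("\\R")) {
            String line = raw.trim();
            // Skip blanks, comments and section headers such as [mysqld]
            if (line.isEmpty() || line.startsWith("#") || line.startsWith("[")) {
                continue;
            }
            int eq = line.indexOf('=');
            String key = (eq < 0 ? line : line.substring(0, eq)).trim().replace('-', '_');
            String value = eq < 0 ? "" : line.substring(eq + 1).trim();
            options.put(key, value);
        }
        return options;
    }

    /** Lists options from this article's setup that the fragment does not set as expected. */
    public static List<String> problems(String cnf) {
        Map<String, String> options = parse(cnf);
        List<String> found = new ArrayList<>();
        for (String key : Arrays.asList("server_id", "log_bin", "binlog_format")) {
            if (!options.containsKey(key)) {
                found.add("missing " + key);
            }
        }
        String format = options.get("binlog_format");
        if (format != null && !format.equalsIgnoreCase("row")) {
            found.add("binlog_format is " + format + ", expected row");
        }
        return found;
    }

    public static void main(String[] args) {
        // The same four lines this article appends to /etc/my.cnf
        String cnf = "server-id=1\nlog-bin=mysql-bin\nbinlog_format=row\nbinlog-do-db=world\n";
        System.out.println("problems: " + problems(cnf));
    }
}
```

The check works on text rather than on a live server, so it can be run against a copy of the file before restarting mysqld.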
Install Canal-Server
- Install the JDK
yum -y install java-1.8.0-openjdk-devel.x86_64
sudo cat >> /etc/profile <<-'EOF'
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH
export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH
EOF
source /etc/profile
echo $JAVA_HOME
- Download the latest version of canal-deployer
# Download and unpack
wget --no-check-certificate https://manongbiji.oss-cn-beijing.aliyuncs.com/ittailkshow/canal/download/canal.deployer-1.1.5.tar.gz
mkdir /home/canal
tar zxvf canal.deployer-1.1.5.tar.gz -C /home/canal
- Modify the Canal configuration file
cd /home/canal
vi conf/example/instance.properties
# Set the slave ID Canal uses (it must differ from the MySQL server-id)
canal.instance.mysql.slaveId=10
# Master address
canal.instance.master.address=192.168.31.230:3306
...
# Disable tsdb
canal.instance.tsdb.enable=false
# Confirm the user name and password Canal uses for synchronization
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
- Start the Canal service and open the relevant ports
# Start the service
sh bin/startup.sh
# Canal admin port
firewall-cmd --zone=public --add-port=11110/tcp --permanent
# Canal listening port
firewall-cmd --zone=public --add-port=11111/tcp --permanent
# Canal metrics monitoring port
firewall-cmd --zone=public --add-port=11112/tcp --permanent
firewall-cmd --reload
- View the Canal log
tail logs/canal/canal.log
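Two values in instance.properties are easy to get wrong: the slave ID, which has to differ from the MySQL server-id (1 in this setup) because Canal connects as if it were a replica, and the master address, which must be in host:port form. A minimal sketch, assuming a hypothetical helper class `InstanceConfigCheck` (not part of Canal), that loads the properties text with the standard `java.util.Properties` class and checks both. Treating an unset slave ID as a failure is an assumption made for this article's setup, where it is set explicitly.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper for illustration; not part of Canal. */
public class InstanceConfigCheck {

    /** Loads instance.properties-style text into a Properties object. */
    public static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** True when the slave ID is set explicitly and differs from the MySQL server-id. */
    public static boolean slaveIdIsUnique(Properties props, long mysqlServerId) {
        String value = props.getProperty("canal.instance.mysql.slaveId");
        if (value == null || value.trim().isEmpty()) {
            return false; // assumption: this setup expects an explicit value
        }
        return Long.parseLong(value.trim()) != mysqlServerId;
    }

    /** Splits canal.instance.master.address into {host, port}. */
    public static String[] masterHostPort(Properties props) {
        String address = props.getProperty("canal.instance.master.address", "").trim();
        int colon = address.lastIndexOf(':');
        if (colon <= 0 || colon == address.length() - 1) {
            throw new IllegalArgumentException("expected host:port, got '" + address + "'");
        }
        return new String[] { address.substring(0, colon), address.substring(colon + 1) };
    }

    public static void main(String[] args) {
        // The values used in this article
        Properties props = load(
                "canal.instance.mysql.slaveId=10\n"
              + "canal.instance.master.address=192.168.31.230:3306\n");
        String[] hostPort = masterHostPort(props);
        System.out.println("slaveId unique: " + slaveIdIsUnique(props, 1));
        System.out.println("master host: " + hostPort[0] + ", port: " + hostPort[1]);
    }
}
```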
Develop the data monitoring program
pom.xml dependencies
<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.5</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.protocol</artifactId>
        <version>1.1.5</version>
    </dependency>
</dependencies>
Write a data listener
package com.itlaoqi;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;

public class AD {
    public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
        // Get the connection to the Canal server
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.31.231", 11111), "example", "", "");
        // Connect and subscribe once, before the polling loop starts
        canalConnector.connect();
        // Subscribe to the world database
        canalConnector.subscribe("world.*");
        while (true) {
            // Fetch up to 100 entries
            Message message = canalConnector.get(100);
            // Get the list of entries
            List<CanalEntry.Entry> entries = message.getEntries();
            // If the list is empty, wait a while before pulling data again
            if (entries.isEmpty()) {
                System.out.println("No data in this fetch, taking a break...");
                Thread.sleep(1000);
            } else {
                // Traverse the entries and parse them one by one
                for (CanalEntry.Entry entry : entries) {
                    // 1. Get the table name
                    String tableName = entry.getHeader().getTableName();
                    // 2. Get the entry type
                    CanalEntry.EntryType entryType = entry.getEntryType();
                    // 3. Get the serialized data
                    ByteString storeValue = entry.getStoreValue();
                    // 4. Check whether the entry type is ROWDATA
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                        // 5. Deserialize the data
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                        // 6. Get the operation type of the current event
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        // 7. Get the row data list
                        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
                        // 8. Traverse rowDataList and print each row's before and after values
                        for (CanalEntry.RowData rowData : rowDataList) {
                            JSONObject beforeData = new JSONObject();
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            for (CanalEntry.Column column : beforeColumnsList) {
                                beforeData.put(column.getName(), column.getValue());
                            }
                            JSONObject afterData = new JSONObject();
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            for (CanalEntry.Column column : afterColumnsList) {
                                afterData.put(column.getName(), column.getValue());
                            }
                            // Print the data
                            System.out.println("Table:" + tableName
                                    + ",EventType:" + eventType
                                    + ",Before:" + beforeData
                                    + ",After:" + afterData);
                        }
                    } else {
                        System.out.println("Current entry type: " + entryType);
                    }
                }
            }
        }
    }
}
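The string passed to subscribe is a filter made of comma-separated regular expressions that are matched against schema.table names. The filter world.* used in the listener is therefore looser than it looks, because the dot is a regex wildcard rather than a literal separator. A rough illustration using `java.util.regex`, assuming full-string matching; `FilterSketch` is a hypothetical class, and Canal uses its own matcher internally, so treat this as an approximation of the behavior rather than Canal's implementation.

```java
import java.util.regex.Pattern;

/** Hypothetical illustration of filter matching; not Canal's own matcher. */
public class FilterSketch {

    /** True if any comma-separated regex in the filter fully matches "schema.table". */
    public static boolean accepts(String filter, String schema, String table) {
        String name = schema + "." + table;
        for (String regex : filter.split(",")) {
            if (Pattern.compile(regex.trim()).matcher(name).matches()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Unescaped dot: also accepts schemas that merely start with "world"
        System.out.println(accepts("world.*", "world", "city"));
        System.out.println(accepts("world.*", "worldwide", "orders"));
        // Escaped dot: only tables inside the world schema
        System.out.println(accepts("world\\..*", "world", "city"));
        System.out.println(accepts("world\\..*", "worldwide", "orders"));
    }
}
```

With a single world database on the server, both filters select the same tables, so the stricter form only matters once another schema name starts with "world".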
