Syncing MariaDB to MySQL with canal
2022-08-02 02:21:00 【莫名其妙的忧伤】
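Preface
Use case: changes to the data in one database need to be detected and synced to another database, so I tried canal.
Here MariaDB is synced to MySQL. MariaDB is installed on Windows and MySQL on Linux.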
I. Enable the binlog on MariaDB
1. Check whether the binlog is enabled
- Run the following command:
show variables like 'log_bin';

If the binlog is not enabled, the value shown is OFF.
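2. Enable the binlog
- Edit the configuration file D:\EZAccess\Server\mariadb-5.5.64-winx64\my-huge.ini and add the following under the [mysqld] section:
# binlog file name prefix
log-bin=mysql-bin
# use the row-based format
binlog_format=ROW
# ID of this MySQL instance; must not be the same as canal's slaveId
server_id=1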
3. Restart the service
- Run D:\EZAccess\Server\service_restart.bat
4. Verify
- Run the command again; it should now show ON:
show variables like 'log_bin';
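The same check can also be scripted from the client side. The sketch below is only an illustration, not part of the original setup: the class name `BinlogCheck`, the JDBC URL, and the credentials are placeholders, and it assumes a MySQL or MariaDB JDBC driver is on the classpath. It reads `log_bin`, `binlog_format`, and `server_id` and reports whether they match the settings from step 2.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class BinlogCheck {

    /** Pure check: true when the variables match the settings from step 2. */
    static boolean isReadyForCanal(Map<String, String> vars) {
        boolean logBinOn = "ON".equalsIgnoreCase(vars.get("log_bin"));
        boolean rowFormat = "ROW".equalsIgnoreCase(vars.get("binlog_format"));
        String serverId = vars.get("server_id");
        // A missing or zero server_id is treated as "not configured" in this sketch.
        boolean hasServerId = serverId != null && !serverId.equals("0");
        return logBinOn && rowFormat && hasServerId;
    }

    /** Reads the three variables from the server (URL and credentials are placeholders). */
    static Map<String, String> readVariables(String url, String user, String password)
            throws SQLException {
        Map<String, String> vars = new HashMap<>();
        String sql = "show variables where Variable_name in "
                + "('log_bin', 'binlog_format', 'server_id')";
        try (Connection con = DriverManager.getConnection(url, user, password);
             Statement st = con.createStatement();
             ResultSet rs = st.executeQuery(sql)) {
            while (rs.next()) {
                // Column 1 is the variable name, column 2 its value.
                vars.put(rs.getString(1).toLowerCase(Locale.ROOT), rs.getString(2));
            }
        }
        return vars;
    }

    public static void main(String[] args) throws SQLException {
        // Hypothetical address: point this at the MariaDB instance canal will read from.
        Map<String, String> vars = readVariables("jdbc:mysql://10.10.0.9:3306/", "canal", "canal");
        System.out.println(vars + " -> ready: " + isReadyForCanal(vars));
    }
}
```

Keeping the decision in `isReadyForCanal` separate from the JDBC call makes it possible to exercise the check without a running database.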
II. Download the canal server
1. Download location
2. Unpack
3. Edit the configuration file
- Edit conf/example/instance.properties
# Change to the address of your own database
canal.instance.master.address=10.10.0.9:3306
# Change to your own database username and password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# Change to the rule for the tables you want to sync
# Sync everything:
canal.instance.filter.regex=.*\\..*
# Sync only the ucenter_member table in guli_ucenter:
#canal.instance.filter.regex=guli_ucenter.ucenter_member
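To see what the two filter rules select, the sketch below approximates the matching with `java.util.regex`. It assumes canal compares each comma-separated pattern against the full `schema.table` name; canal's own filter implementation may differ in detail, and the class name `FilterRegexDemo` and the table `ucenter_order` are made up for illustration. Note that in a `.properties` file `\\` is read as a single backslash, so the effective pattern is `.*\..*`.

```java
import java.util.regex.Pattern;

public class FilterRegexDemo {

    /** True when any comma-separated pattern matches the whole "schema.table" name. */
    static boolean accepts(String filterRegex, String schemaAndTable) {
        // Naive split: good enough for patterns that contain no commas themselves.
        for (String pattern : filterRegex.split(",")) {
            if (Pattern.matches(pattern.trim(), schemaAndTable)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String all = ".*\\..*"; // Java string literal for the regex .*\..*
        String oneTable = "guli_ucenter\\.ucenter_member"; // dot escaped to match literally
        System.out.println(accepts(all, "guli_ucenter.ucenter_member"));      // true
        System.out.println(accepts(oneTable, "guli_ucenter.ucenter_member")); // true
        System.out.println(accepts(oneTable, "guli_ucenter.ucenter_order"));  // false
    }
}
```

In the commented-out rule the dot between schema and table is unescaped, so as a regex it matches any character in that position; escaping it as in `oneTable` is the stricter form.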
4. Start
bin/startup.sh
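5. Note
Startup may fail at this point. I kept suspecting that my parameters were wrong and checked them several times, and only later found that a Java environment is required.
Install a JDK and configure the environment variables.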
III. Client module
1. Add the dependencies
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--mysql-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>commons-dbutils</groupId>
<artifactId>commons-dbutils</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
</dependency>
</dependencies>
2. Create the listener
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@Component
public class CanalClient {
    // Queue of SQL statements waiting to be executed
    private final Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
    @Resource
    private DataSource dataSource;
    /**
     * Pull change events from canal and write them to the target database.
     */
    public void run() {
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.44.132",
                11111), "example", "", "");
        int batchSize = 1000;
        try {
            connector.connect();
            // A non-empty filter passed here replaces canal.instance.filter.regex from the server config
            connector.subscribe(".*\\..*");
            connector.rollback();
            try {
                while (true) {
                    // Try to pull up to batchSize entries from the canal server; fewer are returned if fewer are available
                    Message message = connector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        Thread.sleep(1000);
                    } else {
                        dataHandle(message.getEntries());
                    }
                    connector.ack(batchId);
                    // Execute the queued SQL as soon as at least one statement is waiting
                    if (!SQL_QUEUE.isEmpty()) {
                        executeQueueSql();
                    }
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the caller can see that the thread was interrupted
                Thread.currentThread().interrupt();
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        } finally {
            connector.disconnect();
        }
    }
    /**
     * Execute the SQL statements currently in the queue.
     */
    public void executeQueueSql() {
        int size = SQL_QUEUE.size();
        for (int i = 0; i < size; i++) {
            // poll() returns null if the queue is already empty; execute() ignores null
            String sql = SQL_QUEUE.poll();
            System.out.println("[sql]----> " + sql);
            this.execute(sql);
        }
    }
    /**
     * Dispatch each row-data entry to the matching SQL builder.
     *
     * @param entrys the binlog entries of one batch
     */
    private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {
        for (Entry entry : entrys) {
            if (EntryType.ROWDATA == entry.getEntryType()) {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                EventType eventType = rowChange.getEventType();
                if (eventType == EventType.DELETE) {
                    saveDeleteSql(entry);
                } else if (eventType == EventType.UPDATE) {
                    saveUpdateSql(entry);
                } else if (eventType == EventType.INSERT) {
                    saveInsertSql(entry);
                }
            }
        }
    }
    /**
     * Build and queue an UPDATE statement for each changed row.
     *
     * @param entry the binlog entry
     */
    private void saveUpdateSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> newColumnList = rowData.getAfterColumnsList();
                StringBuilder sql = new StringBuilder("update " + entry.getHeader().getTableName() + " set ");
                for (int i = 0; i < newColumnList.size(); i++) {
                    Column column = newColumnList.get(i);
                    // Emit NULL for null columns; double any single quote so the literal stays valid
                    String value = column.getIsNull() ? "NULL" : "'" + column.getValue().replace("'", "''") + "'";
                    sql.append(" " + column.getName() + " = " + value);
                    if (i != newColumnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(" where ");
                List<Column> oldColumnList = rowData.getBeforeColumnsList();
                for (Column column : oldColumnList) {
                    if (column.getIsKey()) {
                        // Only a single-column primary key is supported for now.
                        // The key is quoted so that string keys also work.
                        sql.append(column.getName() + "='" + column.getValue().replace("'", "''") + "'");
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }
    /**
     * Build and queue a DELETE statement for each deleted row.
     *
     * @param entry the binlog entry
     */
    private void saveDeleteSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> columnList = rowData.getBeforeColumnsList();
                StringBuilder sql = new StringBuilder("delete from " + entry.getHeader().getTableName() + " where ");
                for (Column column : columnList) {
                    if (column.getIsKey()) {
                        // Only a single-column primary key is supported for now.
                        // The key is quoted so that string keys also work.
                        sql.append(column.getName() + "='" + column.getValue().replace("'", "''") + "'");
                        break;
                    }
                }
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }
    /**
     * Build and queue an INSERT statement for each inserted row.
     *
     * @param entry the binlog entry
     */
    private void saveInsertSql(Entry entry) {
        try {
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            List<RowData> rowDatasList = rowChange.getRowDatasList();
            for (RowData rowData : rowDatasList) {
                List<Column> columnList = rowData.getAfterColumnsList();
                StringBuilder sql = new StringBuilder("insert into " + entry.getHeader().getTableName() + " (");
                for (int i = 0; i < columnList.size(); i++) {
                    sql.append(columnList.get(i).getName());
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(") VALUES (");
                for (int i = 0; i < columnList.size(); i++) {
                    Column column = columnList.get(i);
                    // Emit NULL for null columns; double any single quote so the literal stays valid
                    sql.append(column.getIsNull() ? "NULL" : "'" + column.getValue().replace("'", "''") + "'");
                    if (i != columnList.size() - 1) {
                        sql.append(",");
                    }
                }
                sql.append(")");
                SQL_QUEUE.add(sql.toString());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }
    /**
     * Execute one statement against the target database.
     *
     * @param sql the statement to run; null is ignored
     */
    public void execute(String sql) {
        if (null == sql) return;
        Connection con = null;
        try {
            con = dataSource.getConnection();
            QueryRunner qr = new QueryRunner();
            int row = qr.execute(con, sql);
            System.out.println("update: " + row);
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            DbUtils.closeQuietly(con);
        }
    }
}
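The listener above builds SQL by concatenating column values into the statement text, which is fragile for values containing quotes or backslashes and is open to SQL injection. One alternative, sketched here as an assumption rather than the author's method, is to generate a statement with `?` placeholders plus a parameter array and hand both to DbUtils' `QueryRunner.update(String, Object...)`. The class `ParamSql` and its methods are hypothetical helpers, and plain maps stand in for canal's `Column` lists so the example runs without canal on the classpath. The `update` builder also accepts several key columns, which covers composite primary keys.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

public class ParamSql {
    final String sql;
    final Object[] params;

    ParamSql(String sql, List<Object> params) {
        this.sql = sql;
        this.params = params.toArray();
    }

    /** Builds "insert into t (a,b) values (?,?)" with values bound in column order. */
    static ParamSql insert(String table, Map<String, Object> columns) {
        StringJoiner names = new StringJoiner(",");
        StringJoiner marks = new StringJoiner(",");
        List<Object> params = new ArrayList<>();
        for (Map.Entry<String, Object> e : columns.entrySet()) {
            names.add(e.getKey());
            marks.add("?");
            params.add(e.getValue());
        }
        return new ParamSql("insert into " + table + " (" + names + ") values (" + marks + ")", params);
    }

    /** Builds "update t set a=?,b=? where k1=? and k2=?"; keys may span several columns. */
    static ParamSql update(String table, Map<String, Object> newValues, Map<String, Object> keys) {
        StringJoiner sets = new StringJoiner(",");
        StringJoiner where = new StringJoiner(" and ");
        List<Object> params = new ArrayList<>();
        for (Map.Entry<String, Object> e : newValues.entrySet()) {
            sets.add(e.getKey() + "=?");
            params.add(e.getValue());
        }
        for (Map.Entry<String, Object> e : keys.entrySet()) {
            where.add(e.getKey() + "=?");
            params.add(e.getValue());
        }
        return new ParamSql("update " + table + " set " + sets + " where " + where, params);
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so placeholders and parameters line up.
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", "1");
        row.put("nickname", "O'Brien"); // the quote is harmless because the value is bound
        ParamSql stmt = insert("ucenter_member", row);
        System.out.println(stmt.sql);
        // With DbUtils: new QueryRunner(dataSource).update(stmt.sql, stmt.params);
    }
}
```

Table and column names are still concatenated here, so this sketch assumes they come from a trusted source such as the binlog header.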
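3. Modify the startup class
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import javax.annotation.Resource;
@SpringBootApplication
public class CanalApplication implements CommandLineRunner {
    @Resource
    private CanalClient canalClient;
    public static void main(String[] args) {
        SpringApplication.run(CanalApplication.class, args);
    }
    @Override
    public void run(String... strings) throws Exception {
        // Start the canal client listener once the application has started
        canalClient.run();
    }
}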
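Because `canalClient.run()` loops forever, the `CommandLineRunner` above never returns. If that is a problem, one option is to hand the loop to its own thread. The following is a minimal sketch under that assumption; `BackgroundListener` is a hypothetical helper, not part of canal or Spring.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BackgroundListener {
    // A single worker thread is enough: the listener is one long-running loop.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Submits the blocking loop to the worker thread and returns immediately. */
    public void start(Runnable blockingLoop) {
        executor.submit(blockingLoop);
    }

    /** Interrupts the worker; a loop that sleeps or checks the interrupt flag will exit. */
    public void stop() {
        executor.shutdownNow();
    }
}
```

In the startup class this would be used as `new BackgroundListener().start(canalClient::run);`. The worker is a non-daemon thread, so `stop()` should be called on shutdown to let the JVM exit.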
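Summary
That completes the setup. When data in MariaDB changes, the client picks up the change and replays it as SQL against the database that needs to stay in sync.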