当前位置:网站首页>canal同步Mariadb到Mysql
canal同步Mariadb到Mysql
2022-08-02 02:21:00 【莫名其妙的忧伤】
文章目录
前言
使用场景:需要监听一个数据库中数据有改变,同步到另外一个数据库中。所以尝试使用了canal
Mariadb同步到mysql。Mariadb安装在win,mysql安装在linux
一、开启Mariadb的binlog
1.检查binlog是否开启
- 执行下面命令
show variables like 'log_bin';

如果没有打开显示 O F F \color{#FF0000}{如果没有打开显示OFF} 如果没有打开显示OFF
2.开启binlog
- 修改配置文件D:\EZAccess\Server\mariadb-5.5.64-winx64\my-huge.ini,在mysqld下面添加
log-bin=mysql-bin #binlog文件名 binlog_format=ROW #选择row模式 server_id=1 #mysql实例id,不能和canal的slaveId重复
3.重启服务
- 在D:\EZAccess\Server\service_restart.bat
4.查看
- 再次执行,显示ON 即可;
show variables like 'log_bin';
二、下载canal服务
1.下载地址
2.解压
3.修改配置文件
- 修改conf/example/instance.properties
#需要改成自己的数据库信息 canal.instance.master.address=10.10.0.9:3306 #需要改成自己的数据库用户名与密码 canal.instance.dbUsername=canal canal.instance.dbPassword=canal #需要改成同步的数据库表规则,例如只是同步一下表 canal.instance.filter.regex=.*\\..* #表示同步所有 #canal.instance.filter.regex=guli_ucenter.ucenter_member #表示同步guli_ucenter中的ucenter_member表
4.启动
bin/startup.sh
5.注意
这里会有启动不成功的问题。一直在怀疑是自己的参数配置错了。检查了好几遍。后来发现必须要有 j a v a 环境 \color{#FF0000}{java环境} java环境
需要安装JDK,配置环境变量
三.客户端模块
1.引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--mysql-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>commons-dbutils</groupId>
<artifactId>commons-dbutils</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
</dependency>
</dependencies>
2.创建监听
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@Component
public class CanalClient {
//sql队列
private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
@Resource
private DataSource dataSource;
/** * canal入库方法 */
public void run() {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.44.132",
11111), "example", "", "");
int batchSize = 1000;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
try {
while (true) {
//尝试从master那边拉去数据batchSize条记录,有多少取多少
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
Thread.sleep(1000);
} else {
dataHandle(message.getEntries());
}
connector.ack(batchId);
//当队列里面堆积的sql大于一定数值的时候就模拟执行
if (SQL_QUEUE.size() >= 1) {
executeQueueSql();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
} finally {
connector.disconnect();
}
}
/** * 模拟执行队列里面的sql语句 */
public void executeQueueSql() {
int size = SQL_QUEUE.size();
for (int i = 0; i < size; i++) {
String sql = SQL_QUEUE.poll();
System.out.println("[sql]----> " + sql);
this.execute(sql.toString());
}
}
/** * 数据处理 * * @param entrys */
private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {
for (Entry entry : entrys) {
if (EntryType.ROWDATA == entry.getEntryType()) {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
EventType eventType = rowChange.getEventType();
if (eventType == EventType.DELETE) {
saveDeleteSql(entry);
} else if (eventType == EventType.UPDATE) {
saveUpdateSql(entry);
} else if (eventType == EventType.INSERT) {
saveInsertSql(entry);
}
}
}
}
/** * 保存更新语句 * * @param entry */
private void saveUpdateSql(Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<RowData> rowDatasList = rowChange.getRowDatasList();
for (RowData rowData : rowDatasList) {
List<Column> newColumnList = rowData.getAfterColumnsList();
StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");
for (int i = 0; i < newColumnList.size(); i++) {
sql.append(" " + newColumnList.get(i).getName()
+ " = '" + newColumnList.get(i).getValue() + "'");
if (i != newColumnList.size() - 1) {
sql.append(",");
}
}
sql.append(" where ");
List<Column> oldColumnList = rowData.getBeforeColumnsList();
for (Column column : oldColumnList) {
if (column.getIsKey()) {
//暂时只支持单一主键
sql.append(column.getName() + "=" + column.getValue());
break;
}
}
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/** * 保存删除语句 * * @param entry */
private void saveDeleteSql(Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<RowData> rowDatasList = rowChange.getRowDatasList();
for (RowData rowData : rowDatasList) {
List<Column> columnList = rowData.getBeforeColumnsList();
StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");
for (Column column : columnList) {
if (column.getIsKey()) {
//暂时只支持单一主键
sql.append(column.getName() + "=" + column.getValue());
break;
}
}
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/** * 保存插入语句 * * @param entry */
private void saveInsertSql(Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<RowData> rowDatasList = rowChange.getRowDatasList();
for (RowData rowData : rowDatasList) {
List<Column> columnList = rowData.getAfterColumnsList();
StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");
for (int i = 0; i < columnList.size(); i++) {
sql.append(columnList.get(i).getName());
if (i != columnList.size() - 1) {
sql.append(",");
}
}
sql.append(") VALUES (");
for (int i = 0; i < columnList.size(); i++) {
sql.append("'" + columnList.get(i).getValue() + "'");
if (i != columnList.size() - 1) {
sql.append(",");
}
}
sql.append(")");
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/** * 入库 * @param sql */
public void execute(String sql) {
Connection con = null;
try {
if(null == sql) return;
con = dataSource.getConnection();
QueryRunner qr = new QueryRunner();
int row = qr.execute(con, sql);
System.out.println("update: "+ row);
} catch (SQLException e) {
e.printStackTrace();
} finally {
DbUtils.closeQuietly(con);
}
}
}
3.修改启动类
@SpringBootApplication
public class CanalApplication implements CommandLineRunner {
@Resource
private CanalClient canalClient;
public static void main(String[] args) {
SpringApplication.run(CanalApplication.class, args);
}
@Override
public void run(String... strings) throws Exception {
//项目启动,执行canal客户端监听
canalClient.run();
}
}
总结
以上就完成了。当Mariadb中数据有变动,客户端会监听到,通过sql插入需要同步的数据库。
边栏推荐
猜你喜欢

Nanoprobes纳米探针丨Nanogold偶联物的特点和应用

A good book for newcomers to the workplace

Pinduoduo leverages the consumer expo to promote the upgrading of domestic agricultural products brands and keep pace with international high-quality agricultural products

机器人领域期刊会议汇总

Handwriting a blogging platform ~ Day 3

Nanoprobes免疫测定丨FluoroNanogold试剂免疫染色方案

FOFAHUB usage test

The underlying data structure of Redis

51. 数字排列
![[LeetCode Daily Question] - 103. Zigzag Level Order Traversal of Binary Tree](/img/b9/35813ae2972375fa728e3c11fab5d3.png)
[LeetCode Daily Question] - 103. Zigzag Level Order Traversal of Binary Tree
随机推荐
GTK RGB图像绘制
Nanoprobes Polyhistidine (His-) Tag: Recombinant Protein Detection Protocol
2022-08-01 安装mysql监控工具phhMyAdmin
Multi-Party Threshold Private Set Intersection with Sublinear Communication-2021: Interpretation
CodeTon Round 2 D. Magical Array 规律
Redis 底层的数据结构
Data transfer at the data link layer
swift project, sqlcipher3 -> 4, cannot open legacy database is there a way to fix it
Ringtone 1161. Maximum In-Layer Elements and
2022 NPDP take an examination of how the results?How to query?
Ask God to answer, how should this kind of sql be written?
极大似然估计
AI target segmentation capability for fast video cutout without green screen
记一次gorm事务及调试解决mysql死锁
Remember a gorm transaction and debug to solve mysql deadlock
MySQL8 download, start, configure, verify
AntPathMatcher uses
2022-08-01 mysql/stoonedb慢SQL-Q18分析
Centos7 安装postgresql并开启远程访问
nacos startup error, the database has been configured, stand-alone startup