当前位置:网站首页>canal同步Mariadb到Mysql
canal同步Mariadb到Mysql
2022-08-02 02:21:00 【莫名其妙的忧伤】
文章目录
前言
使用场景:需要监听一个数据库中数据有改变,同步到另外一个数据库中。所以尝试使用了canal
Mariadb同步到mysql。Mariadb安装在win,mysql安装在linux
一、开启Mariadb的binlog
1.检查binlog是否开启
- 执行下面命令
show variables like 'log_bin';
如果没有打开显示 O F F \color{#FF0000}{如果没有打开显示OFF} 如果没有打开显示OFF
2.开启binlog
- 修改配置文件D:\EZAccess\Server\mariadb-5.5.64-winx64\my-huge.ini,在mysqld下面添加
log-bin=mysql-bin #binlog文件名 binlog_format=ROW #选择row模式 server_id=1 #mysql实例id,不能和canal的slaveId重复
3.重启服务
- 在D:\EZAccess\Server\service_restart.bat
4.查看
- 再次执行,显示ON 即可;
show variables like 'log_bin';
二、下载canal服务
1.下载地址
2.解压
3.修改配置文件
- 修改conf/example/instance.properties
#需要改成自己的数据库信息 canal.instance.master.address=10.10.0.9:3306 #需要改成自己的数据库用户名与密码 canal.instance.dbUsername=canal canal.instance.dbPassword=canal #需要改成同步的数据库表规则,例如只是同步一下表 canal.instance.filter.regex=.*\\..* #表示同步所有 #canal.instance.filter.regex=guli_ucenter.ucenter_member #表示同步guli_ucenter中的ucenter_member表
4.启动
bin/startup.sh
5.注意
这里会有启动不成功的问题。一直在怀疑是自己的参数配置错了。检查了好几遍。后来发现必须要有 j a v a 环境 \color{#FF0000}{java环境} java环境
需要安装JDK,配置环境变量
三.客户端模块
1.引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--mysql-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>commons-dbutils</groupId>
<artifactId>commons-dbutils</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
</dependency>
</dependencies>
2.创建监听
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@Component
public class CanalClient {
//sql队列
private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
@Resource
private DataSource dataSource;
/** * canal入库方法 */
public void run() {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.44.132",
11111), "example", "", "");
int batchSize = 1000;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
try {
while (true) {
//尝试从master那边拉去数据batchSize条记录,有多少取多少
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
Thread.sleep(1000);
} else {
dataHandle(message.getEntries());
}
connector.ack(batchId);
//当队列里面堆积的sql大于一定数值的时候就模拟执行
if (SQL_QUEUE.size() >= 1) {
executeQueueSql();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
} finally {
connector.disconnect();
}
}
/** * 模拟执行队列里面的sql语句 */
public void executeQueueSql() {
int size = SQL_QUEUE.size();
for (int i = 0; i < size; i++) {
String sql = SQL_QUEUE.poll();
System.out.println("[sql]----> " + sql);
this.execute(sql.toString());
}
}
/** * 数据处理 * * @param entrys */
private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {
for (Entry entry : entrys) {
if (EntryType.ROWDATA == entry.getEntryType()) {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
EventType eventType = rowChange.getEventType();
if (eventType == EventType.DELETE) {
saveDeleteSql(entry);
} else if (eventType == EventType.UPDATE) {
saveUpdateSql(entry);
} else if (eventType == EventType.INSERT) {
saveInsertSql(entry);
}
}
}
}
/** * 保存更新语句 * * @param entry */
private void saveUpdateSql(Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<RowData> rowDatasList = rowChange.getRowDatasList();
for (RowData rowData : rowDatasList) {
List<Column> newColumnList = rowData.getAfterColumnsList();
StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");
for (int i = 0; i < newColumnList.size(); i++) {
sql.append(" " + newColumnList.get(i).getName()
+ " = '" + newColumnList.get(i).getValue() + "'");
if (i != newColumnList.size() - 1) {
sql.append(",");
}
}
sql.append(" where ");
List<Column> oldColumnList = rowData.getBeforeColumnsList();
for (Column column : oldColumnList) {
if (column.getIsKey()) {
//暂时只支持单一主键
sql.append(column.getName() + "=" + column.getValue());
break;
}
}
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/** * 保存删除语句 * * @param entry */
private void saveDeleteSql(Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<RowData> rowDatasList = rowChange.getRowDatasList();
for (RowData rowData : rowDatasList) {
List<Column> columnList = rowData.getBeforeColumnsList();
StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");
for (Column column : columnList) {
if (column.getIsKey()) {
//暂时只支持单一主键
sql.append(column.getName() + "=" + column.getValue());
break;
}
}
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/** * 保存插入语句 * * @param entry */
private void saveInsertSql(Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<RowData> rowDatasList = rowChange.getRowDatasList();
for (RowData rowData : rowDatasList) {
List<Column> columnList = rowData.getAfterColumnsList();
StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");
for (int i = 0; i < columnList.size(); i++) {
sql.append(columnList.get(i).getName());
if (i != columnList.size() - 1) {
sql.append(",");
}
}
sql.append(") VALUES (");
for (int i = 0; i < columnList.size(); i++) {
sql.append("'" + columnList.get(i).getValue() + "'");
if (i != columnList.size() - 1) {
sql.append(",");
}
}
sql.append(")");
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
/** * 入库 * @param sql */
public void execute(String sql) {
Connection con = null;
try {
if(null == sql) return;
con = dataSource.getConnection();
QueryRunner qr = new QueryRunner();
int row = qr.execute(con, sql);
System.out.println("update: "+ row);
} catch (SQLException e) {
e.printStackTrace();
} finally {
DbUtils.closeQuietly(con);
}
}
}
3.修改启动类
@SpringBootApplication
public class CanalApplication implements CommandLineRunner {
@Resource
private CanalClient canalClient;
public static void main(String[] args) {
SpringApplication.run(CanalApplication.class, args);
}
@Override
public void run(String... strings) throws Exception {
//项目启动,执行canal客户端监听
canalClient.run();
}
}
总结
以上就完成了。当Mariadb中数据有变动,客户端会监听到,通过sql插入需要同步的数据库。
边栏推荐
- LeetCode brushing diary: 53, the largest sub-array and
- "NetEase Internship" Weekly Diary (3)
- Moonbeam and Project integration of the Galaxy, bring brand-new user experience for the community
- 通用客户端架构
- 使用docker安装mysql
- [ORB_SLAM2] void Frame::ComputeImageBounds(const cv::Mat & imLeft)
- to-be-read list
- Win Go development kit installation configuration, GoLand configuration
- Redis for distributed applications in Golang
- ¶ Backtop back to the top is not effective
猜你喜欢
Reflex WMS Intermediate Series 7: What should I do if I want to cancel the picking of an HD that has finished picking but has not yet been loaded?
Hiring a WordPress Developer: 4 Practical Ways
十字光标太小怎么调节、CAD梦想画图算量技巧
oracle query scan full table and walk index
[LeetCode Daily Question] - 103. Zigzag Level Order Traversal of Binary Tree
Project Background Technology Express
LeetCode Brushing Diary: 74. Searching 2D Matrix
2022-07-30 mysql8 executes slow SQL-Q17 analysis
使用docker安装mysql
Ringtone 1161. Maximum In-Layer Elements and
随机推荐
BI - SQL 丨 WHILE
Rasa 3.x 学习系列- Rasa - Issues 4873 dispatcher.utter_message 学习笔记
"NetEase Internship" Weekly Diary (2)
TKU remembers a single-point QPS optimization (I wish ITEYE is finally back)
Power button 1374. Generate each character string is an odd number
NIO's Sword
53. 最小的k个数
永磁同步电机36问(二)——机械量与电物理量如何转化?
MySQL8 download, start, configure, verify
Centos7 安装postgresql并开启远程访问
Use DBeaver for mysql data backup and recovery
LeetCode brushing diary: 33. Search and rotate sorted array
Win Go development kit installation configuration, GoLand configuration
2022-07-30 mysql8 executes slow SQL-Q17 analysis
Handwriting a blogging platform ~ Day 3
Reflex WMS Intermediate Series 7: What should I do if I want to cancel the picking of an HD that has finished picking but has not yet been loaded?
Software testing Interface automation testing Pytest framework encapsulates requests library Encapsulates unified request and multiple base path processing Interface association encapsulation Test cas
拼多多借力消博会推动国内农产品品牌升级 看齐国际精品农货
Electronic Manufacturing Warehouse Barcode Management System Solution
to-be-read list