当前位置:网站首页>Alibaba-Canal使用详解(排坑版)_MySQL与ES数据同步
Alibaba-Canal使用详解(排坑版)_MySQL与ES数据同步
2022-07-06 01:21:00 【shstart7】
canal概述
用处
canal主要用途是对MySQL数据库增量日志进行解析,提供增量数据的订阅和消费,简单说就是可以对MySQL的增量数据进行实时同步,支持同步到MySQL、Elasticsearch、HBase等数据存储中去。
工作原理
canal会模拟MySQL主库和从库的交互协议,从而伪装成MySQL的从库,然后向MySQL主库发送dump协议,MySQL主库收到dump请求会向canal推送binlog,canal通过解析binlog将数据同步到其他存储中去。
1.版本
这里我的MySQL和ES都安装在阿里云服务器上
,MySQL版本为5.7,ES版本为7.14.
2.下载
下载地址Github,这里我下载的版本为v1.1.5-alpha-2
,下载前三个组件即可。
3.上传
由于Canal启动所需要的内存较大,我的阿里云服务器配置较低,所以这里我在本地的Liunx服务器上安装配置Canal。
将三个组件上传到Linux服务器上。
4.配置MySQL
由于canal是通过订阅MySQL的binlog来实现数据同步的,所以我们需要开启MySQL的binlog写入功能,并设置binlog-format
为ROW模式,
我的MySQL是跑在阿里云Docker中的,并且做了挂载,所以修改外部的my.cnf文件即可,加入下面的配置即可
[mysqld]
## 设置server_id,同一局域网中需要唯一
server_id=101
## 指定不需要同步的数据库名称
binlog-ignore-db=mysql
## 开启二进制日志功能
log-bin=mall-mysql-bin
## 设置二进制日志使用内存大小(事务)
binlog_cache_size=1M
## 设置使用的二进制日志格式(mixed,statement,row)
binlog_format=row
## 二进制日志过期清理时间。默认值为0,表示不自动清理。
expire_logs_days=7
## 跳过主从复制中遇到的所有错误或指定类型的错误,避免slave端复制中断。
## 如:1062错误是指一些主键重复,1032错误是因为主从数据库数据不一致
slave_skip_errors=1062
配置完成后需要重启MySQL,重启后使用如下命令查看binlog是否启用
命令
# 查看binlog是否启用
show variables like '%log_bin%';
# 查看下MySQL的binlog模式;
show variables like 'binlog_format%';
接下来需要创建一个拥有从库权限的账号,用于订阅binlog,这里创建的账号为
canal:canal
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
创建好测试使用的数据库bookstore,之后创建一张表book
,
create table book(
id int(32) auto_increment primary key,
book_name varchar(32),
price double,
introduce varchar(2048),
press varchar(32),
author varchar(32),
publish_date DATE
)
5.配置canal-server
将我们上传的canal.deployer-1.1.5-SNAPSHOT.tar.gz
解压到指定目录
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /指定路径
解压后的目录
├── bin
│ ├── restart.sh
│ ├── startup.bat
│ ├── startup.sh
│ └── stop.sh
├── conf
│ ├── canal_local.properties
│ ├── canal.properties
│ └── example
│ └── instance.properties
├── lib
├── logs
│ ├── canal
│ │ └── canal.log
│ └── example
│ ├── example.log
│ └── example.log
└── plugin
修改配置
修改配置文件conf/example/instance.properties
,按如下配置即可,主要是修改数据库相关配置;
# 需要同步数据的MySQL地址
canal.instance.master.address=ip地址:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# 用于同步数据的数据库账号
canal.instance.dbUsername=canal
# 用于同步数据的数据库密码
canal.instance.dbPassword=canal
# 数据库连接编码
canal.instance.connectionCharset = UTF-8
# 需要订阅binlog的表过滤正则表达式
canal.instance.filter.regex=.*\\..*
修改运行参数
由于canal默认的运行需要的内存较大,这里我们修改一下(这一步非常重要)
修改bin目录下的startup.sh,这里我都改成了512M
启动
- 进入bin目录,启动startup.sh
sh bin/startup.sh
6.配置canal-adapter
跟上面一样,先进行解压安装。解压后的目录如下.
├── bin
│ ├── adapter.pid
│ ├── restart.sh
│ ├── startup.bat
│ ├── startup.sh
│ └── stop.sh
├── conf
│ ├── application.yml
│ ├── es6
│ ├── es7
│ │ ├── biz_order.yml
│ │ ├── customer.yml
│ │ └── product.yml
│ ├── hbase
│ ├── kudu
│ ├── logback.xml
│ ├── META-INF
│ │ └── spring.factories
│ └── rdb
├── lib
├── logs
│ └── adapter
│ └── adapter.log
└── plugin
修改配置文件
conf/application.yml
,按如下配置即可,主要是修改canal-server配置、数据源配置和客户端适配器配置;
注意: Linux要开启11111端口
canal.conf:
mode: tcp # 客户端的模式,可选tcp kafka rocketMQ
flatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效
zookeeperHosts: # 对应集群模式下的zk地址
syncBatchSize: 1000 # 每次同步的批数量
retries: 0 # 重试次数, -1为无限重试
timeout: # 同步超时时间, 单位毫秒
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111 #设置canal-server的地址
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
srcDataSources: # 源数据库配置
defaultDS:
url: jdbc:mysql://自己MySQL的ip地址:3306/bookstore?useUnicode=true
username: canal
password: canal
canalAdapters: # 适配器列表
- instance: example # canal实例名或者MQ topic名
groups: # 分组列表
- groupId: g1 # 分组id, 如果是MQ模式将用到该值
outerAdapters:
- name: logger # 日志打印适配器
- name: es7 # ES同步适配器
hosts: 自己ES的ip地址:9200 # ES连接地址
properties:
mode: rest # 模式可选transport(9300) 或者 rest(9200)
# security.auth: test:123456 # only used for rest mode
cluster.name: elasticsearch # ES集群名称
添加映射
进入canal-adapter/conf/es7目录,创建一个book.yml
,配置MySQL中的表与Elasticsearch中索引的映射关系
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example # canal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
_index: book # es 的索引名称
_id: _id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
sql: "SELECT
b.id AS _id, #这里是为了将MySQL的id对应上ES的_id
b.id, #这里是为了将MySQL的id对应ES文档中的id属性
b.book_name,
b.price,
b.introduce,
b.press,
b.author,
b.publish_date
FROM
book b" # sql映射
etlCondition: "where a.c_time>={}" #etl的条件参数
commitBatch: 3000 # 提交批大小
修改运行参数
到canal_adapter/bin
目录下,修改startup.sh文件,我这里跟上面一样都设置成了512M
然后运行startup.sh,进行启动
7.在ES中创建索引
PUT /book
{
"mappings" : {
"properties" : {
"author" : {
"type" : "keyword"
},
"book_name" : {
"type" : "text",
"analyzer": "ik_max_word" //使用分词器
},
"id" : {
"type" : "integer"
},
"introduce" : {
"type" : "text",
"analyzer": "ik_max_word" //使用分词器
},
"press" : {
"type" : "keyword"
},
"price" : {
"type" : "double"
},
"publish_date" : {
"type" : "date"
}
}
}
}
查看索引
8.测试
使用SQL语句在数据库中创建一条记录
insert into book values(1, "《Java》", 2323.2,"Java从入门到放弃", "机械工业出版社", "James","2012-12-11")
插入成功后,我们在ES中搜索一下,发现数据已经同步.
测试一下修改
update book set book_name = "《Python》" where id = 1
再次搜索,发现book_name已经同步.
测试一下删除
delete from book where id = 1
同步成功
9.canal-admin使用
解压canal.admin-1.1.5-SNAPSHOT.tar.gz
,解压后的目录如下
├── bin
│ ├── restart.sh
│ ├── startup.bat
│ ├── startup.sh
│ └── stop.sh
├── conf
│ ├── application.yml
│ ├── canal_manager.sql
│ ├── canal-template.properties
│ ├── instance-template.properties
│ ├── logback.xml
│ └── public
│ ├── avatar.gif
│ ├── index.html
│ ├── logo.png
│ └── static
├── lib
└── logs
执行文件
将conf/canal_manager.sql这个文件执行,会创建一个canal_manager数据库和几张表。
修改配置
修改配置文件conf/application.yml
,按如下配置即可,主要是修改数据源配置和canal-admin
的管理账号配置,注意需要用一个有读写权限的数据库账号,比如管理账号root:root
;
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: ip地址:3306
database: canal_manager
username: root
password: root密码
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${
spring.datasource.address}/${
spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: admin
- 接下来对之前搭建的
canal-server(canal_deployer)
的conf/canal_local.properties
文件进行配置,主要是修改canal-admin
的配置,修改完成后使用sh bin/startup.sh local
重启canal-server
:
# register ip
canal.register.ip =
# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
同样的,修改bin/startup.sh的运行参数
然后运行startup脚本启动admin
访问canal-admin的Web界面,输入账号密码admin:123456
即可登录,访问地址:[http://192.168.80.100:8089
登录成功后即可使用Web界面操作canal-server。
10.遇见的问题
运行内存问题
这个问题还是比较坑的,刚开始我的Canal都是在云服务器上跑的,没有修改运行参数,结果经常就是出现Canal运行起来了,然后ES挂了,或者就是Canal运行不起来,所以内存小的xdm
一定要
修改参数。端口问题
保证Linux要开启11111 11110 8089端口
启动顺序问题
要先启动
canal_deployer
,然后启动canal_adapter
发现bin目录下有一些
hs_error_xxx
文件,要将其删除,然后继续配置后启动即可
边栏推荐
- A glimpse of spir-v
- Superfluid_ HQ hacked analysis
- SSH login is stuck and disconnected
- In the era of industrial Internet, we will achieve enough development by relying on large industrial categories
- What is weak reference? What are the weak reference data types in ES6? What are weak references in JS?
- Mobilenet series (5): use pytorch to build mobilenetv3 and learn and train based on migration
- Leetcode study - day 35
- Exciting, 2022 open atom global open source summit registration is hot
- 【已解决】如何生成漂亮的静态文档说明页
- 程序员搞开源,读什么书最合适?
猜你喜欢
[pat (basic level) practice] - [simple mathematics] 1062 simplest fraction
Four commonly used techniques for anti aliasing
有谁知道 达梦数据库表的列的数据类型 精度怎么修改呀
VMware Tools安装报错:无法自动安装VSock驱动程序
After 95, the CV engineer posted the payroll and made up this. It's really fragrant
IP storage and query in MySQL
ThreeDPoseTracker项目解析
Building core knowledge points
A Cooperative Approach to Particle Swarm Optimization
Recoverable fuse characteristic test
随机推荐
A glimpse of spir-v
MySQL learning notes 2
ThreeDPoseTracker项目解析
The inconsistency between the versions of dynamic library and static library will lead to bugs
Overview of Zhuhai purification laboratory construction details
Spir - V premier aperçu
VMware Tools installation error: unable to automatically install vsock driver
[技术发展-28]:信息通信网大全、新的技术形态、信息通信行业高质量发展概览
[day 30] given an integer n, find the sum of its factors
3D模型格式汇总
Use of crawler manual 02 requests
leetcode刷题_反转字符串中的元音字母
How does Huawei enable debug and how to make an image port
在产业互联网时代,将会凭借大的产业范畴,实现足够多的发展
SSH login is stuck and disconnected
Live broadcast system code, custom soft keyboard style: three kinds of switching: letters, numbers and punctuation
Finding the nearest common ancestor of binary search tree by recursion
有谁知道 达梦数据库表的列的数据类型 精度怎么修改呀
Basic process and testing idea of interface automation
Huawei Hrbrid interface and VLAN division based on IP