当前位置:网站首页>Alibaba-Canal使用详解(排坑版)_MySQL与ES数据同步
Alibaba-Canal使用详解(排坑版)_MySQL与ES数据同步
2022-07-06 01:21:00 【shstart7】
canal概述
用处
canal主要用途是对MySQL数据库增量日志进行解析,提供增量数据的订阅和消费,简单说就是可以对MySQL的增量数据进行实时同步,支持同步到MySQL、Elasticsearch、HBase等数据存储中去。
工作原理
canal会模拟MySQL主库和从库的交互协议,从而伪装成MySQL的从库,然后向MySQL主库发送dump协议,MySQL主库收到dump请求会向canal推送binlog,canal通过解析binlog将数据同步到其他存储中去。
1.版本
这里我的MySQL和ES都安装在阿里云服务器上
,MySQL版本为5.7,ES版本为7.14.
2.下载
下载地址Github,这里我下载的版本为v1.1.5-alpha-2
,下载前三个组件即可。
3.上传
由于Canal启动所需要的内存较大,我的阿里云服务器配置较低,所以这里我在本地的Liunx服务器上安装配置Canal。
将三个组件上传到Linux服务器上。
4.配置MySQL
由于canal是通过订阅MySQL的binlog来实现数据同步的,所以我们需要开启MySQL的binlog写入功能,并设置binlog-format
为ROW模式,
我的MySQL是跑在阿里云Docker中的,并且做了挂载,所以修改外部的my.cnf文件即可,加入下面的配置即可
[mysqld]
## 设置server_id,同一局域网中需要唯一
server_id=101
## 指定不需要同步的数据库名称
binlog-ignore-db=mysql
## 开启二进制日志功能
log-bin=mall-mysql-bin
## 设置二进制日志使用内存大小(事务)
binlog_cache_size=1M
## 设置使用的二进制日志格式(mixed,statement,row)
binlog_format=row
## 二进制日志过期清理时间。默认值为0,表示不自动清理。
expire_logs_days=7
## 跳过主从复制中遇到的所有错误或指定类型的错误,避免slave端复制中断。
## 如:1062错误是指一些主键重复,1032错误是因为主从数据库数据不一致
slave_skip_errors=1062
配置完成后需要重启MySQL,重启后使用如下命令查看binlog是否启用
命令
# 查看binlog是否启用
show variables like '%log_bin%';
# 查看下MySQL的binlog模式;
show variables like 'binlog_format%';
接下来需要创建一个拥有从库权限的账号,用于订阅binlog,这里创建的账号为
canal:canal
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
创建好测试使用的数据库bookstore,之后创建一张表book
,
create table book(
id int(32) auto_increment primary key,
book_name varchar(32),
price double,
introduce varchar(2048),
press varchar(32),
author varchar(32),
publish_date DATE
)
5.配置canal-server
将我们上传的canal.deployer-1.1.5-SNAPSHOT.tar.gz
解压到指定目录
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /指定路径
解压后的目录
├── bin
│ ├── restart.sh
│ ├── startup.bat
│ ├── startup.sh
│ └── stop.sh
├── conf
│ ├── canal_local.properties
│ ├── canal.properties
│ └── example
│ └── instance.properties
├── lib
├── logs
│ ├── canal
│ │ └── canal.log
│ └── example
│ ├── example.log
│ └── example.log
└── plugin
修改配置
修改配置文件conf/example/instance.properties
,按如下配置即可,主要是修改数据库相关配置;
# 需要同步数据的MySQL地址
canal.instance.master.address=ip地址:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# 用于同步数据的数据库账号
canal.instance.dbUsername=canal
# 用于同步数据的数据库密码
canal.instance.dbPassword=canal
# 数据库连接编码
canal.instance.connectionCharset = UTF-8
# 需要订阅binlog的表过滤正则表达式
canal.instance.filter.regex=.*\\..*
修改运行参数
由于canal默认的运行需要的内存较大,这里我们修改一下(这一步非常重要)
修改bin目录下的startup.sh,这里我都改成了512M
启动
- 进入bin目录,启动startup.sh
sh bin/startup.sh
6.配置canal-adapter
跟上面一样,先进行解压安装。解压后的目录如下.
├── bin
│ ├── adapter.pid
│ ├── restart.sh
│ ├── startup.bat
│ ├── startup.sh
│ └── stop.sh
├── conf
│ ├── application.yml
│ ├── es6
│ ├── es7
│ │ ├── biz_order.yml
│ │ ├── customer.yml
│ │ └── product.yml
│ ├── hbase
│ ├── kudu
│ ├── logback.xml
│ ├── META-INF
│ │ └── spring.factories
│ └── rdb
├── lib
├── logs
│ └── adapter
│ └── adapter.log
└── plugin
修改配置文件
conf/application.yml
,按如下配置即可,主要是修改canal-server配置、数据源配置和客户端适配器配置;
注意: Linux要开启11111端口
canal.conf:
mode: tcp # 客户端的模式,可选tcp kafka rocketMQ
flatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效
zookeeperHosts: # 对应集群模式下的zk地址
syncBatchSize: 1000 # 每次同步的批数量
retries: 0 # 重试次数, -1为无限重试
timeout: # 同步超时时间, 单位毫秒
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111 #设置canal-server的地址
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
srcDataSources: # 源数据库配置
defaultDS:
url: jdbc:mysql://自己MySQL的ip地址:3306/bookstore?useUnicode=true
username: canal
password: canal
canalAdapters: # 适配器列表
- instance: example # canal实例名或者MQ topic名
groups: # 分组列表
- groupId: g1 # 分组id, 如果是MQ模式将用到该值
outerAdapters:
- name: logger # 日志打印适配器
- name: es7 # ES同步适配器
hosts: 自己ES的ip地址:9200 # ES连接地址
properties:
mode: rest # 模式可选transport(9300) 或者 rest(9200)
# security.auth: test:123456 # only used for rest mode
cluster.name: elasticsearch # ES集群名称
添加映射
进入canal-adapter/conf/es7目录,创建一个book.yml
,配置MySQL中的表与Elasticsearch中索引的映射关系
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example # canal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
_index: book # es 的索引名称
_id: _id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
sql: "SELECT
b.id AS _id, #这里是为了将MySQL的id对应上ES的_id
b.id, #这里是为了将MySQL的id对应ES文档中的id属性
b.book_name,
b.price,
b.introduce,
b.press,
b.author,
b.publish_date
FROM
book b" # sql映射
etlCondition: "where a.c_time>={}" #etl的条件参数
commitBatch: 3000 # 提交批大小
修改运行参数
到canal_adapter/bin
目录下,修改startup.sh文件,我这里跟上面一样都设置成了512M
然后运行startup.sh,进行启动
7.在ES中创建索引
PUT /book
{
"mappings" : {
"properties" : {
"author" : {
"type" : "keyword"
},
"book_name" : {
"type" : "text",
"analyzer": "ik_max_word" //使用分词器
},
"id" : {
"type" : "integer"
},
"introduce" : {
"type" : "text",
"analyzer": "ik_max_word" //使用分词器
},
"press" : {
"type" : "keyword"
},
"price" : {
"type" : "double"
},
"publish_date" : {
"type" : "date"
}
}
}
}
查看索引
8.测试
使用SQL语句在数据库中创建一条记录
insert into book values(1, "《Java》", 2323.2,"Java从入门到放弃", "机械工业出版社", "James","2012-12-11")
插入成功后,我们在ES中搜索一下,发现数据已经同步.
测试一下修改
update book set book_name = "《Python》" where id = 1
再次搜索,发现book_name已经同步.
测试一下删除
delete from book where id = 1
同步成功
9.canal-admin使用
解压canal.admin-1.1.5-SNAPSHOT.tar.gz
,解压后的目录如下
├── bin
│ ├── restart.sh
│ ├── startup.bat
│ ├── startup.sh
│ └── stop.sh
├── conf
│ ├── application.yml
│ ├── canal_manager.sql
│ ├── canal-template.properties
│ ├── instance-template.properties
│ ├── logback.xml
│ └── public
│ ├── avatar.gif
│ ├── index.html
│ ├── logo.png
│ └── static
├── lib
└── logs
执行文件
将conf/canal_manager.sql这个文件执行,会创建一个canal_manager数据库和几张表。
修改配置
修改配置文件conf/application.yml
,按如下配置即可,主要是修改数据源配置和canal-admin
的管理账号配置,注意需要用一个有读写权限的数据库账号,比如管理账号root:root
;
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: ip地址:3306
database: canal_manager
username: root
password: root密码
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${
spring.datasource.address}/${
spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: admin
- 接下来对之前搭建的
canal-server(canal_deployer)
的conf/canal_local.properties
文件进行配置,主要是修改canal-admin
的配置,修改完成后使用sh bin/startup.sh local
重启canal-server
:
# register ip
canal.register.ip =
# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
同样的,修改bin/startup.sh的运行参数
然后运行startup脚本启动admin
访问canal-admin的Web界面,输入账号密码admin:123456
即可登录,访问地址:[http://192.168.80.100:8089
登录成功后即可使用Web界面操作canal-server。
10.遇见的问题
运行内存问题
这个问题还是比较坑的,刚开始我的Canal都是在云服务器上跑的,没有修改运行参数,结果经常就是出现Canal运行起来了,然后ES挂了,或者就是Canal运行不起来,所以内存小的xdm
一定要
修改参数。端口问题
保证Linux要开启11111 11110 8089端口
启动顺序问题
要先启动
canal_deployer
,然后启动canal_adapter
发现bin目录下有一些
hs_error_xxx
文件,要将其删除,然后继续配置后启动即可
边栏推荐
- Use of crawler manual 02 requests
- [Arduino syntax - structure]
- c#网页打开winform exe
- Hcip---ipv6 experiment
- IP storage and query in MySQL
- leetcode刷题_反转字符串中的元音字母
- Daily practice - February 13, 2022
- File upload vulnerability test based on DVWA
- Yii console method call, Yii console scheduled task
- Netease smart enterprises enter the market against the trend, and there is a new possibility for game industrialization
猜你喜欢
Kotlin basics 1
Leetcode study - day 35
Finding the nearest common ancestor of binary search tree by recursion
Cf:h. maximum and [bit operation practice + K operations + maximum and]
Installation and use of esxi
037 PHP login, registration, message, personal Center Design
JVM_ 15_ Concepts related to garbage collection
[机缘参悟-39]:鬼谷子-第五飞箝篇 - 警示之二:赞美的六种类型,谨防享受赞美快感如同鱼儿享受诱饵。
Electrical data | IEEE118 (including wind and solar energy)
有谁知道 达梦数据库表的列的数据类型 精度怎么修改呀
随机推荐
Superfluid_ HQ hacked analysis
Zhuhai's waste gas treatment scheme was exposed
Daily practice - February 13, 2022
Unity VR solves the problem that the handle ray keeps flashing after touching the button of the UI
Ubantu check cudnn and CUDA versions
internship:项目代码所涉及陌生注解及其作用
The growth path of test / development programmers, the problem of thinking about the overall situation
Yii console method call, Yii console scheduled task
Kotlin basics 1
2020.2.13
False breakthroughs in the trend of London Silver
MYSQL---查询成绩为前5名的学生
[day 30] given an integer n, find the sum of its factors
IP storage and query in MySQL
Leetcode1961. Check whether the string is an array prefix
Unity | two ways to realize facial drive
Five challenges of ads-npu chip architecture design
In the era of industrial Internet, we will achieve enough development by relying on large industrial categories
3D model format summary
Unity VR resource flash surface in scene