当前位置:网站首页>canal同步mysql数据变化到kafka(centos部署)
canal同步mysql数据变化到kafka(centos部署)
2022-07-06 04:23:00 【豆约翰】
第一步:环境准备
- 系统CentOS7.9:查看CentOS版本: cat /etc/redhat-release;其中机器内存至少2个G内存,因为canal启动的默认最大内存是2G。
- jdk版本jdk1.8: 安装步骤请见:[https://blog.csdn.net/u010132847/article/details/117002124?spm=1001.2014.3001.5501][https_blog.csdn.net_u010132847_article_details_117002124_spm_1001.2014.3001.5501]
- 部署mysql8版本的机器ip: 192.168.187.132,mysql8版本安装请见:[https://blog.csdn.net/u010132847/article/details/116905080?spm=1001.2014.3001.5501][https_blog.csdn.net_u010132847_article_details_116905080_spm_1001.2014.3001.5501]
- 安装canal+kafka的机器ip: 192.168.187.131
第二步:mysql先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式。
- mysql8版本默认是开启binlog功能的,这一步实际可以忽略。
第三步:数据库新建一个canal账号并赋予其数据库同步的权限。
- 新建用户:create user ‘canal’@‘%’ identified by ‘Canal123!’;
- 赋予用户canal的SELECT、
REPLICATION SLAVE
和REPLICATION CLIENT
权限:
- “mysql_native_password ”的特殊处理:mysql8默认用sha2这个plugin来进行密码加密,如果用户密码生产时没用这个插件,客户端接入的时候会报“caching_sha2_password Auth failed”的错误,如下错误显示。
2020-05-06 17:19:24.944 [destination = example , address = /192.168.187.132:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:example[com.alibaba.otter.canal.parse.exception.CanalParseException: java.io.IOException: connect /192.168.187.132:3306 failure
Caused by: java.io.IOException: connect /127.0.0.1:3306 failure
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:83)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.connect(MysqlConnection.java:89)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.preDump(MysqlEventParser.java:86)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:183)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: caching_sha2_password Auth failed
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:257)
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:80)
... 4 more
]
- 因此需要使用关键字“mysql_native_password ”插件赋值账户密码,如下:
表达式如:alter user ‘canal’@‘%’ identified with mysql_native_password by ‘Canal123!’;
第四步:安装部署kafka(包括zookeeper)
- 下载安装包:wget [https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz][https_mirrors.tuna.tsinghua.edu.cn_apache_kafka_2.8.0_kafka_2.13-2.8.0.tgz]
- 解压安装包:tar -zxvf kafka_2.13-2.8.0.tgz
- 在/opt/kafka_2.13-2.8.0/bin目录可以看见这些启动脚本(稍微注意就发现里面还包含zookeeper脚本,是的,kafka安装包里包含了zookeeper服务部署,不用另外下载部署,直接在这里启动):
- 在/opt/kafka_2.13-2.8.0/config目录可以看见服务的配置:
- 先启动zookeeper(配置文件zookeeper.properties无需改动):
命令:bin/zookeeper-server-start.sh -daemon config/zookeeper.properties &
备注:这里“-daemon”表示守护进程启动。Daemon进程也就是守护进程,linux大多数的服务进程都是通过守护进程实现的。比如0号进程(调度进程) ,1号进程(init进程)。从其名字守护看出其一般就是机器启动就运行,关机才停止。所以其应该不会受到终端的影响。同时其实在后台运行的。
- 再启动kafka(配置文件config/server.properties需要改动,如下):
zookeeper.connect=192.168.187.131:2181
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://localhost:9092 #本机ip
# ...
之后启动执行命令:bin/kafka-server-start.sh -daemon config/server.properties &
停止kafka服务执行命令:bin/kafka-server-stop.sh
第五步:安装部署canal
- 下载canal: wget [https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz][https_github.com_alibaba_canal_releases_download_canal-1.1.5_canal.deployer-1.1.5.tar.gz]
- 解压canal(canal安装包解压放置新建canal文件解压): tar -zxvf [canal.deployer-1.1.5.tar.gz][https_github.com_alibaba_canal_releases_download_canal-1.1.5_canal.deployer-1.1.5.tar.gz]
- 解压后的目录解释如下:
- bin # 运维脚本文件
- conf # 配置文件目录
canal_local.properties # canal本地配置,一般不需要改动
canal.properties # canal服务配置
logback.xml # logback日志配置
metrics # 度量统计配置
spring # spring-实例配置,主要和binlog位置计算、一些策略配置相关,可以在canal.properties选用其中的任意一个配置文件
example # 实例配置文件夹,一般认为单个数据库对应一个独立的实例配置文件夹
instance.properties # 实例配置,一般指单个数据库的配置
- lib # 服务依赖包
- logs # 日志文件输出目录
- plugin # 支持的插件目录
connector.kafka-1.1.5-jar-with-dependencies.jar #kafka依赖包
connector.rabbitmq-1.1.5-jar-with-dependencies.jar #rabbitmq依赖包
connector.rocketmq-1.1.5-jar-with-dependencies.jar #rocketmq依赖包
- 修改配置文件:
配置文件config/canal.properties找到“canal.serverMode”的值改成“kafka”,其他无需改动。
配置文件config/exmaple/instance.properties:
找到“canal.instance.master.address”,将其值改成安装mysql服务器的ip及端口号。
找到“canal.instance.dbUsername”和“canal.instance.dbPassword”的值改成之前新建的mysql数据库备份账号canal及其密码,其他无需改动。
- 启动canal服务:找到/opt/kafka/bin目录,执行脚本startup.sh。
第六步:测试数据同步功能
- 使用其他有创建数据库、表权限的账号在mysql服务创建一个数据库并创建一张表。
- 我们注意到config/example/instanct.properties有个“example”的topic配置,这是kafka测试使用topic,已经存在无需创建,我们拿这个topic来测试,如下图:
- 在kafka安装目录目录/opt/kafka_2.13-2.8.0,执行kafka的topic的消费者日志,发现有创建对应表的消费日志。
命令:bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic example
- 再插入一条数据看下是否有对应消息日志:
边栏推荐
- Coreldraw2022 new version new function introduction cdr2022
- Patent | subject classification method based on graph convolution neural network fusion of multiple human brain maps
- Recommendation | recommendation of 9 psychotherapy books
- BOM - location, history, pop-up box, timing
- 2/10 parallel search set +bfs+dfs+ shortest path +spfa queue optimization
- 脚本生命周期
- MySQL learning record 13 database connection pool, pooling technology, DBCP, c3p0
- Stack and queue
- Meet diverse needs: jetmade creates three one-stop development packages to help efficient development
- The most detailed and comprehensive update content and all functions of guitar pro 8.0
猜你喜欢
关于进程、线程、协程、同步、异步、阻塞、非阻塞、并发、并行、串行的理解
Recommendation | recommendation of 9 psychotherapy books
Case of Jiecode empowerment: professional training, technical support, and multiple measures to promote graduates to build smart campus completion system
How many of the 10 most common examples of istio traffic management do you know?
About some basic DP -- those things about coins (the basic introduction of DP)
颠覆你的认知?get和post请求的本质
MySQL learning record 13 database connection pool, pooling technology, DBCP, c3p0
Solution to the problem that the root account of MySQL database cannot be logged in remotely
Patent | subject classification method based on graph convolution neural network fusion of multiple human brain maps
10 exemples les plus courants de gestion du trafic istio, que savez - vous?
随机推荐
Overturn your cognition? The nature of get and post requests
拉格朗日插值法
Le compte racine de la base de données MySQL ne peut pas se connecter à distance à la solution
Implementation of knowledge consolidation source code 2: TCP server receives and processes half packets and sticky packets
【HBZ分享】云数据库如何定位慢查询
Global and Chinese markets for endoscopic drying storage cabinets 2022-2028: Research Report on technology, participants, trends, market size and share
食品行业仓储条码管理系统解决方案
Easyrecovery reliable and toll free data recovery computer software
Record an excel xxE vulnerability
Deep learning framework installation (tensorflow & pytorch & paddlepaddle)
Stable Huawei micro certification, stable Huawei cloud database service practice
Meet diverse needs: jetmade creates three one-stop development packages to help efficient development
软考 系统架构设计师 简明教程 | 总目录
Global and Chinese markets for otolaryngology devices 2022-2028: Research Report on technology, participants, trends, market size and share
Data processing methods - smote series and adasyn
Mlapi series - 04 - network variables and network serialization [network synchronization]
P3500 [POI2010]TES-Intelligence Test(二分&离线)
How can programmers resist the "three poisons" of "greed, anger and ignorance"?
. Net interprocess communication
Unity中几个重要类