当前位置:网站首页>canal同步mysql数据变化到kafka(centos部署)
canal同步mysql数据变化到kafka(centos部署)
2022-07-06 04:23:00 【豆约翰】
第一步:环境准备
- 系统CentOS7.9:查看CentOS版本: cat /etc/redhat-release;其中机器内存至少2个G内存,因为canal启动的默认最大内存是2G。
- jdk版本jdk1.8: 安装步骤请见:[https://blog.csdn.net/u010132847/article/details/117002124?spm=1001.2014.3001.5501][https_blog.csdn.net_u010132847_article_details_117002124_spm_1001.2014.3001.5501]
- 部署mysql8版本的机器ip: 192.168.187.132,mysql8版本安装请见:[https://blog.csdn.net/u010132847/article/details/116905080?spm=1001.2014.3001.5501][https_blog.csdn.net_u010132847_article_details_116905080_spm_1001.2014.3001.5501]
- 安装canal+kafka的机器ip: 192.168.187.131
第二步:mysql先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式。
- mysql8版本默认是开启binlog功能的,这一步实际可以忽略。
第三步:数据库新建一个canal账号并赋予其数据库同步的权限。
- 新建用户:create user ‘canal’@‘%’ identified by ‘Canal123!’;
- 赋予用户canal的SELECT、
REPLICATION SLAVE
和REPLICATION CLIENT
权限:
- “mysql_native_password ”的特殊处理:mysql8默认用sha2这个plugin来进行密码加密,如果用户密码生产时没用这个插件,客户端接入的时候会报“caching_sha2_password Auth failed”的错误,如下错误显示。
2020-05-06 17:19:24.944 [destination = example , address = /192.168.187.132:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:example[com.alibaba.otter.canal.parse.exception.CanalParseException: java.io.IOException: connect /192.168.187.132:3306 failure
Caused by: java.io.IOException: connect /127.0.0.1:3306 failure
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:83)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.connect(MysqlConnection.java:89)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.preDump(MysqlEventParser.java:86)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:183)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: caching_sha2_password Auth failed
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:257)
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:80)
... 4 more
]
- 因此需要使用关键字“mysql_native_password ”插件赋值账户密码,如下:
表达式如:alter user ‘canal’@‘%’ identified with mysql_native_password by ‘Canal123!’;
第四步:安装部署kafka(包括zookeeper)
- 下载安装包:wget [https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz][https_mirrors.tuna.tsinghua.edu.cn_apache_kafka_2.8.0_kafka_2.13-2.8.0.tgz]
- 解压安装包:tar -zxvf kafka_2.13-2.8.0.tgz
- 在/opt/kafka_2.13-2.8.0/bin目录可以看见这些启动脚本(稍微注意就发现里面还包含zookeeper脚本,是的,kafka安装包里包含了zookeeper服务部署,不用另外下载部署,直接在这里启动):
- 在/opt/kafka_2.13-2.8.0/config目录可以看见服务的配置:
- 先启动zookeeper(配置文件zookeeper.properties无需改动):
命令:bin/zookeeper-server-start.sh -daemon config/zookeeper.properties &
备注:这里“-daemon”表示守护进程启动。Daemon进程也就是守护进程,linux大多数的服务进程都是通过守护进程实现的。比如0号进程(调度进程) ,1号进程(init进程)。从其名字守护看出其一般就是机器启动就运行,关机才停止。所以其应该不会受到终端的影响。同时其实在后台运行的。
- 再启动kafka(配置文件config/server.properties需要改动,如下):
zookeeper.connect=192.168.187.131:2181
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://localhost:9092 #本机ip
# ...
之后启动执行命令:bin/kafka-server-start.sh -daemon config/server.properties &
停止kafka服务执行命令:bin/kafka-server-stop.sh
第五步:安装部署canal
- 下载canal: wget [https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz][https_github.com_alibaba_canal_releases_download_canal-1.1.5_canal.deployer-1.1.5.tar.gz]
- 解压canal(canal安装包解压放置新建canal文件解压): tar -zxvf [canal.deployer-1.1.5.tar.gz][https_github.com_alibaba_canal_releases_download_canal-1.1.5_canal.deployer-1.1.5.tar.gz]
- 解压后的目录解释如下:
- bin # 运维脚本文件
- conf # 配置文件目录
canal_local.properties # canal本地配置,一般不需要改动
canal.properties # canal服务配置
logback.xml # logback日志配置
metrics # 度量统计配置
spring # spring-实例配置,主要和binlog位置计算、一些策略配置相关,可以在canal.properties选用其中的任意一个配置文件
example # 实例配置文件夹,一般认为单个数据库对应一个独立的实例配置文件夹
instance.properties # 实例配置,一般指单个数据库的配置
- lib # 服务依赖包
- logs # 日志文件输出目录
- plugin # 支持的插件目录
connector.kafka-1.1.5-jar-with-dependencies.jar #kafka依赖包
connector.rabbitmq-1.1.5-jar-with-dependencies.jar #rabbitmq依赖包
connector.rocketmq-1.1.5-jar-with-dependencies.jar #rocketmq依赖包
- 修改配置文件:
配置文件config/canal.properties找到“canal.serverMode”的值改成“kafka”,其他无需改动。
配置文件config/exmaple/instance.properties:
找到“canal.instance.master.address”,将其值改成安装mysql服务器的ip及端口号。
找到“canal.instance.dbUsername”和“canal.instance.dbPassword”的值改成之前新建的mysql数据库备份账号canal及其密码,其他无需改动。
- 启动canal服务:找到/opt/kafka/bin目录,执行脚本startup.sh。
第六步:测试数据同步功能
- 使用其他有创建数据库、表权限的账号在mysql服务创建一个数据库并创建一张表。
- 我们注意到config/example/instanct.properties有个“example”的topic配置,这是kafka测试使用topic,已经存在无需创建,我们拿这个topic来测试,如下图:
- 在kafka安装目录目录/opt/kafka_2.13-2.8.0,执行kafka的topic的消费者日志,发现有创建对应表的消费日志。
命令:bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic example
- 再插入一条数据看下是否有对应消息日志:
边栏推荐
- Solve the compilation problem of "c2001: line breaks in constants"
- 拉格朗日插值法
- Deep learning framework installation (tensorflow & pytorch & paddlepaddle)
- 2/11 matrix fast power +dp+ bisection
- E. Best Pair
- flink sql 能同时读多个topic吗。with里怎么写
- Overturn your cognition? The nature of get and post requests
- Understanding of processes, threads, coroutines, synchronization, asynchrony, blocking, non blocking, concurrency, parallelism, and serialization
- Lombok principle and the pit of ⽤ @data and @builder at the same time
- HotSpot VM
猜你喜欢
Execution order of scripts bound to game objects
Easyrecovery reliable and toll free data recovery computer software
满足多元需求:捷码打造3大一站式开发套餐,助力高效开发
Fedora/REHL 安装 semanage
CADD课程学习(7)-- 模拟靶点和小分子相互作用 (柔性对接 AutoDock)
Recommendation system (IX) PNN model (product based neural networks)
[tomato assistant installation]
When debugging after pycharm remote server is connected, trying to add breakpoint to file that does not exist: /data appears_ sda/d:/segmentation
Fundamentals of SQL database operation
【leetcode】1189. Maximum number of "balloons"
随机推荐
Tengine kernel parameters
coreldraw2022新版本新功能介绍cdr2022
牛顿插值法
newton interpolation
Implementation of knowledge consolidation source code 1: epoll implementation of TCP server
Stable Huawei micro certification, stable Huawei cloud database service practice
729. My schedule I (set or dynamic open point segment tree)
How to execute an SQL statement in MySQL
How to solve the problem of slow downloading from foreign NPM official servers—— Teach you two ways to switch to Taobao NPM image server
CADD course learning (8) -- virtual screening of Compound Library
Path of class file generated by idea compiling JSP page
PTA tiantisai l1-078 teacher Ji's return (15 points) detailed explanation
Hashlimit rate control
脚本生命周期
Class A, B, C networks and subnet masks in IPv4
How to realize automatic playback of H5 video
One question per day (Mathematics)
1291_ Add timestamp function in xshell log
2/13 qaq~~ greed + binary prefix sum + number theory (find the greatest common factor of multiple numbers)
【leetcode】22. bracket-generating