当前位置:网站首页>canal同步mysql数据变化到kafka(centos部署)
canal同步mysql数据变化到kafka(centos部署)
2022-07-06 04:23:00 【豆约翰】
第一步:环境准备
- 系统CentOS7.9:查看CentOS版本: cat /etc/redhat-release;其中机器内存至少2个G内存,因为canal启动的默认最大内存是2G。
- jdk版本jdk1.8: 安装步骤请见:[https://blog.csdn.net/u010132847/article/details/117002124?spm=1001.2014.3001.5501][https_blog.csdn.net_u010132847_article_details_117002124_spm_1001.2014.3001.5501]
- 部署mysql8版本的机器ip: 192.168.187.132,mysql8版本安装请见:[https://blog.csdn.net/u010132847/article/details/116905080?spm=1001.2014.3001.5501][https_blog.csdn.net_u010132847_article_details_116905080_spm_1001.2014.3001.5501]
- 安装canal+kafka的机器ip: 192.168.187.131
第二步:mysql先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式。
- mysql8版本默认是开启binlog功能的,这一步实际可以忽略。
第三步:数据库新建一个canal账号并赋予其数据库同步的权限。
- 新建用户:create user ‘canal’@‘%’ identified by ‘Canal123!’;
- 赋予用户canal的SELECT、
REPLICATION SLAVE
和REPLICATION CLIENT
权限:
- “mysql_native_password ”的特殊处理:mysql8默认用sha2这个plugin来进行密码加密,如果用户密码生产时没用这个插件,客户端接入的时候会报“caching_sha2_password Auth failed”的错误,如下错误显示。
2020-05-06 17:19:24.944 [destination = example , address = /192.168.187.132:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:example[com.alibaba.otter.canal.parse.exception.CanalParseException: java.io.IOException: connect /192.168.187.132:3306 failure
Caused by: java.io.IOException: connect /127.0.0.1:3306 failure
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:83)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.connect(MysqlConnection.java:89)
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.preDump(MysqlEventParser.java:86)
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:183)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: caching_sha2_password Auth failed
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:257)
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:80)
... 4 more
]
- 因此需要使用关键字“mysql_native_password ”插件赋值账户密码,如下:
表达式如:alter user ‘canal’@‘%’ identified with mysql_native_password by ‘Canal123!’;
第四步:安装部署kafka(包括zookeeper)
- 下载安装包:wget [https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz][https_mirrors.tuna.tsinghua.edu.cn_apache_kafka_2.8.0_kafka_2.13-2.8.0.tgz]
- 解压安装包:tar -zxvf kafka_2.13-2.8.0.tgz
- 在/opt/kafka_2.13-2.8.0/bin目录可以看见这些启动脚本(稍微注意就发现里面还包含zookeeper脚本,是的,kafka安装包里包含了zookeeper服务部署,不用另外下载部署,直接在这里启动):
- 在/opt/kafka_2.13-2.8.0/config目录可以看见服务的配置:
- 先启动zookeeper(配置文件zookeeper.properties无需改动):
命令:bin/zookeeper-server-start.sh -daemon config/zookeeper.properties &
备注:这里“-daemon”表示守护进程启动。Daemon进程也就是守护进程,linux大多数的服务进程都是通过守护进程实现的。比如0号进程(调度进程) ,1号进程(init进程)。从其名字守护看出其一般就是机器启动就运行,关机才停止。所以其应该不会受到终端的影响。同时其实在后台运行的。
- 再启动kafka(配置文件config/server.properties需要改动,如下):
zookeeper.connect=192.168.187.131:2181
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://localhost:9092 #本机ip
# ...
之后启动执行命令:bin/kafka-server-start.sh -daemon config/server.properties &
停止kafka服务执行命令:bin/kafka-server-stop.sh
第五步:安装部署canal
- 下载canal: wget [https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz][https_github.com_alibaba_canal_releases_download_canal-1.1.5_canal.deployer-1.1.5.tar.gz]
- 解压canal(canal安装包解压放置新建canal文件解压): tar -zxvf [canal.deployer-1.1.5.tar.gz][https_github.com_alibaba_canal_releases_download_canal-1.1.5_canal.deployer-1.1.5.tar.gz]
- 解压后的目录解释如下:
- bin # 运维脚本文件
- conf # 配置文件目录
canal_local.properties # canal本地配置,一般不需要改动
canal.properties # canal服务配置
logback.xml # logback日志配置
metrics # 度量统计配置
spring # spring-实例配置,主要和binlog位置计算、一些策略配置相关,可以在canal.properties选用其中的任意一个配置文件
example # 实例配置文件夹,一般认为单个数据库对应一个独立的实例配置文件夹
instance.properties # 实例配置,一般指单个数据库的配置
- lib # 服务依赖包
- logs # 日志文件输出目录
- plugin # 支持的插件目录
connector.kafka-1.1.5-jar-with-dependencies.jar #kafka依赖包
connector.rabbitmq-1.1.5-jar-with-dependencies.jar #rabbitmq依赖包
connector.rocketmq-1.1.5-jar-with-dependencies.jar #rocketmq依赖包
- 修改配置文件:
配置文件config/canal.properties找到“canal.serverMode”的值改成“kafka”,其他无需改动。
配置文件config/exmaple/instance.properties:
找到“canal.instance.master.address”,将其值改成安装mysql服务器的ip及端口号。
找到“canal.instance.dbUsername”和“canal.instance.dbPassword”的值改成之前新建的mysql数据库备份账号canal及其密码,其他无需改动。
- 启动canal服务:找到/opt/kafka/bin目录,执行脚本startup.sh。
第六步:测试数据同步功能
- 使用其他有创建数据库、表权限的账号在mysql服务创建一个数据库并创建一张表。
- 我们注意到config/example/instanct.properties有个“example”的topic配置,这是kafka测试使用topic,已经存在无需创建,我们拿这个topic来测试,如下图:
- 在kafka安装目录目录/opt/kafka_2.13-2.8.0,执行kafka的topic的消费者日志,发现有创建对应表的消费日志。
命令:bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic example
- 再插入一条数据看下是否有对应消息日志:
边栏推荐
- [Zhao Yuqiang] deploy kubernetes cluster with binary package
- 查询mysql数据库中各表记录数大小
- Web components series (VII) -- life cycle of custom components
- 2328. 网格图中递增路径的数目(记忆化搜索)
- JVM garbage collector concept
- [face recognition series] | realize automatic makeup
- 2/11 matrix fast power +dp+ bisection
- Certbot failed to update certificate solution
- Knowledge consolidation source code implementation 3: buffer ringbuffer
- 满足多元需求:捷码打造3大一站式开发套餐,助力高效开发
猜你喜欢
How does computer nail adjust sound
Slow SQL fetching and analysis of MySQL database
Execution order of scripts bound to game objects
Data processing methods - smote series and adasyn
Understanding of processes, threads, coroutines, synchronization, asynchrony, blocking, non blocking, concurrency, parallelism, and serialization
Jd.com 2: how to prevent oversold in the deduction process of commodity inventory?
食品行业仓储条码管理系统解决方案
Coreldraw2022 new version new function introduction cdr2022
Path of class file generated by idea compiling JSP page
Fundamentals of SQL database operation
随机推荐
Mlapi series - 04 - network variables and network serialization [network synchronization]
10个 Istio 流量管理 最常用的例子,你知道几个?
Brief tutorial for soft exam system architecture designer | general catalog
Can Flink SQL read multiple topics at the same time. How to write in with
捷码赋能案例:专业培训、技术支撑,多措并举推动毕业生搭建智慧校园毕设系统
After learning classes and objects, I wrote a date class
Deep learning framework installation (tensorflow & pytorch & paddlepaddle)
CertBot 更新证书失败解决
The global and Chinese market of negative pressure wound therapy unit (npwtu) 2022-2028: Research Report on technology, participants, trends, market size and share
2/13 qaq~~ greed + binary prefix sum + number theory (find the greatest common factor of multiple numbers)
Case of Jiecode empowerment: professional training, technical support, and multiple measures to promote graduates to build smart campus completion system
Data processing methods - smote series and adasyn
【leetcode】22. bracket-generating
One question per day (Mathematics)
PTA tiantisai l1-078 teacher Ji's return (15 points) detailed explanation
Understanding of processes, threads, coroutines, synchronization, asynchrony, blocking, non blocking, concurrency, parallelism, and serialization
题解:《单词覆盖还原》、《最长连号》、《小玉买文具》、《小玉家的电费》
asp. Core is compatible with both JWT authentication and cookies authentication
CADD课程学习(7)-- 模拟靶点和小分子相互作用 (柔性对接 AutoDock)
2328. 网格图中递增路径的数目(记忆化搜索)