当前位置:网站首页>使用canal配合rocketmq监听mysql的binlog日志
使用canal配合rocketmq监听mysql的binlog日志
2022-07-04 17:38:00 【canxiusi】
一. 安装配置canal
1.1 安装canal
canal由阿里开源到了github, 可在此页面 github地址, 下载开发版本的压缩包, 由于github下载巨慢, 也可以自己寻找国内资源

之后解压缩, canal 的文件目录如下

1.2 配置canal基本属性
进入到 conf 文件夹, 目录结构如下

先进行基础的配置, vim命令进入到 canal.properties 文件, 配置 canal 服务的ip地址, 以及端口号

之后配置 canal 可视化控制台, 我这里没安装, 所以暂时不配置

由于本次是配合 rocketMQ 使用, 所以在下面的配置中找到 canal.serverMode, 并设置为 rocketMQ

之后, 进入到关键的配置, 配置 rocketMQ 各项属性
rocketmq.producer.group : 生产者分组, 和在代码中指定的 group 的含义一致
rocketmq.namesrv.addr : mq 的 nameSvr 的ip地址
rocketmq.tag : 生产者生产消息时, 指定的 tag

1.3 配置canal的mysql
进入 conf 下的 example 文件夹, 目录如下, vim 命令进入 instance.properties 
几个比较重要的配置
canal.instance.mysql.slaveId : 因为 canal 的工作原理是伪装成 mysql 的 slave, 所以这里需要设置 slave 的 id
canal.instance.master.address : 数据库的 ip 地址以及端口号

canal.instance.dbUsername : 要监听的数据库的登录用户名
canal.instance.dbPassword : 数据库的登录密码

canal.instance.filter.regex : 要监听的数据库表, 正则表达式匹配模式, 没配置就是全部监听
canal.mq.topic : mq 生产者的发送消息时, 指定的 topic, 和 java代码的含义一致

二. mysql配置
2.1 开启mysql的binlog日志
由于 canal 的工作原理, 所以需要开启mysql的 binlog 日志, vim 命令编辑 etc下的 my.cnf 文件, 在 mysqld 下添加如下配置 :
log-bin : 配置 binlog 日志文件目录
binlog-format : 配置 binlog 日志文件的格式
server_id : 配置 mysql 主节点的 id, 不能和集群中id, 或是从节点中的 id 重复, 该 id 不能和 canal 的 slaveId 重复

之后查看 mysql binlog 的开启状态, show variables like '%log_bin%', 可以看到是开启状态
2.2 配置 canal 专用用户
在 canal 配置文件中, 我使用的 root 用户登录的, 所以不在给 mysql 配置 canal 用户, 可参考其他博客
2.3 启动canal
进入到bin文件夹, 目录如下, 使用命令 ./startup.sh 启 canal, 特别注意, 如果服务器内存不充足, 可以先使用 vim 编辑 startup.sh, 把文件中的 java参数的使用内存设置小一点

之后观察 canal.log 日志, 可以看到启动成功

查看 rocketmq_client.log 日志, 发现 canal 一直在像 mq 发送心跳检测, 并输出了 mq 的 group, 实例id信息

登录 rocketMQ 可视化控制台, 可以看到 canal 注册到 mq 中的生产者实例, 以及 topic 信息, 这些信息和我们之前在 canal 的配置文件中配置的一致


至此, 关于服务器中 canal 的配置介绍完毕, 下面用java代码实现 mq 消费者监生产者的信息
三. java代码的实现
3.1 mq消费者
由于 canal 使用的 mq 模式, 他可以说是一个 mq 生产者, 所以我们需要定义 mq 的消费者, 代码如下, 消费者要监听的 topic 和 tag 和 canal 配置文件中配置的一致
消费者抽象类
/**
* @author canxiusi.yan
* @description AbstractProducer 抽象SysLog抽象父类
* @date 2022/5/10 10:03
*/
public abstract class AbstractBinLogConsumer {
/**
* 子类通用日志
*/
protected static final Logger logger = LoggerFactory.getLogger(AbstractBinLogConsumer.class);
private DefaultMQPushConsumer consumer;
private final static String GROUP_NAME = "canal-rocketmq-es%canal";
/**
* 初始化mq消费者
*/
@PostConstruct
public void initConsumer() {
// 相同的业务消费者分到同组, 需要指定相同的tag,
consumer = new DefaultMQPushConsumer(GROUP_NAME);
consumer.setNamesrvAddr("120.26.196.135:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setInstanceName("bin_log_consumer");
consumer.setPullInterval(1000);
// 设置每次从队列中拉取的消息数
consumer.setPullBatchSize(1000);
try {
consumer.subscribe("binlog_topic", "binlog_es_tag");
} catch (MQClientException e) {
logger.error("[binlog消费者初始化异常]", e);
System.exit(-1);
}
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
if (CollectionUtils.isEmpty(msgs)) {
logger.info(LogUtils.format("[消息列表为空], msgs=<{0}>", msgs));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
for (MessageExt msg : msgs) {
try {
handlerMsg(msg);
} catch (Exception e) {
logger.error("[日志消费处理异常]", e);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
logger.info(LogUtils.format("[binlog消费者初始化完成], consumer=<{0}>", consumer));
}
/**
* 启动消费者
*
* @param event
*/
@EventListener(ApplicationPreparedEvent.class)
public void startSysLogConsumer(ApplicationPreparedEvent event) {
try {
consumer.start();
} catch (MQClientException e) {
logger.error(LogUtils.format("[binlog消费者start异常], nameServerAddress={}"), e);
System.exit(-1);
}
}
@PreDestroy
public void closeConsumer() {
consumer.shutdown();
}
/**
* 解析binlog日志
* @param msg
*/
protected abstract void handlerMsg(MessageExt msg);
}消费者实现类, 这里只把监听的消息, 做下日志打印
/**
* @author canxiusi.yan
* @description BinlogConsumer
* @date 2022/6/28 15:07
*/
@Component
public class BinlogConsumer extends AbstractBinLogConsumer {
/**
* 日志
*/
private static final Logger logger = LoggerFactory.getLogger(BinlogConsumer.class);
@Override
protected void handlerMsg(MessageExt msg) {
JSONObject jsonObject = JSONObject.parseObject(new String(msg.getBody()));
// 影响到几条sql, 就会有几条日志
logger.info(LogUtils.format("监听到binlog日志, json=<{0}>", jsonObject));
}
}3.2 运行模拟
运行程序, 登录 rocketmq 控制台, 发现消费者已经注册成功

我这里使用 postman 调用新增数据的接口, 像 db 插入一条sql, 流程: 向db插入数据, 生成 binlog 日志, 把日志同步给伪装成了从节点的 canal, 然后把 binlog 日志发送给 mq, 之后代码中的消费者监听到数据, 观察日志打印, 可以发现, 已经监听到了消息

格式化消息后, 可以看到, 本次 db 变更的类型, 变更的数据, 都是能获取到的, 后面就可以做很多事情了, 比如把数据同步到 redis, es搜索引擎等

边栏推荐
- [cloud voice suggestion collection] cloud store renewal and upgrading: provide effective suggestions, win a large number of code beans, Huawei AI speaker 2!
- 6.26CF模拟赛B:数组缩减题解
- repeat_P1002 [NOIP2002 普及组] 过河卒_dp
- 力扣刷题日记/day7/2022.6.29
- 激进技术派 vs 项目保守派的微服务架构之争
- [go ~ 0 to 1] read, write and create files on the sixth day
- Principle and application of ThreadLocal
- 【2022年江西省研究生数学建模】冰壶运动 思路分析及代码实现
- 资料下载 丨首届腾讯技术开放日课程精华!
- 力扣刷题日记/day8/7.1
猜你喜欢

Wireshark packet capturing TLS protocol bar displays version inconsistency

Wireshark抓包TLS协议栏显示版本不一致问题

读写关闭的channel是啥后果?

Basic tutorial of scala -- 16 -- generics

What types of Thawte wildcard SSL certificates provide

VMware Tools和open-vm-tools的安装与使用:解决虚拟机不全屏和无法传输文件的问题

Lex and yacc based lexical analyzer + parser

从实时应用角度谈通信总线仲裁机制和网络流控

Process of manually encrypt the mass-producing firmware and programming ESP devices

Halcon模板匹配
随机推荐
Scala basic tutorial -- 18 -- set (2)
神经网络物联网平台搭建(物联网平台搭建实战教程)
[2022 Jiangxi graduate mathematical modeling] curling movement idea analysis and code implementation
Journal des problèmes de brosse à boutons de force / day6 / 6.28
Download the first Tencent technology open day course essence!
小发猫物联网平台搭建与应用模型
一种将Tree-LSTM的强化学习用于连接顺序选择的方法
力扣刷题日记/day3/2022.6.25
其他InterSystems %Net工具
Scala基础教程--12--读写数据
Scala basic tutorial -- 17 -- Collection
Scala基础教程--14--隐式转换
Scala basic tutorial -- 13 -- advanced function
[mathematical modeling of graduate students in Jiangxi Province in 2022] analysis and code implementation of haze removal by nucleation of water vapor supersaturation
Li Kou brush question diary /day1/2022.6.23
ByteDance dev better technology salon was successfully held, and we joined hands with Huatai to share our experience in improving the efficiency of web research and development
力扣刷題日記/day6/6.28
Using SSH
Li Kou brush question diary /day7/6.30
Perfect JS event delegation