当前位置:网站首页>使用canal配合rocketmq监听mysql的binlog日志
使用canal配合rocketmq监听mysql的binlog日志
2022-07-04 17:38:00 【canxiusi】
一. 安装配置canal
1.1 安装canal
canal由阿里开源到了github, 可在此页面 github地址, 下载开发版本的压缩包, 由于github下载巨慢, 也可以自己寻找国内资源
之后解压缩, canal 的文件目录如下
1.2 配置canal基本属性
进入到 conf 文件夹, 目录结构如下
先进行基础的配置, vim命令进入到 canal.properties 文件, 配置 canal 服务的ip地址, 以及端口号
之后配置 canal 可视化控制台, 我这里没安装, 所以暂时不配置
由于本次是配合 rocketMQ 使用, 所以在下面的配置中找到 canal.serverMode, 并设置为 rocketMQ
之后, 进入到关键的配置, 配置 rocketMQ 各项属性
rocketmq.producer.group : 生产者分组, 和在代码中指定的 group 的含义一致
rocketmq.namesrv.addr : mq 的 nameSvr 的ip地址
rocketmq.tag : 生产者生产消息时, 指定的 tag
1.3 配置canal的mysql
进入 conf 下的 example 文件夹, 目录如下, vim 命令进入 instance.properties
几个比较重要的配置
canal.instance.mysql.slaveId : 因为 canal 的工作原理是伪装成 mysql 的 slave, 所以这里需要设置 slave 的 id
canal.instance.master.address : 数据库的 ip 地址以及端口号
canal.instance.dbUsername : 要监听的数据库的登录用户名
canal.instance.dbPassword : 数据库的登录密码
canal.instance.filter.regex : 要监听的数据库表, 正则表达式匹配模式, 没配置就是全部监听
canal.mq.topic : mq 生产者的发送消息时, 指定的 topic, 和 java代码的含义一致
二. mysql配置
2.1 开启mysql的binlog日志
由于 canal 的工作原理, 所以需要开启mysql的 binlog 日志, vim 命令编辑 etc下的 my.cnf 文件, 在 mysqld 下添加如下配置 :
log-bin : 配置 binlog 日志文件目录
binlog-format : 配置 binlog 日志文件的格式
server_id : 配置 mysql 主节点的 id, 不能和集群中id, 或是从节点中的 id 重复, 该 id 不能和 canal 的 slaveId 重复
之后查看 mysql binlog 的开启状态, show variables like '%log_bin%', 可以看到是开启状态
2.2 配置 canal 专用用户
在 canal 配置文件中, 我使用的 root 用户登录的, 所以不在给 mysql 配置 canal 用户, 可参考其他博客
2.3 启动canal
进入到bin文件夹, 目录如下, 使用命令 ./startup.sh 启 canal, 特别注意, 如果服务器内存不充足, 可以先使用 vim 编辑 startup.sh, 把文件中的 java参数的使用内存设置小一点
之后观察 canal.log 日志, 可以看到启动成功
查看 rocketmq_client.log 日志, 发现 canal 一直在像 mq 发送心跳检测, 并输出了 mq 的 group, 实例id信息
登录 rocketMQ 可视化控制台, 可以看到 canal 注册到 mq 中的生产者实例, 以及 topic 信息, 这些信息和我们之前在 canal 的配置文件中配置的一致
至此, 关于服务器中 canal 的配置介绍完毕, 下面用java代码实现 mq 消费者监生产者的信息
三. java代码的实现
3.1 mq消费者
由于 canal 使用的 mq 模式, 他可以说是一个 mq 生产者, 所以我们需要定义 mq 的消费者, 代码如下, 消费者要监听的 topic 和 tag 和 canal 配置文件中配置的一致
消费者抽象类
/**
* @author canxiusi.yan
* @description AbstractProducer 抽象SysLog抽象父类
* @date 2022/5/10 10:03
*/
public abstract class AbstractBinLogConsumer {
/**
* 子类通用日志
*/
protected static final Logger logger = LoggerFactory.getLogger(AbstractBinLogConsumer.class);
private DefaultMQPushConsumer consumer;
private final static String GROUP_NAME = "canal-rocketmq-es%canal";
/**
* 初始化mq消费者
*/
@PostConstruct
public void initConsumer() {
// 相同的业务消费者分到同组, 需要指定相同的tag,
consumer = new DefaultMQPushConsumer(GROUP_NAME);
consumer.setNamesrvAddr("120.26.196.135:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setInstanceName("bin_log_consumer");
consumer.setPullInterval(1000);
// 设置每次从队列中拉取的消息数
consumer.setPullBatchSize(1000);
try {
consumer.subscribe("binlog_topic", "binlog_es_tag");
} catch (MQClientException e) {
logger.error("[binlog消费者初始化异常]", e);
System.exit(-1);
}
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
if (CollectionUtils.isEmpty(msgs)) {
logger.info(LogUtils.format("[消息列表为空], msgs=<{0}>", msgs));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
for (MessageExt msg : msgs) {
try {
handlerMsg(msg);
} catch (Exception e) {
logger.error("[日志消费处理异常]", e);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
logger.info(LogUtils.format("[binlog消费者初始化完成], consumer=<{0}>", consumer));
}
/**
* 启动消费者
*
* @param event
*/
@EventListener(ApplicationPreparedEvent.class)
public void startSysLogConsumer(ApplicationPreparedEvent event) {
try {
consumer.start();
} catch (MQClientException e) {
logger.error(LogUtils.format("[binlog消费者start异常], nameServerAddress={}"), e);
System.exit(-1);
}
}
@PreDestroy
public void closeConsumer() {
consumer.shutdown();
}
/**
* 解析binlog日志
* @param msg
*/
protected abstract void handlerMsg(MessageExt msg);
}
消费者实现类, 这里只把监听的消息, 做下日志打印
/**
* @author canxiusi.yan
* @description BinlogConsumer
* @date 2022/6/28 15:07
*/
@Component
public class BinlogConsumer extends AbstractBinLogConsumer {
/**
* 日志
*/
private static final Logger logger = LoggerFactory.getLogger(BinlogConsumer.class);
@Override
protected void handlerMsg(MessageExt msg) {
JSONObject jsonObject = JSONObject.parseObject(new String(msg.getBody()));
// 影响到几条sql, 就会有几条日志
logger.info(LogUtils.format("监听到binlog日志, json=<{0}>", jsonObject));
}
}
3.2 运行模拟
运行程序, 登录 rocketmq 控制台, 发现消费者已经注册成功
我这里使用 postman 调用新增数据的接口, 像 db 插入一条sql, 流程: 向db插入数据, 生成 binlog 日志, 把日志同步给伪装成了从节点的 canal, 然后把 binlog 日志发送给 mq, 之后代码中的消费者监听到数据, 观察日志打印, 可以发现, 已经监听到了消息
格式化消息后, 可以看到, 本次 db 变更的类型, 变更的数据, 都是能获取到的, 后面就可以做很多事情了, 比如把数据同步到 redis, es搜索引擎等
边栏推荐
- 技术分享 | 接口测试价值与体系
- 国元期货是正规平台吗?在国元期货开户安全吗?
- Li Kou brush question diary /day7/2022.6.29
- 神经网络物联网应用技术学什么
- 2022 ByteDance daily practice experience (Tiktok)
- Esp32-c3 introductory tutorial questions ⑫ - undefined reference to ROM_ temp_ to_ power, in function phy_ get_ romfunc_ addr
- 模板_判断素数_开方 / 六素数法
- 完善的js事件委托
- 力扣刷题日记/day6/6.28
- 【机器学习的数学基础】(一)线性代数(Linear Algebra)(上+)
猜你喜欢
. Net ORM framework hisql practice - Chapter 2 - using hisql to realize menu management (add, delete, modify and check)
Li Kou brush question diary /day3/2022.6.25
Process of manually encrypt the mass-producing firmware and programming ESP devices
每日一题(2022-07-02)——最低加油次数
Scala basic tutorial -- 19 -- actor
Behind the ultra clear image quality of NBA Live Broadcast: an in-depth interpretation of Alibaba cloud video cloud "narrowband HD 2.0" technology
Li Kou brush question diary /day7/6.30
Scala基础教程--18--集合(二)
基于unity的愤怒的小鸟设计
[release] a tool for testing WebService and database connection - dbtest v1.0
随机推荐
字节跳动Dev Better技术沙龙成功举办,携手华泰分享Web研发效能提升经验
Scala基础教程--20--Akka
力扣刷题日记/day3/2022.6.25
NBA赛事直播超清画质背后:阿里云视频云「窄带高清2.0」技术深度解读
Wireshark抓包TLS协议栏显示版本不一致问题
Halcon模板匹配
ByteDance dev better technology salon was successfully held, and we joined hands with Huatai to share our experience in improving the efficiency of web research and development
Wireshark packet capturing TLS protocol bar displays version inconsistency
【uniapp】uniapp开发app在线预览pdf文件
Scala基础教程--16--泛型
建立自己的网站(15)
Learning path PHP -- phpstudy "hosts file does not exist or is blocked from opening" when creating the project
Nature Microbiology | 可感染阿斯加德古菌的六种深海沉积物中的病毒基因组
Deleting nodes in binary search tree
从实时应用角度谈通信总线仲裁机制和网络流控
爬虫(6) - 网页数据解析(2) | BeautifulSoup4在爬虫中的使用
Using FTP
Process of manually encrypt the mass-producing firmware and programming ESP devices
Scala基础教程--17--集合
2022-07-04:以下go语言代码输出什么?A:true;B:false;C:编译错误。 package main import 'fmt' func