Using canal and RocketMQ to listen to the MySQL binlog
2022-07-04 19:17:00 【canxiusi】
1. Installing and configuring canal
1.1 Installing canal
canal is open-sourced by Alibaba on GitHub. Download the compressed package of the development version from the project's GitHub page. Because downloads from GitHub can be slow, you can also look for a domestic mirror yourself.

Then unzip it. canal's directory layout is as follows:

1.2 Configuring canal's basic properties
Enter the conf folder. Its directory structure is as follows:

Start with the basic configuration: open canal.properties with vim and set the IP address and port number of the canal service.

Next comes the configuration for canal's visual console. I did not install the console, so it is left unconfigured for now.

Because canal is used together with RocketMQ this time, find canal.serverMode in the following configuration and set it to rocketMQ.

Next comes the key part, the RocketMQ properties:
rocketmq.producer.group : the producer group; it means the same thing as the group specified in code
rocketmq.namesrv.addr : the IP address of the MQ name server
rocketmq.tag : the tag attached to a message when the producer sends it

1.3 Configuring canal's MySQL instance
Enter the example folder under conf (directory listing below) and open instance.properties with vim.![ Insert picture description here ](/img/2a/5c04a1e78cda3de96e6e0a53ce48a2.png)
Several important settings:
canal.instance.mysql.slaveId : canal works by posing as a MySQL slave, so the slave's id has to be set here
canal.instance.master.address : the IP address and port number of the database

canal.instance.dbUsername : the login user name of the database to listen to
canal.instance.dbPassword : the login password of the database

canal.instance.filter.regex : the database tables to listen to, matched by regular expression; if it is not configured, all tables are monitored
canal.mq.topic : the topic used when the MQ producer sends a message; it means the same thing as the topic in the Java code

2. MySQL configuration
2.1 Enabling the MySQL binlog
canal reads changes from the binlog, so the binlog must be enabled in MySQL. Edit the my.cnf file under /etc with vim and add the following settings in the mysqld section:
log-bin : where the binlog files are written
binlog-format : the format of the binlog
server_id : the id of the MySQL master node; it must not duplicate the id of any other node in the cluster, including slave nodes, and it must not be the same as canal's slaveId

Then check whether the binlog is enabled with show variables like '%log_bin%'. The output shows that it is on.![ Insert picture description here ](/img/67/177eb0ab4fbdce7e9b25a92cffc8cd.png)
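The checks in this section can be sketched as a small Java helper that inspects the name/value pairs returned by show variables. This is only a sketch: the class name BinlogReadinessCheck is hypothetical, the variables are passed in as a map rather than read over JDBC, and the ROW requirement is an assumption taken from canal's own setup guide, not from this article.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical check over the name/value pairs returned by SHOW VARIABLES. */
final class BinlogReadinessCheck {

    /**
     * @param mysqlVariables variable name to value, e.g. log_bin to ON
     * @param canalSlaveId   the value of canal.instance.mysql.slaveId
     * @return human-readable problems; an empty list means the settings look usable
     */
    static List<String> problems(Map<String, String> mysqlVariables, long canalSlaveId) {
        List<String> problems = new ArrayList<>();
        if (!"ON".equalsIgnoreCase(mysqlVariables.get("log_bin"))) {
            problems.add("log_bin is not ON, so no binlog is written");
        }
        // Assumption: ROW is the format canal's setup guide asks for; the article does not name one.
        if (!"ROW".equalsIgnoreCase(mysqlVariables.get("binlog_format"))) {
            problems.add("binlog_format is not ROW");
        }
        String serverId = mysqlVariables.get("server_id");
        if (serverId == null) {
            problems.add("server_id is not set");
        } else if (Long.parseLong(serverId.trim()) == canalSlaveId) {
            problems.add("server_id equals canal.instance.mysql.slaveId (" + canalSlaveId + ")");
        }
        return problems;
    }

    public static void main(String[] args) {
        Map<String, String> vars = new LinkedHashMap<>();
        vars.put("log_bin", "ON");
        vars.put("binlog_format", "ROW");
        vars.put("server_id", "1");
        // 1234 is a placeholder slaveId for illustration.
        System.out.println(problems(vars, 1234L));
    }
}
```

The id comparison is the part most easily missed, because the two values live in different files: server_id in my.cnf and slaveId in instance.properties.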
2.2 Configuring a dedicated canal user
In the canal configuration file I log in as the root user, so I did not create a dedicated canal user in MySQL. Refer to other blogs for that setup.
2.3 Starting canal
Enter the bin folder (directory listing below) and start canal with ./startup.sh. Note in particular: if the server is short on memory, first edit startup.sh with vim and lower the memory settings in the Java parameters.

Then check the canal.log log. It shows that startup succeeded.

In the rocketmq_client.log log you can see that canal keeps sending heartbeats to MQ, and that the log prints the MQ group and instance id.

Log in to the RocketMQ visual console. You can see the producer instance that canal registered with MQ, along with the topic information. This matches what was set in the canal configuration files earlier.


That completes the canal configuration on the server. Next, Java code is used to implement the MQ consumer that receives the messages canal produces.
3. Java code implementation
3.1 MQ consumer
Because canal runs in MQ mode, it effectively acts as an MQ producer, so we need to define an MQ consumer. The code is as follows. The topic and tag that the consumer subscribes to must match those in the canal configuration files.
Consumer abstract class
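// Imports assume the RocketMQ 4.x client, Spring Boot and SLF4J.
// CollectionUtils is assumed to be Spring's; LogUtils is the author's own helper (import omitted).
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationPreparedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.util.CollectionUtils;

/**
 * @author canxiusi.yan
 * @description Abstract parent class for binlog consumers
 * @date 2022/5/10 10:03
 */
public abstract class AbstractBinLogConsumer {

    /**
     * Shared logger for subclasses
     */
    protected static final Logger logger = LoggerFactory.getLogger(AbstractBinLogConsumer.class);

    private static final String GROUP_NAME = "canal-rocketmq-es%canal";

    private static final String NAMESRV_ADDR = "120.26.196.135:9876";

    private DefaultMQPushConsumer consumer;

    /**
     * Initialize the MQ consumer
     */
    @PostConstruct
    public void initConsumer() {
        // Consumers of the same business belong to the same group and must subscribe to the same tag
        consumer = new DefaultMQPushConsumer(GROUP_NAME);
        consumer.setNamesrvAddr(NAMESRV_ADDR);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setInstanceName("bin_log_consumer");
        consumer.setPullInterval(1000);
        // Number of messages pulled from the queue each time
        consumer.setPullBatchSize(1000);
        try {
            // Topic and tag must match canal.mq.topic and rocketmq.tag in the canal configuration
            consumer.subscribe("binlog_topic", "binlog_es_tag");
        } catch (MQClientException e) {
            logger.error("[binlog consumer initialization failed]", e);
            System.exit(-1);
        }
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            if (CollectionUtils.isEmpty(msgs)) {
                logger.info(LogUtils.format("[message list is empty], msgs=<{0}>", msgs));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            for (MessageExt msg : msgs) {
                try {
                    handlerMsg(msg);
                } catch (Exception e) {
                    // A failed message is logged and skipped; it is not redelivered
                    logger.error("[binlog message handling failed]", e);
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        logger.info(LogUtils.format("[binlog consumer initialization complete], consumer=<{0}>", consumer));
    }

    /**
     * Start the consumer
     *
     * @param event the application event that triggers the start
     */
    @EventListener(ApplicationPreparedEvent.class)
    public void startSysLogConsumer(ApplicationPreparedEvent event) {
        try {
            consumer.start();
        } catch (MQClientException e) {
            // The original call had a placeholder but passed no argument for it
            logger.error(LogUtils.format("[binlog consumer failed to start], nameServerAddress=<{0}>", NAMESRV_ADDR), e);
            System.exit(-1);
        }
    }

    @PreDestroy
    public void closeConsumer() {
        consumer.shutdown();
    }

    /**
     * Parse a binlog message
     *
     * @param msg the message received from MQ
     */
    protected abstract void handlerMsg(MessageExt msg);
}

Consumer implementation class. Here it only prints the messages it receives to the log.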
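// JSONObject is assumed to be fastjson's; LogUtils is the author's own helper (import omitted).
import java.nio.charset.StandardCharsets;

import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * @author canxiusi.yan
 * @description BinlogConsumer
 * @date 2022/6/28 15:07
 */
@Component
public class BinlogConsumer extends AbstractBinLogConsumer {

    /**
     * Logger
     */
    private static final Logger logger = LoggerFactory.getLogger(BinlogConsumer.class);

    @Override
    protected void handlerMsg(MessageExt msg) {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset
        JSONObject jsonObject = JSONObject.parseObject(new String(msg.getBody(), StandardCharsets.UTF_8));
        // There are as many log entries as SQL statements affected
        logger.info(LogUtils.format("Received binlog message, json=<{0}>", jsonObject));
    }
}

3.2 Running a test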
Run the program and log in to the RocketMQ console. The consumer has registered successfully.

I use Postman to call an interface that adds new data, which inserts a row into the DB. The flow is: data is inserted into the DB, a binlog entry is generated, the entry is synchronized to canal (which poses as a slave node), canal sends the binlog entry to MQ, and the consumer in the code receives it. The log output shows that the message has been received.

After formatting the message you can see that both the type of the DB change and the changed data are available. Many things can be built on top of this, for example synchronizing data to Redis or to a search engine such as ES.
