当前位置:网站首页>Use canal and rocketmq to listen to MySQL binlog logs
Use canal and rocketmq to listen to MySQL binlog logs
2022-07-04 19:17:00 【canxiusi】
One . Installation configuration canal
1.1 install canal
canal From Alibaba open source to github, Available on this page github Address , Download the compressed package of the development version , because github Download too slow , You can also find domestic resources by yourself
Then unzip , canal The file directory of is as follows
1.2 To configure canal Basic attributes
Enter into conf Folder , The directory structure is as follows
First, perform basic configuration , vim Command to enter canal.properties file , To configure canal Service ip Address , And the port number
After configuration canal Visual console , I didn't install it here , Therefore, it is not configured for the time being
Because this time is cooperation rocketMQ Use , So find in the following configuration canal.serverMode, And set to rocketMQ
after , Enter the key configuration , To configure rocketMQ Properties
rocketmq.producer.group : Producer groups , And specified in the code group It means the same thing
rocketmq.namesrv.addr : mq Of nameSvr Of ip Address
rocketmq.tag : When a producer produces a message , designated tag
1.3 To configure canal Of mysql
Get into conf Under the example Folder , Directory as follows , vim Order to enter instance.properties
Several important configurations
canal.instance.mysql.slaveId : because canal It works by pretending to mysql Of slave, So it needs to be set here slave Of id
canal.instance.master.address : Database ip Address and port number
canal.instance.dbUsername : Login user name of the database to listen
canal.instance.dbPassword : Database login password
canal.instance.filter.regex : Database table to listen , Regular expression matching patterns , No configuration means all monitoring
canal.mq.topic : mq When the producer sends a message , designated topic, and java The meaning of the code is consistent
Two . mysql To configure
2.1 Turn on mysql Of binlog journal
because canal How it works , So it needs to be turned on mysql Of binlog journal , vim Command to edit etc Under the my.cnf file , stay mysqld Add the following configuration :
log-bin : To configure binlog Log file directory
binlog-format : To configure binlog Format of log file
server_id : To configure mysql The master node id, Cannot be in the cluster id, Or from the node id repeat , The id Unable to join canal Of slaveId repeat
Then check mysql binlog Open state of the , show variables like '%log_bin%', It can be seen that it is open
2.2 To configure canal Dedicated users
stay canal In profile , I use root User logged in , So don't give mysql To configure canal user , Refer to other blogs
2.3 start-up canal
Enter into bin Folder , Directory as follows , Use command ./startup.sh Qi canal, Particular attention , If the server memory is insufficient , You can use it first vim edit startup.sh, Put... In the file java Use smaller memory settings for parameters
Then observe canal.log journal , You can see that the startup is successful
see rocketmq_client.log journal , Find out canal Always like mq Send heartbeat detection , And output mq Of group, example id Information
Sign in rocketMQ Visual console , You can see canal Sign up to mq Producer instance in , as well as topic Information , This information is similar to what we have done before canal The configuration in the configuration file is consistent
thus , About the server canal The configuration of is introduced , The following is used java Code implementation mq Information of consumers and producers
3、 ... and . java Code implementation
3.1 mq consumer
because canal The use of mq Pattern , He can be said to be a mq producer , So we need to define mq The consumer , The code is as follows , What consumers want to monitor topic and tag and canal The configuration in the configuration file is consistent
Consumer abstract class
/**
* @author canxiusi.yan
* @description AbstractProducer abstract SysLog Abstract parent class
* @date 2022/5/10 10:03
*/
public abstract class AbstractBinLogConsumer {
/**
* Subclass general log
*/
protected static final Logger logger = LoggerFactory.getLogger(AbstractBinLogConsumer.class);
private DefaultMQPushConsumer consumer;
private final static String GROUP_NAME = "canal-rocketmq-es%canal";
/**
* initialization mq consumer
*/
@PostConstruct
public void initConsumer() {
// The same business consumers are divided into the same group , You need to specify the same tag,
consumer = new DefaultMQPushConsumer(GROUP_NAME);
consumer.setNamesrvAddr("120.26.196.135:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setInstanceName("bin_log_consumer");
consumer.setPullInterval(1000);
// Set the number of messages pulled from the queue each time
consumer.setPullBatchSize(1000);
try {
consumer.subscribe("binlog_topic", "binlog_es_tag");
} catch (MQClientException e) {
logger.error("[binlog Consumer initialization exception ]", e);
System.exit(-1);
}
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
if (CollectionUtils.isEmpty(msgs)) {
logger.info(LogUtils.format("[ The message list is empty ], msgs=<{0}>", msgs));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
for (MessageExt msg : msgs) {
try {
handlerMsg(msg);
} catch (Exception e) {
logger.error("[ Log consumption processing exception ]", e);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
logger.info(LogUtils.format("[binlog Consumer initialization complete ], consumer=<{0}>", consumer));
}
/**
* Start consumer
*
* @param event
*/
@EventListener(ApplicationPreparedEvent.class)
public void startSysLogConsumer(ApplicationPreparedEvent event) {
try {
consumer.start();
} catch (MQClientException e) {
logger.error(LogUtils.format("[binlog consumer start abnormal ], nameServerAddress={}"), e);
System.exit(-1);
}
}
@PreDestroy
public void closeConsumer() {
consumer.shutdown();
}
/**
* analysis binlog journal
* @param msg
*/
protected abstract void handlerMsg(MessageExt msg);
}
Consumer implementation class , Here only the monitored messages , Print the log
/**
* @author canxiusi.yan
* @description BinlogConsumer
* @date 2022/6/28 15:07
*/
@Component
public class BinlogConsumer extends AbstractBinLogConsumer {
/**
* journal
*/
private static final Logger logger = LoggerFactory.getLogger(BinlogConsumer.class);
@Override
protected void handlerMsg(MessageExt msg) {
JSONObject jsonObject = JSONObject.parseObject(new String(msg.getBody()));
// Affect several sql, There will be several logs
logger.info(LogUtils.format(" Listen to the binlog journal , json=<{0}>", jsonObject));
}
}
3.2 Run simulation
Run the program , Sign in rocketmq Console , It is found that the consumer has successfully registered
I'm going to use postman Call the interface of new data , image db Insert a sql, technological process : towards db insert data , Generate binlog journal , Synchronize the log to the... Masquerading as a slave node canal, And then put binlog Log to mq, Then the consumer in the code listens to the data , Watch the log print , You can find , The message has been monitored
After formatting the message , You can see , This time db The type of change , Changed data , It's all available , Many things can be done later , For example, synchronize data to redis, es Search engines, etc
边栏推荐
- Scala基础教程--19--Actor
- Li Kou brush question diary /day1/2022.6.23
- 激进技术派 vs 项目保守派的微服务架构之争
- 大佬们,求助一下,我用mysql cdc 2.2.1(flink 1.14.5)写入kafka,设置
- 2022-07-04:以下go语言代码输出什么?A:true;B:false;C:编译错误。 package main import 'fmt' func
- Scala basic tutorial -- 19 -- actor
- Principle and application of ThreadLocal
- IBM WebSphere MQ retrieving messages
- Esp32-c3 introductory tutorial questions ⑫ - undefined reference to ROM_ temp_ to_ power, in function phy_ get_ romfunc_ addr
- PB的扩展DLL开发(超级篇)(七)
猜你喜欢
神经网络物联网应用技术学什么
. Net ORM framework hisql practice - Chapter 2 - using hisql to realize menu management (add, delete, modify and check)
Nature Microbiology | 可感染阿斯加德古菌的六种深海沉积物中的病毒基因组
NBA赛事直播超清画质背后:阿里云视频云「窄带高清2.0」技术深度解读
Li Kou brush question diary /day1/2022.6.23
基于unity的愤怒的小鸟设计
Nebula Importer 数据导入实践
Scala basic tutorial -- 13 -- advanced function
从实时应用角度谈通信总线仲裁机制和网络流控
Li Kou brush question diary /day2/2022.6.24
随机推荐
整理混乱的头文件,我用include what you use
模板_判断素数_开方 / 六素数法
Uni app and uviewui realize the imitation of Xiaomi mall app (with source code)
国元期货是正规平台吗?在国元期货开户安全吗?
Angry bird design based on unity
基于unity的愤怒的小鸟设计
英特尔集成光电研究最新进展推动共封装光学和光互连技术进步
ThreadLocal原理与使用
激进技术派 vs 项目保守派的微服务架构之争
Mxnet implementation of googlenet (parallel connection network)
奥迪AUDI EDI INVOIC发票报文详解
What if the self incrementing ID of online MySQL is exhausted?
Build your own website (15)
Scala basic tutorial -- 13 -- advanced function
Crawler (6) - Web page data parsing (2) | the use of beautifulsoup4 in Crawlers
IBM WebSphere MQ检索邮件
中国农科院基因组所汪鸿儒课题组诚邀加入
一、C语言入门基础
MXNet对GoogLeNet的实现(并行连结网络)
php伪原创api对接方法