当前位置:网站首页>Use canal and rocketmq to listen to MySQL binlog logs
Use canal and rocketmq to listen to MySQL binlog logs
2022-07-04 19:17:00 【canxiusi】
One . Installation configuration canal
1.1 install canal
canal From Alibaba open source to github, Available on this page github Address , Download the compressed package of the development version , because github Download too slow , You can also find domestic resources by yourself

Then unzip , canal The file directory of is as follows

1.2 To configure canal Basic attributes
Enter into conf Folder , The directory structure is as follows

First, perform basic configuration , vim Command to enter canal.properties file , To configure canal Service ip Address , And the port number

After configuration canal Visual console , I didn't install it here , Therefore, it is not configured for the time being

Because this time is cooperation rocketMQ Use , So find in the following configuration canal.serverMode, And set to rocketMQ

after , Enter the key configuration , To configure rocketMQ Properties
rocketmq.producer.group : Producer groups , And specified in the code group It means the same thing
rocketmq.namesrv.addr : mq Of nameSvr Of ip Address
rocketmq.tag : When a producer produces a message , designated tag

1.3 To configure canal Of mysql
Get into conf Under the example Folder , Directory as follows , vim Order to enter instance.properties 
Several important configurations
canal.instance.mysql.slaveId : because canal It works by pretending to mysql Of slave, So it needs to be set here slave Of id
canal.instance.master.address : Database ip Address and port number

canal.instance.dbUsername : Login user name of the database to listen
canal.instance.dbPassword : Database login password

canal.instance.filter.regex : Database table to listen , Regular expression matching patterns , No configuration means all monitoring
canal.mq.topic : mq When the producer sends a message , designated topic, and java The meaning of the code is consistent

Two . mysql To configure
2.1 Turn on mysql Of binlog journal
because canal How it works , So it needs to be turned on mysql Of binlog journal , vim Command to edit etc Under the my.cnf file , stay mysqld Add the following configuration :
log-bin : To configure binlog Log file directory
binlog-format : To configure binlog Format of log file
server_id : To configure mysql The master node id, Cannot be in the cluster id, Or from the node id repeat , The id Unable to join canal Of slaveId repeat

Then check mysql binlog Open state of the , show variables like '%log_bin%', It can be seen that it is open 
2.2 To configure canal Dedicated users
stay canal In profile , I use root User logged in , So don't give mysql To configure canal user , Refer to other blogs
2.3 start-up canal
Enter into bin Folder , Directory as follows , Use command ./startup.sh Qi canal, Particular attention , If the server memory is insufficient , You can use it first vim edit startup.sh, Put... In the file java Use smaller memory settings for parameters

Then observe canal.log journal , You can see that the startup is successful

see rocketmq_client.log journal , Find out canal Always like mq Send heartbeat detection , And output mq Of group, example id Information

Sign in rocketMQ Visual console , You can see canal Sign up to mq Producer instance in , as well as topic Information , This information is similar to what we have done before canal The configuration in the configuration file is consistent


thus , About the server canal The configuration of is introduced , The following is used java Code implementation mq Information of consumers and producers
3、 ... and . java Code implementation
3.1 mq consumer
because canal The use of mq Pattern , He can be said to be a mq producer , So we need to define mq The consumer , The code is as follows , What consumers want to monitor topic and tag and canal The configuration in the configuration file is consistent
Consumer abstract class
/**
* @author canxiusi.yan
* @description AbstractProducer abstract SysLog Abstract parent class
* @date 2022/5/10 10:03
*/
public abstract class AbstractBinLogConsumer {
/**
* Subclass general log
*/
protected static final Logger logger = LoggerFactory.getLogger(AbstractBinLogConsumer.class);
private DefaultMQPushConsumer consumer;
private final static String GROUP_NAME = "canal-rocketmq-es%canal";
/**
* initialization mq consumer
*/
@PostConstruct
public void initConsumer() {
// The same business consumers are divided into the same group , You need to specify the same tag,
consumer = new DefaultMQPushConsumer(GROUP_NAME);
consumer.setNamesrvAddr("120.26.196.135:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setInstanceName("bin_log_consumer");
consumer.setPullInterval(1000);
// Set the number of messages pulled from the queue each time
consumer.setPullBatchSize(1000);
try {
consumer.subscribe("binlog_topic", "binlog_es_tag");
} catch (MQClientException e) {
logger.error("[binlog Consumer initialization exception ]", e);
System.exit(-1);
}
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
if (CollectionUtils.isEmpty(msgs)) {
logger.info(LogUtils.format("[ The message list is empty ], msgs=<{0}>", msgs));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
for (MessageExt msg : msgs) {
try {
handlerMsg(msg);
} catch (Exception e) {
logger.error("[ Log consumption processing exception ]", e);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
logger.info(LogUtils.format("[binlog Consumer initialization complete ], consumer=<{0}>", consumer));
}
/**
* Start consumer
*
* @param event
*/
@EventListener(ApplicationPreparedEvent.class)
public void startSysLogConsumer(ApplicationPreparedEvent event) {
try {
consumer.start();
} catch (MQClientException e) {
logger.error(LogUtils.format("[binlog consumer start abnormal ], nameServerAddress={}"), e);
System.exit(-1);
}
}
@PreDestroy
public void closeConsumer() {
consumer.shutdown();
}
/**
* analysis binlog journal
* @param msg
*/
protected abstract void handlerMsg(MessageExt msg);
}Consumer implementation class , Here only the monitored messages , Print the log
/**
* @author canxiusi.yan
* @description BinlogConsumer
* @date 2022/6/28 15:07
*/
@Component
public class BinlogConsumer extends AbstractBinLogConsumer {
/**
* journal
*/
private static final Logger logger = LoggerFactory.getLogger(BinlogConsumer.class);
@Override
protected void handlerMsg(MessageExt msg) {
JSONObject jsonObject = JSONObject.parseObject(new String(msg.getBody()));
// Affect several sql, There will be several logs
logger.info(LogUtils.format(" Listen to the binlog journal , json=<{0}>", jsonObject));
}
}3.2 Run simulation
Run the program , Sign in rocketmq Console , It is found that the consumer has successfully registered

I'm going to use postman Call the interface of new data , image db Insert a sql, technological process : towards db insert data , Generate binlog journal , Synchronize the log to the... Masquerading as a slave node canal, And then put binlog Log to mq, Then the consumer in the code listens to the data , Watch the log print , You can find , The message has been monitored

After formatting the message , You can see , This time db The type of change , Changed data , It's all available , Many things can be done later , For example, synchronize data to redis, es Search engines, etc

边栏推荐
- 物联网应用技术的就业前景和现状
- What types of Thawte wildcard SSL certificates provide
- Scala基础教程--16--泛型
- SSL证书续费相关问题详解
- 奥迪AUDI EDI INVOIC发票报文详解
- Li Kou brush question diary /day4/6.26
- 技术分享 | 接口测试价值与体系
- Nature Microbiology | 可感染阿斯加德古菌的六种深海沉积物中的病毒基因组
- Mxnet implementation of googlenet (parallel connection network)
- Learning path PHP -- phpstudy "hosts file does not exist or is blocked from opening" when creating the project
猜你喜欢

Halcon模板匹配

Scala基础教程--19--Actor

Wireshark packet capturing TLS protocol bar displays version inconsistency

读写关闭的channel是啥后果?

Wanghongru research group of Institute of genomics, Chinese Academy of Agricultural Sciences is cordially invited to join

Deleting nodes in binary search tree

Rookie post station management system based on C language

性能优化之关键渲染路径

Build your own website (15)

Scala basic tutorial -- 15 -- recursion
随机推荐
字节跳动Dev Better技术沙龙成功举办,携手华泰分享Web研发效能提升经验
Is Guoyuan futures a regular platform? Is it safe to open an account in Guoyuan futures?
Perfect JS event delegation
[mathematical basis of machine learning] (I) linear algebra (Part 1 +)
Send and receive IBM WebSphere MQ messages
How to modify icons in VBS or VBE
其他InterSystems %Net工具
Go microservice (II) - detailed introduction to protobuf
Build your own website (15)
Esp32-c3 introductory tutorial questions ⑫ - undefined reference to ROM_ temp_ to_ power, in function phy_ get_ romfunc_ addr
请教一下 flinksql中 除了数据统计结果是状态被保存 数据本身也是状态吗
Nature Microbiology | 可感染阿斯加德古菌的六种深海沉积物中的病毒基因组
Scala基础教程--16--泛型
Crawler (6) - Web page data parsing (2) | the use of beautifulsoup4 in Crawlers
Process of manually encrypt the mass-producing firmware and programming ESP devices
[opencv introduction to mastery 9] opencv video capture, image and video conversion
Li Kou brush question diary /day1/2022.6.23
读写关闭的channel是啥后果?
利用策略模式优化if代码【策略模式】
模板_判断素数_开方 / 六素数法