Does Kafka lose messages?
2022-07-28 12:38:00 【Zhang San admin】
A few terms first. I believe anyone who has worked with data processing is already familiar with Kafka, so Kafka basics will not be covered here. Today I want to talk about several of Kafka's key properties. Let's briefly explain what each of them means:
Data safety:
Data written by the producer to Kafka, and data consumed by the consumer from a topic, must not be lost.
Idempotency:
Data is not duplicated inside Kafka and is not consumed more than once.
This is also the foundation for exactly-once semantics.
Orderliness:
Since Kafka consumes data sequentially, Kafka's ordering is mainly reflected in the order in which producers write data to Kafka.
Usage scenarios
Now that the definitions are out of the way, let's look at the scenarios where each of these properties matters:
Data safety
There is not much to say about data safety. As a message middleware, losing data is simply unacceptable, so guaranteed data safety is the foundation that makes Kafka usable at all.
Idempotency
Idempotency may not matter much for some businesses, but for others it is critical. Take a settlement scenario: the upstream business sends data to a settlement platform. If a single record is calculated and processed multiple times, the consequences are severe: real money is lost, and many downstream businesses may also be affected.
Besides, producer idempotency is also the foundation on which Kafka's exactly-once semantics is built, so it is very important for Kafka.
Orderliness
Ordering matters a great deal for data updates and for any business logic that depends on the order of events; data corruption or business errors caused by out-of-order data are clearly unacceptable.
As we all know, Kafka reads data sequentially, so Kafka's ordering depends on the order in which the producer writes the data.
Implementation
Now that we understand the usage scenarios, let's see how Kafka implements these properties:
Data safety
Kafka's data is safe in most cases, but some extreme situations can still cause Kafka to lose data, for example the following:

Let's go through these scenarios in detail:
When the producer sends data and receives acks, the following scenario can occur:
- The producer sends the data and treats it as successful without waiting for the ack (Kafka's at-most-once semantics). The producer believes the data was produced successfully, but the broker failed to process it, so the data is lost.
When the consumer consumes data and commits offsets, the following scenarios can occur:
- Commit the offset first, then run the business logic: the commit succeeds but processing fails, so the data is lost.
- Run the business logic first, then commit the offset: the commit succeeds but asynchronous processing fails, so the data is lost.
There is also the safety of Kafka's own data: even after some nodes go offline, Kafka should still be able to serve requests without losing data. However, problems on the Kafka brokers themselves can also cause data to be "lost"; here "lost" means the data can no longer be served, since data written to Kafka is ultimately stored on disk. Consider the following scenarios:
- Kafka keeps no replicas of the data: after a broker goes down, the data on that broker is "lost" and can no longer be served.
- The producer sends data to a broker, the leader writes it successfully and returns a successful ack, but the leader crashes while replicating the data to the follower replicas. The producer believes the data was produced successfully, yet on the broker side the data is "lost".
Obviously, Kafka has solutions for each of the scenarios above. Let's look at them from three angles: producer-side data safety, consumer-side data safety, and the safety of the data Kafka itself stores:
Producer-side data safety
The key is the producer's acks setting. Let's look at its possible values first:
- acks = 0: the producer sends data and does not wait for any broker acknowledgment before sending the next batch. Best performance, but there is a risk of data loss.
- acks = 1: the producer waits for the leader replica to confirm receipt before sending the next batch. Lower performance, but much better safety (data can still be lost; the broker section below explains that scenario in detail).
- acks = -1 (i.e. all): a message must be written to all replicas in the ISR set (note: not all replicas) before the next batch is sent. Worst performance, but the strongest safety; it ensures data is not lost.
From these values we can see that if the producer does not want to lose data, acks must be raised from 0 to at least 1, and to never lose data it should be set to -1.
In addition, the producer should preferably send messages asynchronously, which improves performance, and handle the post-send logic in a callback function, for example logging or collecting statistics, as sketched below.
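Here is a minimal sketch of such a producer: acks=all plus an asynchronous send with a callback. The broker address and topic name are placeholder assumptions, not values from this article.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SafeProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // acks=all: wait until the message is written to all in-sync replicas
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "key", "value");

        // Asynchronous send; the callback runs when the broker responds
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                // The message may not have been persisted: log it, alert, or re-send
                exception.printStackTrace();
            } else {
                System.out.printf("sent to partition %d offset %d%n",
                        metadata.partition(), metadata.offset());
            }
        });
        producer.flush();
        producer.close();
    }
}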
Consumer-side data safety
On the consumer side, data loss mainly comes down to the enable.auto.commit configuration item. When it is set to true, Kafka commits offsets automatically; when it is set to false, offsets are committed manually.
With automatic commit, as soon as the program fails you will almost certainly lose data. Manual commit, if not handled carefully, can also lose data. Let's analyze the difference between the two commit modes and how to use them correctly.
Automatic commit
In Kafka, the default way of committing consumer offsets is automatic commit. It is controlled by the consumer client parameter enable.auto.commit, whose default value is true. This automatic commit does not happen after every single message; it happens periodically. The period is controlled by the client parameter auto.commit.interval.ms, which defaults to 5 seconds and only takes effect when enable.auto.commit is true. The automatic offset commit is performed inside the poll() logic: before each actual fetch request to the server, the consumer checks whether offsets can be committed, and if so it commits the offsets of the previous round of consumption. A minimal sketch of this configuration follows.
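The sketch below shows these two consumer settings and where the loss can occur; the broker address, group id and topic name are placeholder assumptions.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AutoCommitConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Automatic offset commit, performed inside poll() every 5 seconds
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    // If the process crashes here, offsets already auto-committed may point
                    // past records that were never actually processed -> data loss
                    System.out.println(record.value());
                }
            }
        }
    }
}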
Manual commit
Manual commit comes in two flavors: synchronous commit (commitSync) and asynchronous commit (commitAsync).
The biggest problem with synchronous commit is that when data is processed in batches, if the consumer is interrupted after part of the data has been consumed but before the offset has been committed, the next round of consumption will re-consume the data that was already processed. Synchronous commit also blocks the thread, which hurts overall performance.
Asynchronous commit does not block the thread and performs better than synchronous commitSync. But you have to handle the callback, and you also need to deal with some edge cases, such as what happens during a rebalance.
In summary, the usual approach is to use asynchronous commit during normal consumption, and use a synchronous commitSync when the consumer shuts down cleanly, to make sure the final offset is correct. For example:
try {
    while (isRunning.get()) {
        // poll records and do some data processing
        consumer.commitAsync();
    }
} finally {
    try {
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}
During a rebalance the offset may not be committed normally, which leads to duplicate consumption. For this scenario we need to listen for the rebalance and commit the offsets once before it happens. For example:
// currentOffsets must hold the latest offsets of the partitions this consumer has consumed.
// Not shown here: update this map after each record is processed, and clear it after the rebalance.
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
    // Called before the rebalance starts, after the consumer has stopped reading messages
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync(currentOffsets);
        currentOffsets.clear();
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // do nothing
    }
});
Safety of Kafka's own stored data
Since Kafka stores its data on disk, the safety of that data mainly comes down to high availability of the data, i.e. the configuration of data replication and data synchronization.
The first relevant parameter is replication.factor, which determines the number of replicas and defaults to 1. Note that it cannot exceed the number of brokers. If you keep the default of 1, or do not specify the replica count when creating a topic (so the replica count is 1), then a disk failure or broker crash means the data is gone from Kafka. So replication.factor should be configured to be greater than 1, for example 3.
The second issue is data loss caused by replication lag. For example, the leader replica receives data but crashes before it is synchronized to the other replicas; the data is then lost. This scenario can be handled by setting acks to -1 on the producer side, at some cost to Kafka's performance and throughput.
Finally there is the configuration parameter min.insync.replicas, which defaults to 1 (i.e. just the leader; in production it should be raised). It specifies the minimum ISR size: when acks is -1 (i.e. all), this parameter specifies how many replicas in the ISR set the write must reach, and if that cannot be satisfied the producer gets an exception. The point of this parameter is that after the leader goes down, Kafka can quickly elect a new leader from the ISR and restore service as fast as possible.
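A minimal sketch of these settings applied at topic creation time via the AdminClient, assuming a cluster of at least 3 brokers; the broker address and topic name are placeholders.
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateSafeTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 3 (requires at least 3 brokers)
            NewTopic topic = new NewTopic("demo-topic", 3, (short) 3)
                    // require at least 2 in-sync replicas for acks=all writes to succeed
                    .configs(Collections.singletonMap("min.insync.replicas", "2"));
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}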
Idempotency
Idempotency covers both the idempotency of producing data and the idempotency of consuming data. Let's cover each in turn:
Producer-side idempotency
Idempotent producer
Since version 0.11 Kafka supports producer-side idempotency. An idempotent producer is created with the following configuration item:
props.put("enable.idempotence", true);
When idempotency is turned on, acks is automatically set to "all"; if you then manually set acks to 0, the program will throw an error. A fuller setup is sketched below.
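A minimal sketch of an idempotent producer; the broker address and topic name are placeholder assumptions.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class IdempotentProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Enable the idempotent producer; acks is forced to "all" and retries must be > 0
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Even if this send is retried internally, the broker de-duplicates it by (PID, seq)
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        }
    }
}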
The implementation logic of the idempotent producer is fairly simple: Kafka adds a PID and a sequence number. Each RecordBatch sent by the producer carries a monotonically increasing seq; the broker maintains a pid-seq mapping for each topic partition and updates lastSeq on every commit. When a RecordBatch arrives, the broker checks it before persisting the data: if the batch's baseSeq (the seq of its first message) is exactly one greater than the lastSeq maintained by the broker, the data is saved; otherwise it is discarded (the inSequence check).
Although the idempotent producer solves part of the problem, it still has two main limitations:
- The idempotent producer only guarantees idempotency within a single partition, i.e. messages are not duplicated inside one partition; idempotency across multiple partitions is not guaranteed.
- It only guarantees idempotency within a single session and cannot provide idempotency across sessions: if the producer crashes and restarts, idempotency between the two sessions is not guaranteed (the new session may retransmit). Since the broker cannot recover the previous state for the client, cross-session idempotency is impossible.
For these cases, Kafka provides the transactional producer to solve the problems above.
Transactional producer
Kafka transactions introduce a transactionalId and an epoch. Once transactions are enabled, one transactionalId maps to exactly one PID, and the server records the latest epoch value.
When a new producer is initialized, it sends an InitPIDRequest to the TransactionCoordinator. If the TransactionCoordinator already has metadata for this transactionalId, it returns the previously assigned PID and increments the epoch by 1 before returning it, so when the old producer comes back and issues requests it is treated as an invalid producer and an exception is thrown.
If transactions are not enabled, the TransactionCoordinator simply returns a new PID to the new producer, so the mechanism does not kick in and multi-session idempotency cannot be achieved.
Here is how to use a transactional producer:
// Initialize the transaction
producer.initTransactions();
try {
    // Start a transaction
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    // Commit the transaction
    producer.commitTransaction();
} catch (KafkaException e) {
    // Something went wrong: abort the transaction
    producer.abortTransaction();
}
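Note that before initTransactions() can be called, the producer must be configured with a transactional.id, which implicitly enables idempotence. A minimal sketch, with placeholder values:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");          // placeholder
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// The transactionalId identifies this logical producer across restarts
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-transactional-id");    // placeholder
KafkaProducer<String, String> producer = new KafkaProducer<>(props);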
That covers producer-side idempotency. Note that enabling either idempotence or transactions has an unavoidable impact on performance. That is why Kafka leaves both features disabled by default; you have to weigh whether your business needs them.
Consumer-side idempotency
Unfortunately, Kafka provides no mechanism to guarantee idempotent consumption on the consumer side. If you really need it, you can maintain a global id for every message and de-duplicate, under a lock, on every consumption.
Whether it is worth spending that many resources to make the consumer idempotent, i.e. to achieve Kafka's exactly-once end to end, is again a trade-off you have to make with the business.
Here is a piece of pseudocode:
if (cache.contains(msgId)) {
    // The cache already contains this msgId: the message has been processed, skip it
    continue;
} else {
    lock.lock();
    try {
        cache.put(msgId, timeout);
        consumer.commitSync();
    } finally {
        lock.unlock();
    }
}
// After all subsequent operations are done, the msgId is removed from the cache;
// as long as a msgId exists in the cache, the message is considered already processed.
// Note: the cache entries need an expiration (timeout) set.
Orderliness
First of all, Kafka's ordering guarantee only holds within a single partition; there is no global ordering guarantee.
If you really need the data to be processed in order, you can route all records that must stay in order to the same partition by overriding the partitioning function. A custom partitioner must implement the org.apache.kafka.clients.producer.Partitioner interface and is then registered through the ProducerConfig.PARTITIONER_CLASS_CONFIG configuration item, as sketched below.
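A minimal sketch of such a partitioner, assuming key-based routing; the class name is hypothetical, not something defined in this article.
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Route every record with the same key to the same partition so those records keep their order
public class OrderPreservingPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            // No key: fall back to a fixed partition (or any other deterministic choice)
            return 0;
        }
        // Same key -> same partition -> ordered within that partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
It is registered on the producer like this:
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, OrderPreservingPartitioner.class.getName());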
Back to how ordering within a single partition is achieved. This is actually a side effect of Kafka's idempotency implementation. In newer Kafka versions, once enable.idempotence=true is set, max.in.flight.requests.per.connection can be handled dynamically. This parameter specifies how many batches the producer may send before it receives a response from the server; its default value is 5.
When a retried request arrives, the batch is re-inserted into the right place in the queue according to its seq. With max.in.flight.requests.per.connection set to 1, a batch can only be sent after all batches with smaller sequence numbers have completed. In this way, at the cost of some performance and throughput, the order of the data is guaranteed.
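A minimal sketch of a producer configured for strict in-partition ordering, using the conservative settings described above; string config keys are used for brevity and the broker address is a placeholder.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", true);
// Only one in-flight batch per connection: retries cannot reorder messages
props.put("max.in.flight.requests.per.connection", 1);
// Optional: the hypothetical partitioner from the sketch above, to keep related keys together
props.put("partitioner.class", "OrderPreservingPartitioner");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);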
Finally, the road ahead is long, and the big data journey still has a long way to go. If you want to travel it together, feel free to like, share and follow so you don't lose your way next time; let's keep moving forward on the road of big data!