Does Kafka lose messages?
2022-07-28 12:38:00 【Zhang San admin】
Terminology
I believe anyone who has done data processing is familiar with Kafka, so I won't cover the basics here. Today I want to talk about several of Kafka's characteristics. Let's briefly explain what each of these features means:
Security:
Data written to Kafka by producers, and data consumed from topics by consumers, is never lost.
Idempotency:
Data is not duplicated inside Kafka, and it is not consumed more than once.
This is also the foundation for implementing exactly-once semantics.
Orderliness:
Because Kafka is consumed sequentially, Kafka's ordering is mainly reflected in the order in which producers write data to Kafka.
Use scenarios
Now that these definitions are out of the way, let's look at the scenarios where each of these features matters:
Security
There isn't much to say about security here. As a message middleware, losing data is simply unacceptable, so guaranteeing data safety is the foundation that makes Kafka usable at all.
Idempotency
Idempotency may not matter much for some businesses, but for others it is critical. Take a financial settlement scenario: the upstream business sends data to the settlement platform, and if a record is calculated and processed multiple times, the consequences are severe: real money is lost, and many downstream businesses may be affected as well.
In addition, Kafka's idempotency is the basis for its exactly-once semantics, so it is quite important for Kafka.
Orderliness
Ordering matters a great deal for data updates and for business scenarios that depend on the order of data; data corruption or business errors caused by out-of-order data are obviously unacceptable.
As everyone knows, Kafka reads data sequentially, so Kafka's ordering depends on the order in which the producer produces the data.
Feature implementation
Now that we understand where these features are used, let's see how Kafka implements them:
Security
Kafka's data is safe in most cases, but some extreme situations can still cause Kafka to lose data, for example the scenarios below:

Let's go through these scenarios in detail:
When producing data and receiving acks, the producer can run into the following scenario:
After sending data, the producer ignores the ack and assumes success directly: this is Kafka's at-most-once semantics. The producer believes the data was produced successfully, but the broker failed to process it, so the data is lost.
When consuming data and committing offsets, the consumer can run into the following scenarios:
Commit the offset first, then run the business logic: the commit succeeds, but the processing fails, so the data is lost.
Run the business logic first, then commit the offset: the commit succeeds, but asynchronous processing fails, so the data is lost.
Another aspect is the safety of Kafka's own data: after some nodes go offline, Kafka should still be able to serve requests and guarantee that data is not lost. However, problems on the Kafka brokers themselves can also cause data to be "lost", where "lost" means it can no longer be served; after all, data written to Kafka ends up on disk. Consider the following scenarios:
Kafka keeps no replicas of the data: after a broker goes down, the data on that broker is "lost" and cannot be served.
The producer sends data to a broker, the write succeeds on the leader, and an ack is returned; while the leader is replicating the data to the replica nodes, the leader goes down. The producer believes the data was produced successfully, but on the broker side the data is "lost".
Obviously, Kafka has answers for the scenarios above. Let's look at the problem from three angles: producer data security, consumer data security, and the security of the data Kafka itself stores:
Producer data security
It comes down to how the producer's acks setting is configured. Let's first look at the possible values of acks:
acks = 0: the producer sends the data and does not wait for the broker to acknowledge it before sending the next record. This gives the best performance, but there is a risk of data loss.
acks = 1: the producer sends the data and waits for the leader replica to confirm receipt before sending the next record. Performance is worse, but safety improves significantly (data can still be lost; the broker section below explains that scenario in detail).
acks = -1 (i.e. all): a message must be written to every replica in the ISR set (note: not every replica of the partition) before the next record is sent. Performance is the worst, but safety is the strongest, and data is guaranteed not to be lost.
From these values we can see that if the producer does not want to lose data, acks must be raised from 0 to at least 1, and to never lose data it must be set to -1.
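A minimal sketch of how acks is set on the Java client (the broker address is an assumption; ProducerConfig and StringSerializer come from the standard Kafka client library):
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// "0" = don't wait for the broker, "1" = wait for the leader only, "all"/"-1" = wait for every ISR replica
props.put(ProducerConfig.ACKS_CONFIG, "all");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);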
In addition, the best way for the producer to send messages is asynchronously, which improves performance; post-send logic such as logging or statistics can then be handled in a callback.
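For illustration, a sketch of an asynchronous send with a callback, reusing the producer above and a hypothetical topic named demo-topic:
producer.send(new ProducerRecord<>("demo-topic", "some-key", "some-value"), (metadata, exception) -> {
    if (exception != null) {
        // the broker never acknowledged the write: log it, alert, or hand the record to a retry path
        exception.printStackTrace();
    } else {
        // acknowledged: record stats such as the partition and offset it landed on
        System.out.printf("acked: partition=%d, offset=%d%n", metadata.partition(), metadata.offset());
    }
});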
Consumer data security
The main cause of data loss on the consumer side is the enable.auto.commit configuration item. When it is true, Kafka commits offsets automatically; when it is false, offsets are committed manually.
With automatic commits, as soon as the program fails there is basically a risk of losing data. If manual commits are handled badly, data can be lost too. Let's analyze the difference between the two commit modes and how to use them correctly.
Automatic commit
In Kafka, the default way of committing consumed offsets is automatic commit. This is controlled by the consumer client parameter enable.auto.commit, which defaults to true. This default automatic commit does not happen after every message; it happens periodically. The period is controlled by the client parameter auto.commit.interval.ms, which defaults to 5 seconds, and it only takes effect when enable.auto.commit is true. The automatic offset commit happens inside the poll() logic: before each actual fetch request to the server, the client checks whether offsets can be committed, and if so, it commits the offsets of the previous round of consumption.
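As a sketch, this is where the two parameters live on a consumer built with the Java client (the group id and broker address are assumptions); the manual-commit examples below assume auto commit has been switched off:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");     // switch to manual offset commits
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); // only relevant while auto commit is on
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);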
Manual commit
Manual commits come in two forms: synchronous (commitSync) and asynchronous (commitAsync).
The biggest problem with synchronous commits shows up when data is processed in batches: if the consumer is interrupted after part of the batch has been processed but before the offset has been committed, the next poll will re-consume the data that was already processed. Synchronous commits also block the thread, which hurts overall performance.
Asynchronous commits do not block the thread and perform better than synchronous commitSync. But you have to handle the callback, and you also need to handle some extreme situations, such as committing correctly around rebalances.
To sum up, the usual approach is to use asynchronous commits while processing data, and a synchronous commitSync when the consumer exits normally, to make sure the final offsets are correct. For example:
try {
    while (isRunning.get()) {
        // poll records and do the data processing here
        consumer.commitAsync();
    }
} finally {
    try {
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}
When a rebalance happens, offsets may not be committed in time, which leads to repeated consumption. For this scenario we need to listen for the rebalance and commit offsets once before it takes effect. For example:
// currentOffsets must hold the latest offsets of the partitions this consumer has consumed.
// Not shown here: update this map after processing each record, and clear it after the rebalance.
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
    // called before the rebalance starts, after the consumer has stopped reading messages
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync(currentOffsets);
        currentOffsets.clear();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // do nothing
    }
});
Kafka storage data security
Because Kafka stores its data on disk, its safety comes mainly from making the data highly available. So for Kafka, data safety is mainly about the configuration of data replication and data synchronization.
The first thing is the replication.factor configuration parameter, which determines the number of replicas and defaults to 1. Note that this parameter cannot exceed the number of brokers. If you keep the default of 1, or you do not specify a replica count when creating the topic (so the replica count is 1), then when a machine suffers disk damage or the service goes down, that data is gone from Kafka. So replication.factor should be set to something greater than 1, for example 3.
The second issue is data becoming unavailable because of replication lag. For example, the leader replica receives the data but crashes before it has synced to the other replicas; that data is lost as well. For this scenario, setting acks to -1 on the producer side solves the problem, at the cost of some of Kafka's performance and throughput.
Finally, Kafka has the configuration parameter min.insync.replicas, which defaults to 1 (just the leader; in production it should be raised). This property specifies the minimum ISR size: when acks is -1 (i.e. all), it is the number of replicas in the ISR set that the write must reach, and if that cannot be met, the producer gets an exception. The point of this parameter is that after the leader dies, Kafka can quickly elect a new leader from the ISR and restore service as fast as possible.
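As an illustration (the topic name orders and the broker address are assumptions), a topic with replication.factor = 3 and min.insync.replicas = 2 could be created through the AdminClient roughly like this:
Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(adminProps)) {
    NewTopic topic = new NewTopic("orders", 3, (short) 3);                // 3 partitions, replication.factor = 3
    topic.configs(Collections.singletonMap("min.insync.replicas", "2"));  // a write with acks=-1 must reach 2 ISR replicas
    admin.createTopics(Collections.singletonList(topic)).all().get();     // block until the topic is created
}
Combined with acks = -1 on the producer, an acknowledged write has then landed on at least two replicas, so losing a single broker does not lose the data.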
Idempotency
Idempotency covers both producing data idempotently and consuming data idempotently. Let's look at both sides:
Producer-side idempotency
Idempotent producer
Kafka has supported producer-side idempotency since version 0.11. An idempotent producer can be created with the following configuration item:
props.put("enable.idempotence", true);
When idempotency is turned on, acks is automatically configured to "all"; if you then manually set acks to 0, the program throws an error.
The implementation of the idempotent producer is fairly simple: Kafka adds a pid and a seq. Each RecordBatch from the producer carries a monotonically increasing seq; the broker maintains a pid-seq mapping for each topic partition and updates lastSeq on every commit. When a RecordBatch arrives, the broker checks it before saving the data: if the batch's baseSeq (the seq of its first message) is exactly one greater than the lastSeq the broker maintains, the data is saved; otherwise it is not (the inSequence method).
Although the idempotent producer solves part of the problem, it still has two main shortcomings:
It is only idempotent within a single partition, that is, it only guarantees no duplicates within one partition; it cannot guarantee idempotency across multiple partitions.
It only maintains idempotency within a single session and cannot achieve idempotency across sessions. In other words, if the producer crashes and restarts, idempotency between the two sessions cannot be guaranteed (the new session may resend data), because the broker cannot obtain the previous session's state.
For these cases, Kafka provides the transactional producer to solve the problems above.
Transactional producer
Kafka transactions introduce a transactionId and an Epoch, set once transactions are enabled. One transactionId corresponds to exactly one pid, and the server records the latest Epoch value.
When a new producer initializes, it sends an InitPIDRequest to the TransactionCoordinator. If the TransactionCoordinator already has the metadata for this transactionId, it returns the previously assigned PID along with the Epoch incremented by 1, so when the old producer comes back and tries to operate, it is treated as an invalid producer and an exception is thrown.
If transactions are not enabled, the TransactionCoordinator returns a new pid for the new producer, so this mechanism does not kick in and cross-session idempotency cannot be achieved.
Here is how to use a transactional producer:
// initialize transactions
producer.initTransactions();
try {
    // begin a transaction
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    // commit
    producer.commitTransaction();
} catch (KafkaException e) {
    // on failure, abort the transaction
    producer.abortTransaction();
}
That covers producer-side idempotency. But whether you enable idempotency or transactions, there is inevitably some impact on performance. That is why Kafka does not enable either feature by default; you have to weigh whether the business needs them.
Consumer-side idempotency
Unfortunately, Kafka provides no built-in guarantee of idempotent consumption on the consumer side. If you really need consumer idempotency, you can maintain a globally unique id for each message and lock-and-deduplicate on every consumption.
Whether it is worth spending that many resources on consumer-side idempotency to achieve exactly-once consumption in Kafka is something to weigh against the business requirements.
Here is a piece of pseudocode:
if (cache.contains(msgId)) {
    // the cache already contains msgId, so this message has been handled; skip it
    continue;
} else {
    lock.lock();
    try {
        cache.put(msgId, timeout);
        consumer.commitSync();
        // ... process the message ...
    } finally {
        lock.unlock();
    }
}
// If the subsequent processing fails, delete msgId from the cache; as long as msgId exists in the
// cache, the message is considered handled. Note: the cache entries need an expiration time.
Orderliness
First of all, Kafka can only guarantee ordering within a single partition; it cannot guarantee global ordering.
If you really need the data to be processed in order, you can route the records that must stay ordered to the same partition by writing a custom partitioner. The partitioner needs to implement the org.apache.kafka.clients.producer.Partitioner interface and is then configured through the ProducerConfig.PARTITIONER_CLASS_CONFIG configuration item.
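As an illustration only, here is a sketch of such a partitioner (the class name OrderedKeyPartitioner is made up, and a non-null record key is assumed): records with the same key always hash to the same partition, so they stay in order relative to each other.
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;
import java.util.Map;

public class OrderedKeyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // same key -> same hash -> same partition, which is what keeps related records ordered
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
It is then wired in on the producer with props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, OrderedKeyPartitioner.class.getName());.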
Back to how ordering within a single partition is achieved. This is actually a side effect of Kafka's idempotency implementation. In newer Kafka versions, setting enable.idempotence=true allows the in-flight requests to be adjusted dynamically. The max.in.flight.requests.per.connection parameter specifies how many batches the producer may send before receiving a response from the server; its default value is 5.
When a retried request arrives, the batch is re-inserted into the right place in the queue according to its seq, and with max.in.flight.requests.per.connection set to 1 the batches ahead of it all have smaller sequence numbers, so a batch can only be sent after the ones before it have finished. In this way, ordering is guaranteed at the cost of some performance and throughput.
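Putting the pieces together, a producer tuned for strict per-partition ordering might be configured roughly like this (a sketch under the assumptions above, not a definitive recipe):
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.ACKS_CONFIG, "all");                              // forced to all by idempotence anyway
props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");      // only one in-flight batch per connection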
Finally, the road ahead is long, and the big-data journey still has a long way to go. If you are also on the big-data path, feel free to like, share, and follow so you don't get lost next time; let's keep moving forward on the big-data road together!