当前位置:网站首页>Did kafaka lose the message

Did kafaka lose the message

2022-07-28 12:38:00 Zhang San admin

A term is used to explain I believe that the partners who have done data processing are interested in kafka It must be familiar . The basis of kafka Knowledge is not enough here . Today I'm going to talk about kafka Several characteristics of , Let's briefly explain the meaning of these features :

Security :

Data from producer Write to the kafka as well as consumer from topic China consumption data , No data will be lost .

Idempotency :

The data is in kafka Will not be reproduced in the process , It will not be consumed repeatedly .

This is also the realization of exactly-once The basis of semantics .

Orderliness :

because kafka Sequential consumption , therefore kafka The orderliness of is mainly reflected in the producers' Writing kafka The order of data .

Use scenarios After introducing these definitions , Let's take a look at the usage scenarios of these features :

Security

I won't say much about security here , After all, as a message oriented middleware , Data loss is absolutely unacceptable . So the guarantee of data security is kafka Foundations that can be used .

Idempotency

Idempotency may not be of great significance for some businesses , But it is very important for some businesses . Take the capital settlement scenario , As the upstream, the business party prints the data to the settlement platform , If a piece of data is calculated 、 Dealt with many times , The consequences will be especially serious , At that time, the real gold and silver will be lost , And many other businesses may also be affected .

Besides kafka Idempotency of is also realized kafka exactly-once The basis of semantics , about kafka It's still very important for us .

Orderliness

Orderliness is of great significance in data updating and some business scenarios that depend on data order , Data pollution or business errors caused by data disorder are obviously unacceptable .

as everyone knows ,kafka It reads data in sequence , therefore kafka The orderliness of depends on the orderliness of the producer's production data .

Function realization

After understanding the usage scenarios of these features , So let's see kafka How to implement these features :

Security

kafka The data of is safe in most cases , But some extreme situations may still lead to kafka Lost data . For example, the following situation :

Let's talk about these scenes in detail :

producer In production data and receive ack You will encounter the following scenarios :

  • After sending the data , Ignore ack Directly confirm success : namely kafka Of at most once semantics . Producers believe that data production is successful , however broker Data processing failed . Cause data loss consumer In consumption data and commit offset You will encounter the following scenarios :

  • First commit offset, Then execute the business logic :commit success , Data processing failed . Cause data loss Execute the business logic first , Again commit offset:commit success , Asynchronous data processing failed . Cause data loss Another situation is kafka Security of own data , After some nodes are offline , Still able to provide external services , Ensure that data is not lost . however kafka broker Its own problems will also cause data “ The loss of ”, The loss here means that it is unable to provide external services , After all, data writing kafka And then stored on disk . Consider the following scenario :

  • kafka No backup of data , Some one broker After hanging up , The machine broker The data on will “ The loss of ” And unable to provide external services producer Data sent to broker after , The data is in leader If the node is written successfully, it will return ack success , stay leader towards replica When nodes synchronize data ,leader The node is down . cause producer The end believes that data production is successful , and broker End data “ The loss of ”

Obviously, for the above scenario ,kafka There are relevant plans . Let's start with production data security 、 Consumer data security and kafka Three scenarios of self storage data security illustrate this problem :

Producer data security

producer Of acks Modification of settings , Have a look first acks The value of :

  • acks by 0: It means producer After sending the data , Don't wait for broker confirm , Send the next data directly , The performance is the best, but there is a risk of data loss
  • acks by 1: by 1 signify producer After sending the data , Need to wait leader After the copy is confirmed to be received , To send the next data , Poor performance , But the safety is greatly improved ( It is still possible to lose data , below broker Explain the data loss scenario in detail )
  • acks by -1: This represents all, It means that the message sent is written to all ISR Copies in the collection ( Note that not all copies ) after , To send the next data , The worst performance , But the security is the strongest , Make sure you don't lose data . From the above values, we can see ,producer If you want not to lose data ,acks Set from 0 At least change to 1, If you want to never lose data, you need to set it to -1

in addition ,producer The best mode to send messages is asynchronous , Can improve performance ; And handle some logic processing after sending through callback function , Such as printing logs or data statistics .

Consumer data security

consumer The main problem of losing data is enable.auto.commit This configuration item . The configuration of this configuration item is true Namely kafka Automatically commit offset; And set to false It's manual commit offset.

Automatically submit as long as the program goes wrong , Basically, there will be the problem of losing data . If manual submission is not handled well , There will also be the problem of losing data , Now let's analyze the difference between these two submission methods , And the correct use .

Automatic submission

stay Kafka The default submission mode of consumption displacement in is automatic submission . This is determined by the consumer client parameter enable.auto.commit To configure , The default value is true. This default auto submit is not submitted every time a message is consumed , It's a regular submission . This periodic cycle time is determined by the client parameter auto.commit.interval.ms To configure , The default value is 5 second . In addition, the premise for this parameter to take effect is enable.auto.commit Parameter is true. The action of automatic displacement submission is in poll() It's done in the logic of the method , Before each actual pull request to the server, it will check whether the displacement can be submitted , If possible , Then it will submit the displacement of the last round of consumption . Manual submission

Manual submission is divided into synchronous submission (commitSync) And asynchronous commit (commitAsync) Two ways .

The biggest problem with synchronous submission is when the data is processed in batches , When part of the data is consumed , I haven't submitted it yet offset It was interrupted , It will make the next consumption repeat the consumption of that part of the data that has been consumed . And synchronous commit will block threads , The overall performance is affected .

Asynchronous commit does not block threads , Compared with synchronous submission commitSync Better performance . But you need to deal with callback functions , It also needs to be in some extreme situations, such as rebalance There are corresponding treatments during operation .

Sum up , The conventional approach is to use asynchronous submission in the data consumption processing process commit offset; And the scenario of normal withdrawal of consumers , We can use ,commitSync Synchronous commit , Guarantee offset Right . Examples are as follows :

try {
    while(isRunning.get()) {
        //poll records and do some data processing .
        consumer.commitAsync() ;
    }
) finally {
    try {
        consumer.commitSync() ;
    ) finally {
        consumer.close() ;
}}

and rebalance when offset Unable to submit normally , Leading to repeated consumption of data , In this scenario, we need to monitor rebalance Once before it happens offset Submit . Examples are as follows :

//currentOffsets You need to save the latest of the partition consumed by this consumer offset
// This code does not reflect , After the consumption data   Update the object , And in rebalance Then empty the object
Map<TopicPartition , OffsetAndMetadata> currentOffsets =new HashMap<>() ;

consumer.subscribe(Arrays .asList( topic) , new ConsumerRebalanceListener () {
    // It happened in rebalance Before , And when consumers stop reading messages
    @Override
    public void onPartitionsRevoked(Collection<TopicPart ition> partitions) {
        consume.commitSync(currentOffsets) ;
        currentOffsets.clear();
    }

    @Override
     public void onPartitions Assigned(Collection<TopicPartition > partitions) {
        //do nothing .
    }
});

kafka Storage data security

kafka Because the data is stored on the hard disk , therefore kafka Its security is mainly due to the high availability of data , So for kafka The security of data is mainly related to the policy configuration during data backup and data synchronization .

First of all replication.factor Configuration parameters , This configuration determines the number of copies , The default is 1. Note that this parameter cannot exceed broker The number of . This parameter is actually because if you use the default 1, Or not creating topic Specify the number of copies when ( That is, the number of copies is 1), When a machine has disk damage or service downtime , Then the data will start from kafka It's missing . therefore replication.factor This parameter is preferably configured to be greater than 1, for instance 3. The second is the unavailability of data caused by data synchronization . such as leader Copy received data , But it hung up before syncing to other copies , At this time, the data is also lost . For this scenario, you can configure producer Terminal acks Set to -1 Can solve the problem , But such a cost will affect kafka Part of the performance and throughput . Last kafka There is a configuration parameter ,min.insync.replicas, The default is 1( It's just leader, The actual production should be increased ), This attribute specifies the minimum ISR Count . It means to be acks by -1( namely all) When , This parameter specifies what must be written ISR The number of copies in the set , If not , that producer There will be exceptions . The function of this parameter is leader After hanging up ,kakfa Can quickly from ISR Selected from leader, Restore service as quickly as possible .

Idempotency

Idempotency includes idempotency of production data and idempotency of consumption data . Let's explain from these two aspects :

producer Idempotency of the end

Idempotency producer kafka from 0.11 Support for producer Idempotency of the end , Idempotency can be achieved through the following configuration items producer The creation of :

props.put("enable.idempotence"true)

When idempotency is turned on ,acks It is automatically configured as “all” 了 , If at this time, manually set acks Set to 0, The program will report an error .

And idempotent producer The implementation logic is also relatively simple , namely Kafka Added pid and seq.Producer Each of them RecordBatch There is a monotonous increase in seq; Broker On each topic Will also maintain pid-seq Mapping , And every time Commit Will update lastSeq. such recordBatch When we arrive ,broker Will check RecordBatch Then save the data : If batch in baseSeq( First message seq) Than Broker Serial number of maintenance (lastSeq) Big 1, Then save the data , Otherwise do not save (inSequence Method ).

Although idempotent producer Solved part of the problem , But there are still two main shortcomings :

Idempotent producer Only idempotency on a single partition , That is, single partition messages are not repeated , Multiple partitions cannot guarantee idempotency . Only the idempotency of a single session can be maintained , Cannot achieve idempotency across sessions , That is to say if producer Hang up and restart , Idempotency between two sessions cannot be guaranteed ( New sessions may be retransmitted ). because broker The client cannot get the previous status information , Therefore, cross session idempotency cannot be realized . In this case ,kafka Provides transactional producer To solve the above problems .

transactional producer kafka The transaction introduces transactionId and Epoch, Set after starting transaction , One transactionId Only one pid, And Server The end will record the latest Epoch value .

There are new ones producer On initialization , Will send to TransactionCoordinator send out InitPIDRequest request ,TransactionCoordinator I already have this transactionId Corresponding meta, Will return to the previously assigned PID, And put Epoch Self increasing 1 return , So when old producer When recovering to request operation , Will be considered invalid producer Throw an exception .

If the transaction is not opened ,TransactionCoordinator For the new producer return new pid, So it doesn 't work , Therefore, multi session idempotent cannot be implemented .

The following is to turn on transactional producer Methods :

// Initialize transaction 
producer.initTransactions();
try {
    // Start a transaction
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    // Submit
    producer.commitTransaction();
} catch (KafkaException e) {
    // When something goes wrong , Terminate the transaction
    producer.abortTransaction();
}

The above is for producer Implementation of end idempotence . But whether you turn on idempotence or the nature of transactions , Will have a certain impact on the performance , It's inevitable . therefore kafka These two features are not enabled by default , We also need to weigh whether we need to start according to the business .

consumer Idempotency of the end

It's a pity that ,kafka There is no guarantee that consumer Idempotent consumption measures , If you really need to guarantee consumer Idempotent of , A global can be maintained for each message id, Lock and remove weight every time you consume .

As for spending so many resources to achieve consumer Idempotency of the end , To realize kafka Of exactly once Is your consumption worth it , Then we have to weigh with the business .

The following is a piece of pseudo code :

if(cache.contain(msgId)){
  // cache Contained in the msgId, Has dealt with
  continue;
}else {
  lock.lock();
  cache.put(msgId,timeout);
  commitSync();
  lock.unLock();
}

// After all subsequent operations , Delete cache Medium msgId, as long as msgId There is cache in , I think I've dealt with .Note: Need to give cache Set up a message

Orderliness

First kafka The orderliness of can only guarantee the orderliness of single partition data , There is no guarantee of global order .

If it is really necessary to ensure that the data to be processed is in order , You can write the data that needs to be processed sequentially to the same partition for processing by rewriting the partition function . Partition selector needs to be implemented org.apache.kafka.clients.producer.Partitioner Interface . Then through the configuration item ProducerConfig.PARTITIONER_CLASS_CONFIG Make configuration assignments .

Back to the orderly implementation of single partition data . This is actually kafka A side effect of idempotency implementation . The new version kafka Set up enable.idempotence=true Can be adjusted dynamically after max-in-flight-request. This parameter specifies how many... The producer can send before receiving the response from the server batch news , The default value is 5.

When the retry request arrives ,batch Will be based on seq Re add to the appropriate location in the queue , And put max.in.flight.requests.per.connection Set to 1, So in front of it batch The serial number is smaller than it , Only the front ones are finished , It can send . In this way, at the expense of some performance and throughput , Ensure the order of data .

In the end, it's a long way to go , The road to big data is still a long way off . If you want to be a small partner of big data , Welcome to like, forward and pay attention , Don't get lost next time , We are moving forward together on the road of big data !

原网站

版权声明
本文为[Zhang San admin]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/209/202207281131168895.html