Etcd watch principle
2022-06-26 04:07:00 【Believe in the reason and follow the reason】

This is only my own shallow understanding; if there are mistakes, please point them out!
Service initialization
At startup, etcd registers a WatchServer[1]; pb.WatchServer is what handles watch requests.
Receiving a watch request
- Each watch stream creates its own serverWatchStream struct.
- Two goroutines are started: sendLoop sends watch messages to the stream, and recvLoop receives requests.
- A select then blocks until the stream is closed or a timeout forces an exit.
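The three steps above can be sketched roughly as follows. This is a minimal model, not etcd's code: serveWatch, the string-typed channels and the timeout parameter are all hypothetical names chosen for illustration, and the timeout is modeled with time.After as an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// serveWatch mimics the shape of the per-stream handler: start a receive
// loop and a send loop, then block until the stream ends or a timeout fires.
// in stands for client requests, out for responses written to the stream.
func serveWatch(in <-chan string, out chan<- string, timeout time.Duration) string {
	ctrl := make(chan string, 16) // recvLoop hands control responses to sendLoop
	closed := make(chan struct{}) // closed when the client side goes away

	go func() { // recvLoop: read requests until the client closes the stream
		for req := range in {
			ctrl <- "created:" + req
		}
		close(ctrl)
		close(closed)
	}()

	go func() { // sendLoop: forward responses to the client
		for resp := range ctrl {
			out <- resp
		}
	}()

	select {
	case <-closed:
		return "stream closed"
	case <-time.After(timeout):
		return "timed out"
	}
}

func main() {
	in := make(chan string, 1)
	out := make(chan string, 1)
	in <- "foo"
	close(in)
	fmt.Println(serveWatch(in, out, time.Second)) // the client closed the stream
	fmt.Println(<-out)

	idle := make(chan string) // a client that never sends and never closes
	fmt.Println(serveWatch(idle, out, 10*time.Millisecond))
}
```

The handler itself does no work after starting the two loops; it only decides when the stream is over.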
1. Receiving watch requests: recvLoop
recvLoop reads each req from gRPCStream and then handles it according to its type: CreateRequest, CancelRequest, or ProgressRequest.
- CreateRequest: a watch may cover a range, so the key and RangeEnd are built first. StartRevision is handled next: if it is 0, the latest Rev of the system + 1 is used. The mvcc layer's watchStream.Watch is then called and returns a watchid; this id is wrapped in a watchResponse, which is written to ctrlStream.
- CancelRequest: likewise calls into the mvcc layer, here watchableStore.Cancel, to cancel the subscription, and then clears the related state.
- ProgressRequest: broadcasts the current Rev of the system.
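The dispatch described above might look like this in miniature. The request structs, the server type and the textual responses are hypothetical simplifications (the real requests are protobuf messages); only the branching and the StartRevision rule are taken from the description.

```go
package main

import "fmt"

// Hypothetical, simplified request types; the real ones are protobuf messages.
type createRequest struct {
	Key, RangeEnd string
	StartRevision int64
}
type cancelRequest struct{ WatchID int64 }
type progressRequest struct{}

type server struct {
	currentRev int64
	nextID     int64
	watches    map[int64]createRequest
}

func newServer(rev int64) *server {
	return &server{currentRev: rev, watches: map[int64]createRequest{}}
}

// handle processes one request and returns the control response as text.
func (s *server) handle(req interface{}) string {
	switch r := req.(type) {
	case createRequest:
		if r.StartRevision == 0 {
			r.StartRevision = s.currentRev + 1 // start from the next revision
		}
		id := s.nextID
		s.nextID++
		s.watches[id] = r
		return fmt.Sprintf("created id=%d start=%d", id, r.StartRevision)
	case cancelRequest:
		delete(s.watches, r.WatchID) // unsubscribe and drop local state
		return fmt.Sprintf("canceled id=%d", r.WatchID)
	case progressRequest:
		return fmt.Sprintf("progress rev=%d", s.currentRev)
	default:
		return "unknown request"
	}
}

func main() {
	s := newServer(10)
	fmt.Println(s.handle(createRequest{Key: "foo", RangeEnd: "fop"}))
	fmt.Println(s.handle(progressRequest{}))
	fmt.Println(s.handle(cancelRequest{WatchID: 0}))
}
```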
2. Sending watch responses: sendLoop
An event may be triggered before the watchid has been announced; because there is no id yet, these messages pile up in pending. The function as a whole does three things: it reads subscription messages from mvcc.watchStream.Chan(), handles control messages from ctrlStream, and handles progressTicker.
- Chan(): if needPrevKV is set, the previous KV has to be filled in. If the watchid does not exist yet, the message is parked in the pending queue. Fragment decides whether the response must be split into several messages (the threshold here is 1.5M); if not, sws.gRPCStream.Send is called directly. Once data has been sent, sws.progress[wresp.WatchID] is set to false, so no progress message is sent for that watcher.
- ctrlStream: reads control messages. As soon as one announces a newly created watchid, the pending messages accumulated for that id are sent.
- progressTicker: periodically calls RequestProgress to generate progress messages that tell the client the current Rev.
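The pending mechanism is the subtle part, so here is a small single-goroutine model of it. The sender type and its onEvents/onCreated methods are hypothetical; the sent slice stands in for gRPCStream.Send, and fragmentation and progress handling are left out.

```go
package main

import "fmt"

type watchResponse struct {
	WatchID int64
	Events  []string
}

// sender is a hypothetical, single-goroutine model of sendLoop's bookkeeping.
type sender struct {
	ids     map[int64]bool            // watch ids already announced to the client
	pending map[int64][]watchResponse // events that arrived before the id was announced
	sent    []watchResponse           // stands in for gRPCStream.Send
}

func newSender() *sender {
	return &sender{ids: map[int64]bool{}, pending: map[int64][]watchResponse{}}
}

// onEvents models a message read from the mvcc event channel.
func (s *sender) onEvents(r watchResponse) {
	if !s.ids[r.WatchID] {
		s.pending[r.WatchID] = append(s.pending[r.WatchID], r) // too early: park it
		return
	}
	s.sent = append(s.sent, r)
}

// onCreated models the "created" control message read from ctrlStream.
func (s *sender) onCreated(id int64) {
	s.ids[id] = true
	s.sent = append(s.sent, watchResponse{WatchID: id}) // announce the id first
	s.sent = append(s.sent, s.pending[id]...)           // then flush the backlog
	delete(s.pending, id)
}

func main() {
	s := newSender()
	s.onEvents(watchResponse{WatchID: 1, Events: []string{"PUT foo"}})
	fmt.Println(len(s.sent), len(s.pending[1]))
	s.onCreated(1)
	fmt.Println(len(s.sent), len(s.pending[1]))
}
```

The ordering matters: the client must learn the id before it sees any event that carries it, which is why the backlog is flushed only after the control message.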
MVCC watch
This part is mainly about mvcc.watchStream. Let's look at how Watch is implemented.
Watch mainly generates the watchid, which is simply auto-incremented. It then calls the watch method to obtain a watcher and a cancelFunc, and stores both in the watchStream.
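A toy version of that bookkeeping, assuming a plain counter for the id and two maps for the watcher and its cancel function. The watchStream type below is a hypothetical miniature; the field and method names are chosen for illustration and are not copied from etcd.

```go
package main

import "fmt"

type watcher struct {
	id  int64
	key string
}

// watchStream is a hypothetical miniature of the mvcc-level stream.
type watchStream struct {
	nextID   int64
	watchers map[int64]*watcher
	cancels  map[int64]func()
}

func newWatchStream() *watchStream {
	return &watchStream{watchers: map[int64]*watcher{}, cancels: map[int64]func(){}}
}

// Watch assigns the next id and remembers the watcher with its cancel func.
func (ws *watchStream) Watch(key string) int64 {
	id := ws.nextID
	ws.nextID++
	ws.watchers[id] = &watcher{id: id, key: key}
	ws.cancels[id] = func() { delete(ws.watchers, id) }
	return id
}

// Cancel runs the stored cancel func; it reports false for unknown ids.
func (ws *watchStream) Cancel(id int64) bool {
	cancel, ok := ws.cancels[id]
	if !ok {
		return false
	}
	cancel()
	delete(ws.cancels, id)
	return true
}

func main() {
	ws := newWatchStream()
	fmt.Println(ws.Watch("foo"), ws.Watch("bar"))
	fmt.Println(ws.Cancel(0), ws.Cancel(0), len(ws.watchers))
}
```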
The watch method:
The watchableStore behind a WatchStream keeps three groups in total: synced, unsynced, and victims. When a client starts watching from a historical revision, which means a large backlog of messages has to be sent to the client, the watcher struct is put into the unsynced group; otherwise it goes into the synced group. Why do it this way? Because watchers consume messages at different speeds. The specific code is explained later; for now just remember that a watcher moves among these three groups, and that ideally it stays in the synced group the whole time.
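The placement rule can be illustrated with a small sketch. The store and watcher types here are hypothetical, and the exact condition (a start revision of 0 or one beyond the current revision counts as "nothing to replay") is my assumption about how the check works.

```go
package main

import "fmt"

type watcher struct {
	key    string
	minRev int64 // the watcher still needs every event with revision >= minRev
}

type store struct {
	currentRev int64
	synced     map[*watcher]struct{}
	unsynced   map[*watcher]struct{}
}

func newStore(rev int64) *store {
	return &store{
		currentRev: rev,
		synced:     map[*watcher]struct{}{},
		unsynced:   map[*watcher]struct{}{},
	}
}

// watch registers a watcher; startRev == 0 means "from now on".
func (s *store) watch(key string, startRev int64) *watcher {
	w := &watcher{key: key, minRev: startRev}
	if startRev == 0 || startRev > s.currentRev {
		// Nothing to replay: the watcher is already caught up.
		if startRev == 0 {
			w.minRev = s.currentRev + 1
		}
		s.synced[w] = struct{}{}
	} else {
		// History has to be replayed first.
		s.unsynced[w] = struct{}{}
	}
	return w
}

func main() {
	s := newStore(10)
	live := s.watch("foo", 0)
	replay := s.watch("foo", 3)
	_, liveSynced := s.synced[live]
	_, replayUnsynced := s.unsynced[replay]
	fmt.Println(live.minRev, liveSynced, replayUnsynced)
}
```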
MVCC message generation
The underlying Txn is wrapped by watchableStoreTxnWrite. When End is called, notify is called to send the change messages before the transaction is committed.
End traverses changes, classifies each one as mvccpb.DELETE or mvccpb.PUT, wraps them into events (evs), and calls tw.s.notify to send them; only then does it commit.
newWatcherBatch collects, from the synced group, the watchers that need to be notified, and w.send is then called to push the events into each watcher's channel. If the channel is full, the events cannot be delivered: the watcher is removed from the synced group and added to the victim group, to be handled later by the asynchronous goroutine syncVictimsLoop. Now for the implementation of newWatcherBatch.
watcherSetByKey returns the watchers that are watching ev.Kv.Key. Internally it uses the red-black tree (an interval tree) from the adt package, which makes range matching fast. Interested readers can look at the source code.
The send function first applies the filters and then sends to w.ch; if the channel is full it returns false. This w.ch is the same channel that v3rpc reads from, and when data arrives it is sent over the http2 stream …
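The key trick is that send never blocks. A minimal sketch, assuming a select with a default branch for the non-blocking send and a linear scan in place of the interval tree; the types are hypothetical and filters are omitted:

```go
package main

import "fmt"

type watcher struct {
	key, end string // end == "" watches a single key, else the range [key, end)
	ch       chan []string
}

// matches is a linear stand-in for the interval-tree lookup.
func (w *watcher) matches(key string) bool {
	if w.end == "" {
		return w.key == key
	}
	return w.key <= key && key < w.end
}

// send never blocks: it reports false when the channel cannot take the events.
func (w *watcher) send(evs []string) bool {
	select {
	case w.ch <- evs:
		return true
	default:
		return false
	}
}

type store struct {
	synced  map[*watcher]struct{}
	victims map[*watcher][]string // undelivered events per slow watcher
}

// notify delivers evs for key to every matching synced watcher.
func (s *store) notify(key string, evs []string) {
	for w := range s.synced {
		if !w.matches(key) {
			continue
		}
		if !w.send(evs) {
			delete(s.synced, w) // deleting during range is allowed in Go
			s.victims[w] = evs
		}
	}
}

func main() {
	fast := &watcher{key: "foo", ch: make(chan []string, 1)}
	slow := &watcher{key: "f", end: "g", ch: make(chan []string)} // nobody is receiving
	s := &store{
		synced:  map[*watcher]struct{}{fast: {}, slow: {}},
		victims: map[*watcher][]string{},
	}
	s.notify("foo", []string{"PUT foo"})
	fmt.Println(len(s.synced), len(s.victims))
}
```

Because the write path never waits on a slow consumer, one stalled client cannot hold up a transaction; the cost is the extra victims bookkeeping.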
The newWatchableStore method
The watchableStore is created when the etcd service starts. newWatchableStore spawns two asynchronous goroutines: syncWatchersLoop turns unsynced watchers into synced watchers, and syncVictimsLoop sends out as many of the victims' messages as it can.
Slow-path handling
1. Slow path: victims
moveVictims is called to try to send the backlog of messages.
The code is simple: it first tries to send each victim's messages, and if that fails the watcher is put back into victims. If the send succeeds, the current Rev of the system is compared with watcher.minRev to decide whether the watcher goes into the synced group or the unsynced group.
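A sketch of that retry, under stated assumptions: the batch type and its rev field are hypothetical, and the comparison used here (a minRev at or below the current Rev means the watcher has fallen behind and must go to unsynced) is my reading of the check rather than a quote from the source.

```go
package main

import "fmt"

type batch struct {
	evs []string
	rev int64 // revision of the newest event in the batch
}

type watcher struct {
	minRev int64
	ch     chan []string
}

// send never blocks: it reports false when the channel cannot take the events.
func (w *watcher) send(evs []string) bool {
	select {
	case w.ch <- evs:
		return true
	default:
		return false
	}
}

type store struct {
	currentRev int64
	synced     map[*watcher]struct{}
	unsynced   map[*watcher]struct{}
	victims    map[*watcher]batch
}

// moveVictims retries every parked batch once and reports how many went out.
func (s *store) moveVictims() (moved int) {
	victims := s.victims
	s.victims = map[*watcher]batch{}
	for w, b := range victims {
		if !w.send(b.evs) {
			s.victims[w] = b // still blocked: stays a victim
			continue
		}
		moved++
		w.minRev = b.rev + 1
		if w.minRev <= s.currentRev {
			s.unsynced[w] = struct{}{} // the store moved on meanwhile: catch up first
		} else {
			s.synced[w] = struct{}{}
		}
	}
	return moved
}

func main() {
	caughtUp := &watcher{ch: make(chan []string, 1)}
	stuck := &watcher{ch: make(chan []string)} // nobody is receiving
	s := &store{
		currentRev: 7,
		synced:     map[*watcher]struct{}{},
		unsynced:   map[*watcher]struct{}{},
		victims: map[*watcher]batch{
			caughtUp: {evs: []string{"PUT foo"}, rev: 7},
			stuck:    {evs: []string{"PUT bar"}, rev: 6},
		},
	}
	fmt.Println(s.moveVictims(), len(s.synced), len(s.victims))
}
```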
2. Slow path: unsynced
The syncWatchersLoop function calls syncWatchers in a loop to process the watchers in the unsynced group:
- Select from unsynced the watcher groups that have data to send. This only checks that the watcher's revision is usable, that is, within [compactRev, curRev].
- UnsafeRange fetches all the keys/values from boltdb.
- Traverse the watchers and send each one the keys/values that qualify. On success the watcher is deleted from unsynced and added to synced; otherwise it is added to the victims queue.
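The three steps above can be put together in a compact sketch. Everything here is a hypothetical simplification: history is an in-memory slice standing in for boltdb, watchers match a single key only, and a watcher whose minRev is below compactRev is simply dropped, which is an assumption made to keep the example short.

```go
package main

import "fmt"

type kv struct {
	key string
	rev int64
}

type watcher struct {
	key    string
	minRev int64
	ch     chan []kv
}

// send never blocks: it reports false when the channel cannot take the events.
func (w *watcher) send(evs []kv) bool {
	select {
	case w.ch <- evs:
		return true
	default:
		return false
	}
}

type store struct {
	compactRev, currentRev int64
	history                []kv // stands in for the boltdb backend
	synced, unsynced       map[*watcher]struct{}
	victims                map[*watcher][]kv
}

func (s *store) syncWatchers() {
	// 1. Choose watchers whose minRev is still readable; track the smallest one.
	minRev := s.currentRev + 1
	var chosen []*watcher
	for w := range s.unsynced {
		if w.minRev < s.compactRev {
			delete(s.unsynced, w) // history is gone; this sketch just drops the watcher
			continue
		}
		if w.minRev < minRev {
			minRev = w.minRev
		}
		chosen = append(chosen, w)
	}
	// 2. One range read over [minRev, currentRev].
	var revs []kv
	for _, e := range s.history {
		if e.rev >= minRev && e.rev <= s.currentRev {
			revs = append(revs, e)
		}
	}
	// 3. Hand each watcher its share and move it to the right group.
	for _, w := range chosen {
		var evs []kv
		for _, e := range revs {
			if e.key == w.key && e.rev >= w.minRev {
				evs = append(evs, e)
			}
		}
		delete(s.unsynced, w)
		w.minRev = s.currentRev + 1
		if len(evs) == 0 || w.send(evs) {
			s.synced[w] = struct{}{}
		} else {
			s.victims[w] = evs
		}
	}
}

func main() {
	a := &watcher{key: "foo", minRev: 2, ch: make(chan []kv, 1)}
	s := &store{
		compactRev: 1, currentRev: 4,
		history:  []kv{{"foo", 1}, {"foo", 2}, {"bar", 3}, {"foo", 4}},
		synced:   map[*watcher]struct{}{},
		unsynced: map[*watcher]struct{}{a: {}},
		victims:  map[*watcher][]kv{},
	}
	s.syncWatchers()
	fmt.Println(len(s.unsynced), len(s.synced), <-a.ch)
}
```

Reading the backend once for the smallest minRev and then filtering per watcher is the design choice worth noting: it trades a little extra filtering for far fewer range reads.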
Reference: Dong Zerun's technical notes, "Understanding the etcd watch implementation"