Etcd watch principle
2022-06-26 04:07:00 【Believe in the reason and follow the reason】
This is only my own superficial understanding. If there are mistakes, please point them out!
Service initialization
At startup, etcd registers a WatchServer[1]. pb.WatchServer is responsible for handling watch requests.
Receiving a watch request
- Each watch stream creates its own serverWatchStream struct.
- Two goroutines are started: sendLoop sends watch messages to the stream, and recvLoop receives requests.
- A select then blocks until the stream is closed or a timeout causes it to exit.
1. Receiving watch requests: recvLoop
recvLoop reads each req from gRPCStream and handles three request types: CreateRequest, CancelRequest, and ProgressRequest.
- CreateRequest: the watch may cover a range, so the key and RangeEnd are built first. StartRevision is then handled: if it is 0, the system's current latest Rev+1 is used. The mvcc layer's watchStream.Watch is called and returns a watch ID. This ID is wrapped in a watchResponse, which is then written to ctrlStream.
- CancelRequest: likewise calls into the mvcc layer, here watchableStore.Cancel, to unsubscribe, and then clears the related state.
- ProgressRequest: broadcasts the system's current Rev.
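The StartRevision rule for CreateRequest can be sketched as follows. This is a minimal illustration, not etcd's code: createRequest and effectiveStartRev are hypothetical names standing in for the protobuf request and the logic inside recvLoop.

```go
package main

import "fmt"

// createRequest is a simplified, hypothetical stand-in for the
// CreateRequest described above; only the fields used here are kept.
type createRequest struct {
	Key           string
	RangeEnd      string
	StartRevision int64
}

// effectiveStartRev applies the rule from the text: a StartRevision of 0
// means "start from the revision after the current one".
func effectiveStartRev(req createRequest, currentRev int64) int64 {
	if req.StartRevision == 0 {
		return currentRev + 1
	}
	return req.StartRevision
}

func main() {
	// No StartRevision given: watch only future changes.
	fmt.Println(effectiveStartRev(createRequest{Key: "foo"}, 10)) // 11
	// Explicit StartRevision: replay from history.
	fmt.Println(effectiveStartRev(createRequest{Key: "foo", StartRevision: 5}, 10)) // 5
}
```

A watcher whose start revision lies in the past has history to catch up on, which is why the revision chosen here matters for the group selection in the mvcc layer.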
2. Sending watch messages: sendLoop
Before the watch ID has been delivered, an event may already have been triggered while the client does not yet know the ID, so such messages accumulate in pending. The function mainly does three things: it reads subscribed messages from mvcc.watchStream.Chan(), handles ctrlStream control messages, and handles progressTicker.
- Chan(): if needPrevKV is set, the previous key-value is filled in. If the watch ID does not exist yet, the message is temporarily placed in the pending queue. Fragment determines whether the response has to be split into several messages; the threshold here is 1.5M. If no splitting is needed, sws.gRPCStream.Send is called directly. Once data has been sent, sws.progress[wresp.WatchID] is set to false, so no progress message is sent.
- ctrlStream: reads control messages. As soon as a watch ID has been obtained, the accumulated pending messages for it are sent.
- progressTicker: periodically calls RequestProgress to generate a progress message that sends the current Rev to the client.
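The pending mechanism can be sketched like this. It is a simplified model under stated assumptions: stream, onEvent, and onCreated are hypothetical names, and the sent slice stands in for what would be passed to gRPCStream.Send.

```go
package main

import "fmt"

// watchResponse is a simplified stand-in for a response sent to a client.
type watchResponse struct {
	WatchID int64
	Event   string
}

// stream models only the bookkeeping described above (hypothetical type).
type stream struct {
	announced map[int64]bool            // watch IDs the client already knows
	pending   map[int64][]watchResponse // events held back until the ID is announced
	sent      []watchResponse           // stands in for gRPCStream.Send
}

func newStream() *stream {
	return &stream{
		announced: make(map[int64]bool),
		pending:   make(map[int64][]watchResponse),
	}
}

// onEvent corresponds to the Chan() branch: buffer if the ID is unknown.
func (s *stream) onEvent(r watchResponse) {
	if !s.announced[r.WatchID] {
		s.pending[r.WatchID] = append(s.pending[r.WatchID], r)
		return
	}
	s.sent = append(s.sent, r)
}

// onCreated corresponds to the ctrlStream branch: flush what was buffered.
func (s *stream) onCreated(id int64) {
	s.announced[id] = true
	s.sent = append(s.sent, s.pending[id]...)
	delete(s.pending, id)
}

func main() {
	s := newStream()
	s.onEvent(watchResponse{WatchID: 1, Event: "PUT foo"})
	fmt.Println(len(s.sent), len(s.pending[1])) // 0 1
	s.onCreated(1)
	fmt.Println(len(s.sent), len(s.pending[1])) // 1 0
	s.onEvent(watchResponse{WatchID: 1, Event: "DELETE foo"})
	fmt.Println(len(s.sent)) // 2
}
```

Buffering per watch ID keeps the ordering guarantee that a client never sees an event for an ID it has not been told about.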
MVCC watch
This part is mainly about mvcc.watchStream. Let's look at how Watch is implemented.
Watch mainly generates the watch ID, which is simply auto-incremented. It then calls the watch method to obtain a watcher and a cancelFunc, and stores both in the watchStream.
The watch method:
The watchableStore behind a WatchStream has three groups in total: synced, unsynced, and victims. When a client starts watching from a historical revision, there are many messages to send to it, so the watcher struct is put into the unsynced group; otherwise it is put into the synced group. Why do this? Because of message processing speed. The specific code is explained later. For now, just remember that a watcher moves among these three groups, and that ideally it always stays in the synced group.
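The choice between the two starting groups can be sketched as below. This is an assumption-laden illustration: pickGroup and the group type are hypothetical, and the condition (start revision of 0 or beyond the current revision means nothing to replay) is my reading of the rule described above.

```go
package main

import "fmt"

// group names one of the watcher groups described above (hypothetical type).
type group string

const (
	synced   group = "synced"
	unsynced group = "unsynced"
)

// pickGroup decides where a new watcher starts. A watcher with no history
// to replay is already caught up; one starting at or before the current
// revision still has old events to receive.
func pickGroup(startRev, currentRev int64) group {
	if startRev == 0 || startRev > currentRev {
		return synced
	}
	return unsynced
}

func main() {
	fmt.Println(pickGroup(0, 100))   // synced
	fmt.Println(pickGroup(101, 100)) // synced
	fmt.Println(pickGroup(42, 100))  // unsynced
}
```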
MVCC message generation
The underlying Txn is wrapped by watchableStoreTxnWrite. When End is called, notify is invoked to send the change messages before the transaction is committed.
It traverses changes, determines whether each one is of type mvccpb.DELETE or mvccpb.PUT, wraps them as events, calls tw.s.notify to send them, and then commits.
newWatcherBatch obtains from the synced group the watchers that need to be notified, and w.send is then called to send to each watcher's channel. If the channel is full, the message cannot be sent: the watcher is removed from the synced group and added to the victim group, to be handled later by the asynchronous goroutine syncVictimsLoop. Let's look at how newWatcherBatch is implemented.
watcherSetByKey returns the watchers that are watching ev.Kv.Key. The internal implementation uses the red-black tree from adt, which makes fast range matching possible. Interested readers can look at the source code.
The send function first applies the filters, then sends to w.ch; if the channel is full, it returns false. This w.ch is the channel used by v3rpc: when data is available, it is sent over the http2 stream …
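The non-blocking send and the move to the victim group can be sketched as follows. The watcher and store types here are hypothetical reductions that keep only what the paragraph above describes; filters and event batching are left out.

```go
package main

import "fmt"

// watcher holds only the outgoing channel (hypothetical, simplified).
type watcher struct {
	name string
	ch   chan string
}

// send never blocks: it reports false when the channel buffer is full.
func (w *watcher) send(ev string) bool {
	select {
	case w.ch <- ev:
		return true
	default:
		return false
	}
}

// store keeps the synced group and the victims with their undelivered events.
type store struct {
	synced  map[*watcher]struct{}
	victims map[*watcher][]string
}

// notify delivers ev to every synced watcher; a watcher whose channel is
// full is removed from synced and parked in victims for a later retry.
func (s *store) notify(ev string) {
	for w := range s.synced {
		if w.send(ev) {
			continue
		}
		delete(s.synced, w) // deleting during range is safe in Go
		s.victims[w] = append(s.victims[w], ev)
	}
}

func main() {
	w := &watcher{name: "slow", ch: make(chan string, 1)}
	s := &store{
		synced:  map[*watcher]struct{}{w: {}},
		victims: map[*watcher][]string{},
	}
	s.notify("PUT a") // fits in the buffer
	fmt.Println(len(s.synced), len(s.victims)) // 1 0
	s.notify("PUT b") // buffer full, watcher becomes a victim
	fmt.Println(len(s.synced), len(s.victims)) // 0 1
}
```

The point of the select with a default branch is that one slow client cannot stall the write path: the transaction commits regardless, and the backlog is retried asynchronously.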
The newWatchableStore method
When the etcd service starts, it creates the watchableStore. newWatchableStore starts two asynchronous goroutines: syncWatchersLoop turns unsynced watchers into synced watchers, and syncVictimsLoop sends out as many of the victims' messages as possible.
Slow processing
1. Slow processing: victims
moveVictims is called to try to send the accumulated messages.
The code is simple. It first tries to send the victims' messages. If sending fails, the watcher is put back into victims. If sending succeeds, the current system Rev is compared with watcher.minRev to decide whether the watcher goes into the synced group or the unsynced group.
2. Slow processing: unsynced
syncWatchersLoop calls syncWatchers in a loop to process the unsynced group.
- Select from unsynced the watcher groups that have data to send. This only checks whether the revision is available, that is, whether it lies in [compactRev, curRev].
- UnsafeRange fetches all the keys/values from boltdb.
- Traverse the watchers and send the matching keys/values. On success, the watcher is removed from unsynced and added to synced; otherwise it is added to the victims queue.
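The first step, checking each unsynced watcher against [compactRev, curRev], can be sketched as below. selectRange is a hypothetical helper, and using curRev+1 as the "nothing to read" sentinel is an assumption made for this sketch, not something taken from the source.

```go
package main

import "fmt"

// watcher keeps only the next revision it still needs (hypothetical type).
type watcher struct {
	name   string
	minRev int64
}

// selectRange scans unsynced watchers and returns the lowest revision that
// has to be read from the backend, plus the names of watchers whose minRev
// was already compacted away. Watchers ahead of curRev are skipped.
func selectRange(ws []watcher, compactRev, curRev int64) (int64, []string) {
	minRev := curRev + 1 // sentinel: nothing to read (assumption of this sketch)
	var compacted []string
	for _, w := range ws {
		switch {
		case w.minRev > curRev:
			// Nothing to replay yet for this watcher.
		case w.minRev < compactRev:
			// History is gone; this watcher cannot be served from the store.
			compacted = append(compacted, w.name)
		case w.minRev < minRev:
			minRev = w.minRev
		}
	}
	return minRev, compacted
}

func main() {
	ws := []watcher{{"a", 5}, {"b", 50}, {"c", 200}, {"d", 70}}
	minRev, compacted := selectRange(ws, 10, 100)
	fmt.Println(minRev, compacted) // 50 [a]
}
```

With the lowest needed revision known, a single range read over [minRev, curRev] can serve all selected watchers instead of one read per watcher.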
Reference: Dong Zerun's technical notes, "Understanding the etcd watch implementation"