当前位置:网站首页>Etcd database source code analysis cluster communication initialization
Etcd database source code analysis cluster communication initialization
2022-06-26 01:10:00 【Tertium ferrugosum】

Message entrance
One etcd After the node runs , Yes 3 Two channels receive external messages , With kv Data addition, deletion, modification and query request processing as an example , Introduce this 3 The working mechanism of two channels .
- client Of http call : Will register to http Modular keysHandler Of ServeHTTP Method treatment . Parse the good message and call EtcdServer Of Do() Method treatment .( In the figure 2)
- client Of grpc call : When it is started, it will turn to grpc server register quotaKVServer object ,quotaKVServer It is in combination that kvServer This data structure .grpc After the message is parsed, it will call kvServer Of Range、Put、DeleteRange、Txn、Compact Other methods .kvServer It contains a RaftKV The interface of , from EtcdServer This structure implements . So the final call is to EtcdServer Of Range、Put、DeleteRange、Txn、Compact Other methods .( In the figure 1 ETCD Database source code analysis ——etcd gRPC service API)
- Between nodes grpc news : Every EtcdServer contained Transport structure ,Transport There will be one peers Of map, Every peer Encapsulates the communication mode from a node to another node . Include streamReader、streamWriter etc. , Used for sending and receiving messages .streamReader There is recvc and propc queue ,streamReader After processing the received message, the message will be pushed to the queue . from peer To deal with ,peer call raftNode Of Process Method to process messages .( In the figure 3、4 This article will introduce )
Cluster communication server
func StartEtcd(inCfg *Config) (e *Etcd, err error) {
// Verification configuration inCfg, In fact, that is config Structure of the embed.Config Type of ec member
serving := false // Identify whether the service is being provided
e = &Etcd{
cfg: *inCfg, stopc: make(chan struct{
})} // establish Etcd
// 1. Initialize the communication required by the internal communication server of the cluster peerListener
if e.Peers, err = configurePeerListeners(cfg); err != nil {
return e, err }
// buffer channel so goroutines on closed connections won't wait forever
e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
// 2. Start the cluster communication server
if err = e.servePeers(); err != nil {
return e, err }

Initialize the communication required by the internal communication server of the cluster peerListener
configurePeerListeners Function defined in server/embed/etcd.go In file , For configuration Config in LPUrls Every url Create a peerListener, As shown in the figure above peerListener As shown in the structure , This function initializes peerListener Structure will call transport.NewListenerWithOpts Function creation net.Listener, take close Function initialized to func(context.Context) error { return peers[i].Listener.Close() }, Used to invoke net.Listener.Close function .configurePeerListeners Function is not initialized peerListener Structure of the serve function .
func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) {
if err = updateCipherSuites(&cfg.PeerTLSInfo, cfg.CipherSuites); err != nil {
return nil, err }
if err = cfg.PeerSelfCert(); err != nil {
cfg.logger.Fatal("failed to get peer self-signed certs", zap.Error(err)) }
if !cfg.PeerTLSInfo.Empty() {
cfg.logger.Info( "starting with peer TLS", zap.String("tls-info", fmt.Sprintf("%+v", cfg.PeerTLSInfo)), zap.Strings("cipher-suites", cfg.CipherSuites),) }
peers = make([]*peerListener, len(cfg.LPUrls)) // You need to configure Config Medium LPUrls Every url Apply for one peerListener
defer func() {
// Exit the cleanup function executed by this function
if err == nil {
return }
for i := range peers {
if peers[i] != nil && peers[i].close != nil {
cfg.logger.Warn( "closing peer listener", zap.String("address", cfg.LPUrls[i].String()), zap.Error(err),)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
peers[i].close(ctx)
cancel()
}
}
}()
for i, u := range cfg.LPUrls {
if u.Scheme == "http" {
// http agreement
if !cfg.PeerTLSInfo.Empty() {
cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("peer-url", u.String())) }
if cfg.PeerTLSInfo.ClientCertAuth {
cfg.logger.Warn("scheme is HTTP while --peer-client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("peer-url", u.String())) }
}
peers[i] = &peerListener{
close: func(context.Context) error {
return nil }} // For configuration Config Medium LPUrls Every url Create a peerListener
peers[i].Listener, err = transport.NewListenerWithOpts(u.Host, u.Scheme, transport.WithTLSInfo(&cfg.PeerTLSInfo), transport.WithSocketOpts(&cfg.SocketOpts), transport.WithTimeout(rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout),)
if err != nil {
return nil, err }
// once serve, overwrite with 'http.Server.Shutdown'
peers[i].close = func(context.Context) error {
return peers[i].Listener.Close() }
}
return peers, nil
}
Start the cluster communication server
servePeers Function defined in server/embed/etcd.go In file , First NewPeerHandler Would call newPeerHandler(lg,s,s.RaftHandler()...),RaftHandler() Returns the s.r.transport.Handler(), Final transport.Handler Function returns the registered pipelineHandler、streamHandler、snapHandler Of mux. For each peerListener Create a collaboration run http server, Use pipelineHandler、streamHandler、snapHandler; to update peerListener Of close function , For each peerListener Create a collaboration run cmux.Serve(), That is, run the registration configurePeerListeners Of transport.NewListenerWithOpts establish Listener Service for .
// configure peer handlers after rafthttp.Transport started
func (e *Etcd) servePeers() (err error) {
ph := etcdhttp.NewPeerHandler(e.GetLogger(), e.Server)
for _, p := range e.Peers {
// For each peerListener Create a collaboration run http server, Use pipelineHandler、streamHandler、snapHandler
u := p.Listener.Addr().String()
m := cmux.New(p.Listener) // Use configurePeerListeners Function initialization Listener establish cmx
srv := &http.Server{
Handler: ph, ReadTimeout: 5 * time.Minute, ErrorLog: defaultLog.New(io.Discard, "", 0), } // do not log user error establish http server
go srv.Serve(m.Match(cmux.Any())) // Create a collaboration run http server
p.serve = func() error {
// configurePeerListeners Function is not initialized peerListener Structure of the serve function , Initialize here
e.cfg.logger.Info( "cmux::serve", zap.String("address", u),)
return m.Serve()
}
p.close = func(ctx context.Context) error {
// gracefully shutdown http.Server. close open listeners, idle connections until context cancel or time-out Update close function , Because the objects to be closed here are different
e.cfg.logger.Info("stopping serving peer traffic", zap.String("address", u),)
srv.Shutdown(ctx)
e.cfg.logger.Info("stopped serving peer traffic", zap.String("address", u),)
m.Close()
return nil
}
}
for _, pl := range e.Peers {
// start peer servers in a goroutine
go func(l *peerListener) {
// Create a collaboration run cmux.Serve()
u := l.Addr().String()
e.cfg.logger.Info("serving peer traffic",zap.String("address", u),
)
e.errHandler(l.serve())
}(pl)
}
return nil
}
Cluster communication client
Initializing EtcdServer Running in the process etcd.NewServersrv(cfg) Function will eventually initialize the transport layer , What is described here is the Transport block diagram , The following processes are mainly carried out : establish rafthttp.Transport example 、 start-up rafthttp.Transport Instance and direction rafthttp.Transport Add the corresponding to each node in the cluster to the instance Peer Instance and Remote Example operation . The detailed process will be explained in the follow-up blog .
// TODO: move transport initialization near the definition of remote
tr := &rafthttp.Transport{
// establish rafthttp.Transport example
Logger: cfg.Logger, TLSInfo: cfg.PeerTLSInfo, DialTimeout: cfg.PeerDialTimeout(),
ID: b.cluster.nodeID, URLs: cfg.PeerURLs, ClusterID: b.cluster.cl.ID(),
Raft: srv, Snapshotter: b.ss, ServerStats: sstats, LeaderStats: lstats,
// Passed in here rafthttp.Raft The interface implementation is EtcdServer example .EtcdServer Yes Raft The implementation of the interface is relatively simple , It delegates the call directly to the underlying raftNode example
ErrorC: srv.errorc,
}
if err = tr.Start(); err != nil {
return nil, err } // start-up rafthttp.Transport example
// towards rafthttp.Transport Add the corresponding to each node in the cluster to the instance Peer Instance and Remote example
for _, m := range b.cluster.remotes {
// add all remotes into transport
if m.ID != b.cluster.nodeID {
tr.AddRemote(m.ID, m.PeerURLs) }
}
for _, m := range b.cluster.cl.Members() {
if m.ID != b.cluster.nodeID {
tr.AddPeer(m.ID, m.PeerURLs) }
}
srv.r.transport = tr
边栏推荐
- C#另外一个new类的方式Ico?以及App.config的使用
- Radio boxes are mutually exclusive and can be deselected at the same time
- Unknown device ID does not appear on the STM32 st-link utility connection! Causes and Solutions
- mysql错误代码2003的解决办法
- CXF
- 返回值为Object型方法调用equals()
- Development and monitoring of fusion experiment pulse power supply by LabVIEW
- Sword finger offer II 096 String interleaving
- 每日一问:线程和进程的区别
- Msp430f5529lp official board (red) can not debug the problem
猜你喜欢

Computer network knowledge summary (interview)

Middle order clue binary tree

Template engine - FreeMarker first experience

新库上线 | CnOpenData中国新房信息数据

马斯克 VS 乔布斯,谁是21世纪最伟大的创业家

Music spectrum display toy -- implementation and application of FFT in stm32
![Chapter V exercises (124, 678, 15, 19, 22) [microcomputer principles] [exercises]](/img/16/d67f38d32af6904a7d0be9f2e5be70.png)
Chapter V exercises (124, 678, 15, 19, 22) [microcomputer principles] [exercises]

DGUS新升级:全面支持数字视频播放功能

debezium

QT cmake pure C code calls the system console to input scanf and Chinese output garbled code
随机推荐
Middle order clue binary tree
JS reverse case: cracking login password
Music spectrum display toy -- implementation and application of FFT in stm32
同花顺软件买股票进行交易安全吗?怎么开户买股票
When you run the demo using the gin framework, there is an error "listen TCP: 8080: bind: an attempt was made to access a socket in a way forbidden"
LabVIEW开发监控聚变实验脉冲电源
[understanding of opportunity -30]: Guiguzi - internal "chapter - empathy, stand on the other side's position and narrow the psychological distance with the other side
C#使用MySql进行操作
ASP.NET cache缓存的用法
新库上线 | CnOpenData中国新房信息数据
C # operate with MySQL
Establish a j-link GDB cross debugging environment for Px4
Enlightenment Q & A
Native DOM vs. virtual DOM
Zhihuijia - full furniture function
Classic interview questions: mouse drug test and Hamming code
Web學習之TypeScript
Inheritance -- holy grail mode
Template engine - FreeMarker first experience
vite打包构建时 @charset utf-8警告问题处理;