
Etcd Database Source Code Analysis: Cluster Communication Initialization

2022-06-26 01:10:00 Tertium ferrugosum

[Figure: the message entry points of an etcd node (labels 1-4 are referenced below)]
Message entry points
After an etcd node is up and running, there are three channels through which it receives external messages. Taking the handling of kv create/read/update/delete requests as an example, this section introduces how these three channels work.

  1. Client HTTP calls: these are handled by the ServeHTTP method of keysHandler, registered in the http module. After the message is parsed, EtcdServer's Do() method is called to process it. (label 2 in the figure)
  2. Client gRPC calls: at startup the quotaKVServer object is registered with the grpc server; quotaKVServer wraps the kvServer struct. After grpc parses a message, it calls kvServer's Range, Put, DeleteRange, Txn, Compact and other methods. kvServer contains a RaftKV interface, which is implemented by EtcdServer, so the final calls land in EtcdServer's Range, Put, DeleteRange, Txn, Compact and other methods. A client-side sketch follows this list. (label 1 in the figure; see "ETCD Database Source Code Analysis: etcd gRPC service API")
  3. gRPC messages between nodes: every EtcdServer contains a Transport struct, and Transport holds a map of peers; each peer encapsulates the communication from this node to one other node, including streamReader, streamWriter and so on, used for sending and receiving messages. streamReader has recvc and propc queues; after streamReader processes a received message it pushes the message onto a queue, where the peer picks it up, and the peer calls raftNode's Process method to handle it. (labels 3 and 4 in the figure; this is what this article covers)
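
To make channel 2 concrete, here is a minimal client-side sketch (my own illustration, not part of the etcd source). It assumes the go.etcd.io/etcd/client/v3 package and an etcd node listening on 127.0.0.1:2379; the Put and Get calls travel over gRPC and are ultimately handled by EtcdServer's Put and Range methods.

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// connect to the gRPC endpoint of a local etcd node (address is an assumption)
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// Put: handled by kvServer.Put, which delegates to EtcdServer.Put
	if _, err := cli.Put(ctx, "foo", "bar"); err != nil {
		log.Fatal(err)
	}
	// Get (Range): handled by kvServer.Range, which delegates to EtcdServer.Range
	resp, err := cli.Get(ctx, "foo")
	if err != nil {
		log.Fatal(err)
	}
	for _, kv := range resp.Kvs {
		fmt.Printf("%s=%s\n", kv.Key, kv.Value)
	}
}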

Cluster communication server

func StartEtcd(inCfg *Config) (e *Etcd, err error) {
	// validate the configuration inCfg, which is in fact the ec member (of type embed.Config) of the config struct
	serving := false // whether the service is already being served
	e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})} // create the Etcd instance
	// 1. initialize the peerListeners required by the intra-cluster communication server
	if e.Peers, err = configurePeerListeners(cfg); err != nil {
		return e, err
	}
	// buffer channel so goroutines on closed connections won't wait forever
	e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
	// 2. start the cluster communication server
	if err = e.servePeers(); err != nil {
		return e, err
	}
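
For orientation, StartEtcd is the public entry point of the embed package. A minimal usage sketch follows (assuming the go.etcd.io/etcd/server/v3/embed package; the data directory and timeout values are illustrative, not taken from the article):

package main

import (
	"log"
	"time"

	"go.etcd.io/etcd/server/v3/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd" // illustrative data directory

	// StartEtcd runs the flow shown above: configurePeerListeners, servePeers, ...
	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()

	select {
	case <-e.Server.ReadyNotify():
		log.Println("embedded etcd is ready to serve client and peer traffic")
	case <-time.After(60 * time.Second):
		e.Server.Stop() // trigger a shutdown if startup takes too long
		log.Fatal("embedded etcd took too long to start")
	}
	// block on the error channel fed by e.errc (surfaced via e.Err())
	log.Fatal(<-e.Err())
}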

[Figure: the peerListener struct]
Initialize the peerListeners for the intra-cluster communication server
configurePeerListeners is defined in the file server/embed/etcd.go. It creates a peerListener for each url in the LPUrls field of the Config, matching the peerListener struct shown in the figure above. While initializing each peerListener struct, it calls transport.NewListenerWithOpts to create the net.Listener and initializes the close function to func(context.Context) error { return peers[i].Listener.Close() }, which simply calls net.Listener.Close. configurePeerListeners does not initialize the serve function of the peerListener struct.

func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) {
	if err = updateCipherSuites(&cfg.PeerTLSInfo, cfg.CipherSuites); err != nil {
		return nil, err
	}
	if err = cfg.PeerSelfCert(); err != nil {
		cfg.logger.Fatal("failed to get peer self-signed certs", zap.Error(err))
	}
	if !cfg.PeerTLSInfo.Empty() {
		cfg.logger.Info(
			"starting with peer TLS",
			zap.String("tls-info", fmt.Sprintf("%+v", cfg.PeerTLSInfo)),
			zap.Strings("cipher-suites", cfg.CipherSuites),
		)
	}

	peers = make([]*peerListener, len(cfg.LPUrls)) // one peerListener per url in Config.LPUrls
	defer func() {
		// cleanup executed when this function returns with an error
		if err == nil {
			return
		}
		for i := range peers {
			if peers[i] != nil && peers[i].close != nil {
				cfg.logger.Warn(
					"closing peer listener",
					zap.String("address", cfg.LPUrls[i].String()),
					zap.Error(err),
				)
				ctx, cancel := context.WithTimeout(context.Background(), time.Second)
				peers[i].close(ctx)
				cancel()
			}
		}
	}()

	for i, u := range cfg.LPUrls {
		if u.Scheme == "http" { // plain HTTP
			if !cfg.PeerTLSInfo.Empty() {
				cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("peer-url", u.String()))
			}
			if cfg.PeerTLSInfo.ClientCertAuth {
				cfg.logger.Warn("scheme is HTTP while --peer-client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("peer-url", u.String()))
			}
		}
		// create a peerListener for each url configured in Config.LPUrls
		peers[i] = &peerListener{close: func(context.Context) error { return nil }}
		peers[i].Listener, err = transport.NewListenerWithOpts(u.Host, u.Scheme,
			transport.WithTLSInfo(&cfg.PeerTLSInfo),
			transport.WithSocketOpts(&cfg.SocketOpts),
			transport.WithTimeout(rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout),
		)
		if err != nil {
			return nil, err
		}
		// once serve, overwrite with 'http.Server.Shutdown'
		peers[i].close = func(context.Context) error { return peers[i].Listener.Close() }
	}
	return peers, nil
}
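
The noteworthy pattern above is that close starts as a thin wrapper around net.Listener.Close and is deliberately overwritten later, once an http.Server owns the listener (the "once serve, overwrite with 'http.Server.Shutdown'" comment). The following self-contained sketch illustrates that pattern using only the standard library; demoListener and its fields are hypothetical names, not etcd code.

package main

import (
	"context"
	"log"
	"net"
	"net/http"
	"time"
)

// demoListener mirrors the shape of etcd's peerListener: a listener plus
// swappable serve/close hooks (hypothetical, for illustration only).
type demoListener struct {
	net.Listener
	serve func() error
	close func(context.Context) error
}

func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}

	dl := &demoListener{Listener: ln}
	// phase 1: nothing is serving yet, so closing simply closes the listener
	dl.close = func(context.Context) error { return dl.Listener.Close() }

	// phase 2: once an http.Server takes over the listener, closing means a
	// graceful Shutdown instead, so the close hook is overwritten
	srv := &http.Server{Handler: http.NotFoundHandler()}
	dl.serve = func() error { return srv.Serve(dl.Listener) }
	dl.close = func(ctx context.Context) error { return srv.Shutdown(ctx) }

	go func() { _ = dl.serve() }()
	time.Sleep(100 * time.Millisecond) // give Serve a moment to start

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if err := dl.close(ctx); err != nil {
		log.Fatal(err)
	}
	log.Println("listener shut down gracefully")
}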

Start the cluster communication server
servePeers is defined in the file server/embed/etcd.go. First, NewPeerHandler calls newPeerHandler(lg, s, s.RaftHandler(), ...); RaftHandler() returns s.r.transport.Handler(), and transport.Handler ultimately returns a mux with pipelineHandler, streamHandler and snapHandler registered. Then, for each peerListener, a goroutine is started running an http server that serves pipelineHandler, streamHandler and snapHandler, and the close function of the peerListener is updated; finally, for each peerListener another goroutine is started running cmux.Serve(), i.e. serving the Listener that configurePeerListeners created with transport.NewListenerWithOpts.

// configure peer handlers after rafthttp.Transport started
func (e *Etcd) servePeers() (err error) {
	ph := etcdhttp.NewPeerHandler(e.GetLogger(), e.Server)

	// for each peerListener, run an http server in a goroutine using
	// pipelineHandler, streamHandler and snapHandler
	for _, p := range e.Peers {
		u := p.Listener.Addr().String()
		m := cmux.New(p.Listener) // build the cmux on the Listener created by configurePeerListeners
		srv := &http.Server{
			Handler:     ph,
			ReadTimeout: 5 * time.Minute,
			ErrorLog:    defaultLog.New(io.Discard, "", 0), // do not log user error
		}
		go srv.Serve(m.Match(cmux.Any())) // run the http server in a goroutine
		p.serve = func() error {
			// configurePeerListeners did not initialize the serve function; it is set here
			e.cfg.logger.Info("cmux::serve", zap.String("address", u))
			return m.Serve()
		}
		p.close = func(ctx context.Context) error {
			// gracefully shutdown http.Server: close open listeners and idle connections
			// until context cancel or time-out; close is overwritten here because the
			// objects that now need to be shut down are different
			e.cfg.logger.Info("stopping serving peer traffic", zap.String("address", u))
			srv.Shutdown(ctx)
			e.cfg.logger.Info("stopped serving peer traffic", zap.String("address", u))
			m.Close()
			return nil
		}
	}

	// start peer servers in a goroutine: each goroutine runs cmux.Serve()
	for _, pl := range e.Peers {
		go func(l *peerListener) {
			u := l.Addr().String()
			e.cfg.logger.Info("serving peer traffic", zap.String("address", u))
			e.errHandler(l.serve())
		}(pl)
	}
	return nil
}
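
servePeers relies on cmux to serve several kinds of traffic on a single peer listener; etcd simply matches everything with cmux.Any() as shown above. For readers unfamiliar with cmux, here is a stripped-down standalone sketch of the library's typical use (assuming github.com/soheilhy/cmux; the matchers and handlers are illustrative only):

package main

import (
	"io"
	"log"
	"net"
	"net/http"

	"github.com/soheilhy/cmux"
)

func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}

	// cmux splits one net.Listener into several virtual listeners based on
	// what the first bytes of each incoming connection look like
	m := cmux.New(ln)
	httpL := m.Match(cmux.HTTP1Fast()) // plain HTTP/1.x connections
	anyL := m.Match(cmux.Any())        // everything else

	httpSrv := &http.Server{Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, "hello over HTTP/1\n")
	})}
	go httpSrv.Serve(httpL)

	// a raw TCP handler for non-HTTP/1 traffic
	go func() {
		for {
			c, err := anyL.Accept()
			if err != nil {
				return
			}
			go func(c net.Conn) {
				defer c.Close()
				io.WriteString(c, "not HTTP/1\n")
			}(c)
		}
	}()

	// m.Serve() is what peerListener.serve ends up calling in servePeers
	log.Fatal(m.Serve())
}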

Cluster communication client

While EtcdServer is being initialized, the etcdserver.NewServer(cfg) function eventually initializes the transport layer; the figure referenced here is the Transport block diagram. The main steps are: create the rafthttp.Transport instance, start the rafthttp.Transport instance, and add a Peer instance and a Remote instance to the rafthttp.Transport instance for each other node in the cluster. The detailed process will be explained in a follow-up post.

	// TODO: move transport initialization near the definition of remote
	tr := &rafthttp.Transport{ // create the rafthttp.Transport instance
		Logger:      cfg.Logger,
		TLSInfo:     cfg.PeerTLSInfo,
		DialTimeout: cfg.PeerDialTimeout(),
		ID:          b.cluster.nodeID,
		URLs:        cfg.PeerURLs,
		ClusterID:   b.cluster.cl.ID(),
		// the rafthttp.Raft interface passed in here is implemented by the EtcdServer
		// instance; EtcdServer's implementation of the Raft interface is fairly simple,
		// it delegates the calls directly to the underlying raftNode instance
		Raft:        srv,
		Snapshotter: b.ss,
		ServerStats: sstats,
		LeaderStats: lstats,
		ErrorC:      srv.errorc,
	}
	if err = tr.Start(); err != nil { // start the rafthttp.Transport instance
		return nil, err
	}
	// add a Peer instance and a Remote instance to rafthttp.Transport for each other node in the cluster
	for _, m := range b.cluster.remotes { // add all remotes into transport
		if m.ID != b.cluster.nodeID {
			tr.AddRemote(m.ID, m.PeerURLs)
		}
	}
	for _, m := range b.cluster.cl.Members() {
		if m.ID != b.cluster.nodeID {
			tr.AddPeer(m.ID, m.PeerURLs)
		}
	}
	srv.r.transport = tr
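
For reference, the Raft field above expects the rafthttp.Raft interface, and the comment notes that EtcdServer's implementation mostly delegates to the underlying raftNode. The sketch below paraphrases that interface and the delegation pattern; the method set is written from memory of etcd v3.5-era code and the stepper/fakeNode types are hypothetical, so check the exact definitions against the etcd version you are reading.

package main

import (
	"context"
	"fmt"

	"go.etcd.io/etcd/raft/v3"
	"go.etcd.io/etcd/raft/v3/raftpb"
)

// Raft paraphrases the rafthttp.Raft interface that Transport.Raft expects.
type Raft interface {
	Process(ctx context.Context, m raftpb.Message) error
	IsIDRemoved(id uint64) bool
	ReportUnreachable(id uint64)
	ReportSnapshot(id uint64, status raft.SnapshotStatus)
}

// stepper stands in for the underlying raftNode (hypothetical name).
type stepper interface {
	Step(ctx context.Context, m raftpb.Message) error
}

// server stands in for EtcdServer: its Raft methods mostly forward to raftNode.
type server struct {
	r stepper
}

func (s *server) Process(ctx context.Context, m raftpb.Message) error {
	// the real EtcdServer.Process also rejects messages from removed members first
	return s.r.Step(ctx, m)
}
func (s *server) IsIDRemoved(id uint64) bool                           { return false }
func (s *server) ReportUnreachable(id uint64)                          {}
func (s *server) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}

// fakeNode is a stand-in for raftNode, just to make the sketch runnable.
type fakeNode struct{}

func (fakeNode) Step(ctx context.Context, m raftpb.Message) error {
	fmt.Printf("stepping raft message of type %s\n", m.Type)
	return nil
}

func main() {
	var r Raft = &server{r: fakeNode{}}
	_ = r.Process(context.Background(), raftpb.Message{Type: raftpb.MsgApp})
}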

Copyright notice
This article was written by [Tertium ferrugosum]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/177/202206252303400608.html