当前位置:网站首页>Etcd database source code analysis -- inter cluster network layer server interface

Etcd database source code analysis -- inter cluster network layer server interface

2022-06-26 01:10:00 Tertium ferrugosum

From the last article ETCD Database source code analysis —— Cluster communication initialization we know :

  • The cluster communication server will call configurePeerListeners Function is configuration Config in LPUrls Every url Create a peerListener, This function initializes peerListener Structure will call transport.NewListenerWithOpts Function creation net.Listener.configurePeerListeners Function is not initialized peerListener Structure of the serve function .
  • The cluster communication server will call servePeers Function to start the service .servePeers Function will create a coroutine run cmux.New(p.Listener).Serve() function .
  • The cluster communication server will call servePeers Function to start the service .servePeers Function first NewPeerHandler Would call newPeerHandler(lg,s,s.RaftHandler()...),RaftHandler() Returns the s.r.transport.Handler(), Final transport.Handler Function returns the registered pipelineHandler、streamHandler、snapHandler Of mux To get the corresponding Handler.servePeers Function will create a coroutine run &http.Server{Handler: ph, }.Serve(m.Match(cmux.Any()))

Let's learn about this article transport.NewListenerWithOpts Functions and pipelineHandler、streamHandler、snapHandler.

NewListenerWithOpts function

Listener The initialization code for is as follows ,transport.NewListenerWithOpts The function code is located in client/pkg/transport/listener.go In file , As transport In bag export To an external function .

		peers[i] = &peerListener{
    close: func(context.Context) error {
     return nil }}
		peers[i].Listener, err = transport.NewListenerWithOpts(u.Host, u.Scheme,
			transport.WithTLSInfo(&cfg.PeerTLSInfo),
			transport.WithSocketOpts(&cfg.SocketOpts),
			transport.WithTimeout(rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout),
		)

client/pkg/transport/listener.go file export Two functions call to other code .NewListener Function is used without options , There is no essential difference between the two functions from the perspective of calling , In the end, it will call client/pkg/transport/listener.go In the document newListener function .

// NewListener creates a new listner.
func NewListener(addr, scheme string, tlsinfo *TLSInfo) (l net.Listener, err error) {
    
	return newListener(addr, scheme, WithTLSInfo(tlsinfo))
}
// NewListenerWithOpts creates a new listener which accepts listener options.
func NewListenerWithOpts(addr, scheme string, opts ...ListenerOption) (net.Listener, error) {
    
	return newListener(addr, scheme, opts...)
}

newListener Function comparison is responsible , Different functions will be called according to different protocols and options to produce different Listener. in the light of unix or unixs Protocol call NewUnixListener function ( It's defined in client/pkg/transport/unix_listener.go file ); adopt listen Options ( Relevant codes are located in client/pkg/transport/listener_opts.go file ) To set the listening configuration ;rwTimeoutListener It's defined in client/pkg/transport/timeout_listener.go In file ;NewTLSListener It's defined in client/pkg/transport/listener_tls.go in . In fact, all changes can't be separated from their ancestors net.Listener GO package .

func newListener(addr, scheme string, opts ...ListenerOption) (net.Listener, error) {
    
	if scheme == "unix" || scheme == "unixs" {
     // unix sockets via unix://laddr 
		return NewUnixListener(addr)
	}

	lnOpts := newListenOpts(opts...)  // listen The option related code is located in client/pkg/transport/listener_opts.go file 
	switch {
    
	case lnOpts.IsSocketOpts():	
		config, err := newListenConfig(lnOpts.socketOpts) // new ListenConfig with socket options.
		if err != nil {
     return nil, err }
		lnOpts.ListenConfig = config
		// check for timeout
		fallthrough
	case lnOpts.IsTimeout(), lnOpts.IsSocketOpts():	
		ln, err := lnOpts.ListenConfig.Listen(context.TODO(), "tcp", addr) // timeout listener with socket options.
		if err != nil {
     return nil, err }
		lnOpts.Listener = &rwTimeoutListener{
    
			Listener:     ln,
			readTimeout:  lnOpts.readTimeout,
			writeTimeout: lnOpts.writeTimeout,
		}
	case lnOpts.IsTimeout():
		ln, err := net.Listen("tcp", addr)
		if err != nil {
     return nil, err }
		lnOpts.Listener = &rwTimeoutListener{
    
			Listener:     ln,
			readTimeout:  lnOpts.readTimeout,
			writeTimeout: lnOpts.writeTimeout,
		}
	default:
		ln, err := net.Listen("tcp", addr)
		if err != nil {
     return nil, err }
		lnOpts.Listener = ln
	}

	// only skip if not passing TLSInfo
	if lnOpts.skipTLSInfoCheck && !lnOpts.IsTLS() {
     return lnOpts.Listener, nil }
	return wrapTLS(scheme, lnOpts.tlsInfo, lnOpts.Listener)
}
func wrapTLS(scheme string, tlsinfo *TLSInfo, l net.Listener) (net.Listener, error) {
    
	if scheme != "https" && scheme != "unixs" {
     return l, nil }
	if tlsinfo != nil && tlsinfo.SkipClientSANVerify {
     return NewTLSListener(l, tlsinfo) }
	return newTLSListener(l, tlsinfo, checkSAN)
}

pipelineHandler、streamHandler、snapHandler

servePeers Function first NewPeerHandler Would call newPeerHandler(lg,s,s.RaftHandler()...),RaftHandler() Returns the s.r.transport.Handler(), Final transport.Handler Function returns the registered pipelineHandler、streamHandler、snapHandler Of mux To get the corresponding Handler. First of all, let's look at etcdserver.ServerPeerV2 Interface , It contains ServerPeer, and ServerPeer Also contains the ServerV2 Interface , In the end, it can be found that ServerV2 The interface contains Server Interface . and EtcdServer(server/etcdserver/server.go) yes Server Interface implementation , therefore servePeers function code ph := etcdhttp.NewPeerHandler(e.GetLogger(), e.Server) Mesomorphic parameter etcdserver.ServerPeerV2 Pass in EtcdServer There is no problem with arguments .

type ServerPeerV2 interface {
    
	ServerPeer
	HashKVHandler() http.Handler
	DowngradeEnabledHandler() http.Handler
}
type ServerPeer interface {
    
	ServerV2
	RaftHandler() http.Handler
	LeaseHandler() http.Handler
}
type ServerV2 interface {
    
	Server
	Leader() types.ID
	// Do takes a V2 request and attempts to fulfill it, returning a Response.
	Do(ctx context.Context, r pb.Request) (Response, error)
	ClientCertAuthEnabled() bool
}
type Server interface {
    
	AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
	RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error)
	UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error)
	PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error)
	ClusterVersion() *semver.Version
	StorageVersion() *semver.Version
	Cluster() api.Cluster
	Alarms() []*pb.AlarmMember
	LeaderChangedNotify() <-chan struct{
    }
}

NewPeerHandler Function call newPeerHandler Function registration raftHandler、leaseHandler、hashKVHandler、downgradeEnabledHandler and versionHandler、peerMembersHandler、peerMemberPromoteHandler To mux in . Here's a list of these handler Corresponding path .

handlerpath
raftHandler/raft or /raft/
leaseHandler/leases or /leases/internal
hashKVHandler/members/hashkv
downgradeEnabledHandler/downgrade/enabled
versionHandler/version
peerMembersHandler/members
peerMemberPromoteHandler/members/promote/
// NewPeerHandler generates an http.Handler to handle etcd peer requests.
func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeerV2) http.Handler {
    
	return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.HashKVHandler(), s.DowngradeEnabledHandler())
}
func newPeerHandler(lg *zap.Logger,s etcdserver.Server,raftHandler http.Handler,leaseHandler http.Handler,hashKVHandler http.Handler,downgradeEnabledHandler http.Handler,) http.Handler {
    
	if lg == nil {
     lg = zap.NewNop() }
	peerMembersHandler := newPeerMembersHandler(lg, s.Cluster())
	peerMemberPromoteHandler := newPeerMemberPromoteHandler(lg, s)
	mux := http.NewServeMux()
	mux.HandleFunc("/", http.NotFound)
	mux.Handle(rafthttp.RaftPrefix, raftHandler)
	mux.Handle(rafthttp.RaftPrefix+"/", raftHandler)
	mux.Handle(peerMembersPath, peerMembersHandler)
	mux.Handle(peerMemberPromotePrefix, peerMemberPromoteHandler)
	if leaseHandler != nil {
    
		mux.Handle(leasehttp.LeasePrefix, leaseHandler)
		mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)
	}
	if downgradeEnabledHandler != nil {
    
		mux.Handle(etcdserver.DowngradeEnabledPath, downgradeEnabledHandler)
	}
	if hashKVHandler != nil {
    
		mux.Handle(etcdserver.PeerHashKVPath, hashKVHandler)
	}
	mux.HandleFunc(versionPath, versionHandler(s, serveVersion))
	return mux
}

RaftHandler()
RaftHandler() Function defined in server/etcdserver/server.go In file , It will call transport Of Handler function .

func (s *EtcdServer) RaftHandler() http.Handler {
     return s.r.transport.Handler() }

transport Of Handler Function defined in server/etcdserver/api/rafthttp/transport.go In file , As shown in the following code , establish pipelineHandler、streamHandler、snapHandler And register it in mux in .

func (t *Transport) Handler() http.Handler {
    
	pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID)
	streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)
	snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID)
	mux := http.NewServeMux()
	mux.Handle(RaftPrefix, pipelineHandler)
	mux.Handle(RaftStreamPrefix+"/", streamHandler)
	mux.Handle(RaftSnapshotPrefix, snapHandler)
	mux.Handle(ProbingPrefix, probing.NewHandler())
	return mux
}

Here's a list of these handler Corresponding path .

handlerpath
raftHandler/raft or /raft/
pipelineHandler/raft
streamHandler/raft/stream
snapHandler/raft/snapshot
probing.NewHandler()/raft/probing

LeaseHandler()
LeaseHandler() Function defined in server/etcdserver/server.go In file , It will call leasehttp Of NewHandler function .

func (s *EtcdServer) LeaseHandler() http.Handler {
    
	if s.lessor == nil {
     return nil }
	return leasehttp.NewHandler(s.lessor, s.ApplyWait)
}

HashKVHandler()
HashKVHandler() Function defined in server/etcdserver/corrupt.go In file , It will return to hashKVHandler Structure .

func (s *EtcdServer) HashKVHandler() http.Handler {
    
	return &hashKVHandler{
    lg: s.Logger(), server: s}
}
type hashKVHandler struct {
    
	lg     *zap.Logger
	server *EtcdServer
}

DowngradeEnabledHandler()
DowngradeEnabledHandler() Function defined in server/etcdserver/server.go In file , It will return to downgradeEnabledHandler Structure .

type downgradeEnabledHandler struct {
    
	lg      *zap.Logger
	cluster api.Cluster
	server  *EtcdServer
}
func (s *EtcdServer) DowngradeEnabledHandler() http.Handler {
    
	return &downgradeEnabledHandler{
    
		lg:      s.Logger(),
		cluster: s.cluster,
		server:  s,
	}
}

versionHandler(s, serveVersion)
versionHandler() Function defined in server/etcdserver/api/etcdhttp/version.go In file , It passes... To the function in its function body ResponseWriter Objects and http.Request, Its internal function will call server.ClusterVersion() and server.StorageVersion() Get the cluster version and storage version , And then call serveVersion Function encapsulates the version information into Versions Structure , Set up http Corresponding header information , Write out after serialization .

func versionHandler(server etcdserver.Server, fn func(http.ResponseWriter, *http.Request, string, string)) http.HandlerFunc {
    
	return func(w http.ResponseWriter, r *http.Request) {
    
		clusterVersion := server.ClusterVersion()
		storageVersion := server.StorageVersion()
		clusterVersionStr, storageVersionStr := "not_decided", "unknown"
		if clusterVersion != nil {
    
			clusterVersionStr = clusterVersion.String()
		}
		if storageVersion != nil {
    
			storageVersionStr = storageVersion.String()
		}
		fn(w, r, clusterVersionStr, storageVersionStr)
	}
}
func serveVersion(w http.ResponseWriter, r *http.Request, clusterV, storageV string) {
    
	if !allowMethod(w, r, "GET") {
     return }
	vs := version.Versions{
    
		Server:  version.Version,
		Cluster: clusterV,
		Storage: storageV,
	}

	w.Header().Set("Content-Type", "application/json")
	b, err := json.Marshal(&vs)
	if err != nil {
     panic(fmt.Sprintf("cannot marshal versions to json (%v)", err)) }
	w.Write(b)
}

newPeerMembersHandler(lg, s.Cluster())
newPeerMembersHandler() Function defined in server/etcdserver/api/etcdhttp/peer.go In file , It will return to peerMembersHandler Structure .

func newPeerMembersHandler(lg *zap.Logger, cluster api.Cluster) http.Handler {
    
	return &peerMembersHandler{
    
		lg:      lg,
		cluster: cluster,
	}
}
type peerMembersHandler struct {
    
	lg      *zap.Logger
	cluster api.Cluster
}

newPeerMemberPromoteHandler(lg, s)
newPeerMemberPromoteHandler() Function defined in server/etcdserver/api/etcdhttp/peer.go In file , It will return to peerMemberPromoteHandler Structure .

func newPeerMemberPromoteHandler(lg *zap.Logger, s etcdserver.Server) http.Handler {
    
	return &peerMemberPromoteHandler{
    
		lg:      lg,
		cluster: s.Cluster(),
		server:  s,
	}
}
type peerMemberPromoteHandler struct {
    
	lg      *zap.Logger
	cluster api.Cluster
	server  etcdserver.Server
}
原网站

版权声明
本文为[Tertium ferrugosum]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/177/202206252303400527.html