当前位置:网站首页>Etcd database source code analysis -- inter cluster network layer server interface
Etcd database source code analysis -- inter cluster network layer server interface
2022-06-26 01:10:00 【Tertium ferrugosum】
From the last article ETCD Database source code analysis —— Cluster communication initialization we know :
- The cluster communication server will call configurePeerListeners Function is configuration Config in LPUrls Every url Create a peerListener, This function initializes peerListener Structure will call
transport.NewListenerWithOptsFunction creation net.Listener.configurePeerListeners Function is not initialized peerListener Structure of the serve function . - The cluster communication server will call servePeers Function to start the service .servePeers Function will create a coroutine run
cmux.New(p.Listener).Serve()function . - The cluster communication server will call servePeers Function to start the service .servePeers Function first NewPeerHandler Would call
newPeerHandler(lg,s,s.RaftHandler()...),RaftHandler() Returns the s.r.transport.Handler(), Final transport.Handler Function returns the registered pipelineHandler、streamHandler、snapHandler Of mux To get the corresponding Handler.servePeers Function will create a coroutine run&http.Server{Handler: ph, }.Serve(m.Match(cmux.Any()))
Let's learn about this article transport.NewListenerWithOpts Functions and pipelineHandler、streamHandler、snapHandler.
NewListenerWithOpts function
Listener The initialization code for is as follows ,transport.NewListenerWithOpts The function code is located in client/pkg/transport/listener.go In file , As transport In bag export To an external function .
peers[i] = &peerListener{
close: func(context.Context) error {
return nil }}
peers[i].Listener, err = transport.NewListenerWithOpts(u.Host, u.Scheme,
transport.WithTLSInfo(&cfg.PeerTLSInfo),
transport.WithSocketOpts(&cfg.SocketOpts),
transport.WithTimeout(rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout),
)
client/pkg/transport/listener.go file export Two functions call to other code .NewListener Function is used without options , There is no essential difference between the two functions from the perspective of calling , In the end, it will call client/pkg/transport/listener.go In the document newListener function .
// NewListener creates a new listner.
func NewListener(addr, scheme string, tlsinfo *TLSInfo) (l net.Listener, err error) {
return newListener(addr, scheme, WithTLSInfo(tlsinfo))
}
// NewListenerWithOpts creates a new listener which accepts listener options.
func NewListenerWithOpts(addr, scheme string, opts ...ListenerOption) (net.Listener, error) {
return newListener(addr, scheme, opts...)
}
newListener Function comparison is responsible , Different functions will be called according to different protocols and options to produce different Listener. in the light of unix or unixs Protocol call NewUnixListener function ( It's defined in client/pkg/transport/unix_listener.go file ); adopt listen Options ( Relevant codes are located in client/pkg/transport/listener_opts.go file ) To set the listening configuration ;rwTimeoutListener It's defined in client/pkg/transport/timeout_listener.go In file ;NewTLSListener It's defined in client/pkg/transport/listener_tls.go in . In fact, all changes can't be separated from their ancestors net.Listener GO package .
func newListener(addr, scheme string, opts ...ListenerOption) (net.Listener, error) {
if scheme == "unix" || scheme == "unixs" {
// unix sockets via unix://laddr
return NewUnixListener(addr)
}
lnOpts := newListenOpts(opts...) // listen The option related code is located in client/pkg/transport/listener_opts.go file
switch {
case lnOpts.IsSocketOpts():
config, err := newListenConfig(lnOpts.socketOpts) // new ListenConfig with socket options.
if err != nil {
return nil, err }
lnOpts.ListenConfig = config
// check for timeout
fallthrough
case lnOpts.IsTimeout(), lnOpts.IsSocketOpts():
ln, err := lnOpts.ListenConfig.Listen(context.TODO(), "tcp", addr) // timeout listener with socket options.
if err != nil {
return nil, err }
lnOpts.Listener = &rwTimeoutListener{
Listener: ln,
readTimeout: lnOpts.readTimeout,
writeTimeout: lnOpts.writeTimeout,
}
case lnOpts.IsTimeout():
ln, err := net.Listen("tcp", addr)
if err != nil {
return nil, err }
lnOpts.Listener = &rwTimeoutListener{
Listener: ln,
readTimeout: lnOpts.readTimeout,
writeTimeout: lnOpts.writeTimeout,
}
default:
ln, err := net.Listen("tcp", addr)
if err != nil {
return nil, err }
lnOpts.Listener = ln
}
// only skip if not passing TLSInfo
if lnOpts.skipTLSInfoCheck && !lnOpts.IsTLS() {
return lnOpts.Listener, nil }
return wrapTLS(scheme, lnOpts.tlsInfo, lnOpts.Listener)
}
func wrapTLS(scheme string, tlsinfo *TLSInfo, l net.Listener) (net.Listener, error) {
if scheme != "https" && scheme != "unixs" {
return l, nil }
if tlsinfo != nil && tlsinfo.SkipClientSANVerify {
return NewTLSListener(l, tlsinfo) }
return newTLSListener(l, tlsinfo, checkSAN)
}
pipelineHandler、streamHandler、snapHandler
servePeers Function first NewPeerHandler Would call newPeerHandler(lg,s,s.RaftHandler()...),RaftHandler() Returns the s.r.transport.Handler(), Final transport.Handler Function returns the registered pipelineHandler、streamHandler、snapHandler Of mux To get the corresponding Handler. First of all, let's look at etcdserver.ServerPeerV2 Interface , It contains ServerPeer, and ServerPeer Also contains the ServerV2 Interface , In the end, it can be found that ServerV2 The interface contains Server Interface . and EtcdServer(server/etcdserver/server.go) yes Server Interface implementation , therefore servePeers function code ph := etcdhttp.NewPeerHandler(e.GetLogger(), e.Server) Mesomorphic parameter etcdserver.ServerPeerV2 Pass in EtcdServer There is no problem with arguments .
type ServerPeerV2 interface {
ServerPeer
HashKVHandler() http.Handler
DowngradeEnabledHandler() http.Handler
}
type ServerPeer interface {
ServerV2
RaftHandler() http.Handler
LeaseHandler() http.Handler
}
type ServerV2 interface {
Server
Leader() types.ID
// Do takes a V2 request and attempts to fulfill it, returning a Response.
Do(ctx context.Context, r pb.Request) (Response, error)
ClientCertAuthEnabled() bool
}
type Server interface {
AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error)
UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error)
PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error)
ClusterVersion() *semver.Version
StorageVersion() *semver.Version
Cluster() api.Cluster
Alarms() []*pb.AlarmMember
LeaderChangedNotify() <-chan struct{
}
}
NewPeerHandler Function call newPeerHandler Function registration raftHandler、leaseHandler、hashKVHandler、downgradeEnabledHandler and versionHandler、peerMembersHandler、peerMemberPromoteHandler To mux in . Here's a list of these handler Corresponding path .
| handler | path |
|---|---|
| raftHandler | /raft or /raft/ |
| leaseHandler | /leases or /leases/internal |
| hashKVHandler | /members/hashkv |
| downgradeEnabledHandler | /downgrade/enabled |
| versionHandler | /version |
| peerMembersHandler | /members |
| peerMemberPromoteHandler | /members/promote/ |
// NewPeerHandler generates an http.Handler to handle etcd peer requests.
func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeerV2) http.Handler {
return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.HashKVHandler(), s.DowngradeEnabledHandler())
}
func newPeerHandler(lg *zap.Logger,s etcdserver.Server,raftHandler http.Handler,leaseHandler http.Handler,hashKVHandler http.Handler,downgradeEnabledHandler http.Handler,) http.Handler {
if lg == nil {
lg = zap.NewNop() }
peerMembersHandler := newPeerMembersHandler(lg, s.Cluster())
peerMemberPromoteHandler := newPeerMemberPromoteHandler(lg, s)
mux := http.NewServeMux()
mux.HandleFunc("/", http.NotFound)
mux.Handle(rafthttp.RaftPrefix, raftHandler)
mux.Handle(rafthttp.RaftPrefix+"/", raftHandler)
mux.Handle(peerMembersPath, peerMembersHandler)
mux.Handle(peerMemberPromotePrefix, peerMemberPromoteHandler)
if leaseHandler != nil {
mux.Handle(leasehttp.LeasePrefix, leaseHandler)
mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)
}
if downgradeEnabledHandler != nil {
mux.Handle(etcdserver.DowngradeEnabledPath, downgradeEnabledHandler)
}
if hashKVHandler != nil {
mux.Handle(etcdserver.PeerHashKVPath, hashKVHandler)
}
mux.HandleFunc(versionPath, versionHandler(s, serveVersion))
return mux
}
RaftHandler()
RaftHandler() Function defined in server/etcdserver/server.go In file , It will call transport Of Handler function .
func (s *EtcdServer) RaftHandler() http.Handler {
return s.r.transport.Handler() }
transport Of Handler Function defined in server/etcdserver/api/rafthttp/transport.go In file , As shown in the following code , establish pipelineHandler、streamHandler、snapHandler And register it in mux in .
func (t *Transport) Handler() http.Handler {
pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID)
streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)
snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID)
mux := http.NewServeMux()
mux.Handle(RaftPrefix, pipelineHandler)
mux.Handle(RaftStreamPrefix+"/", streamHandler)
mux.Handle(RaftSnapshotPrefix, snapHandler)
mux.Handle(ProbingPrefix, probing.NewHandler())
return mux
}
Here's a list of these handler Corresponding path .
| handler | path |
|---|---|
| raftHandler | /raft or /raft/ |
| pipelineHandler | /raft |
| streamHandler | /raft/stream |
| snapHandler | /raft/snapshot |
| probing.NewHandler() | /raft/probing |
LeaseHandler()
LeaseHandler() Function defined in server/etcdserver/server.go In file , It will call leasehttp Of NewHandler function .
func (s *EtcdServer) LeaseHandler() http.Handler {
if s.lessor == nil {
return nil }
return leasehttp.NewHandler(s.lessor, s.ApplyWait)
}
HashKVHandler()
HashKVHandler() Function defined in server/etcdserver/corrupt.go In file , It will return to hashKVHandler Structure .
func (s *EtcdServer) HashKVHandler() http.Handler {
return &hashKVHandler{
lg: s.Logger(), server: s}
}
type hashKVHandler struct {
lg *zap.Logger
server *EtcdServer
}
DowngradeEnabledHandler()
DowngradeEnabledHandler() Function defined in server/etcdserver/server.go In file , It will return to downgradeEnabledHandler Structure .
type downgradeEnabledHandler struct {
lg *zap.Logger
cluster api.Cluster
server *EtcdServer
}
func (s *EtcdServer) DowngradeEnabledHandler() http.Handler {
return &downgradeEnabledHandler{
lg: s.Logger(),
cluster: s.cluster,
server: s,
}
}
versionHandler(s, serveVersion)
versionHandler() Function defined in server/etcdserver/api/etcdhttp/version.go In file , It passes... To the function in its function body ResponseWriter Objects and http.Request, Its internal function will call server.ClusterVersion() and server.StorageVersion() Get the cluster version and storage version , And then call serveVersion Function encapsulates the version information into Versions Structure , Set up http Corresponding header information , Write out after serialization .
func versionHandler(server etcdserver.Server, fn func(http.ResponseWriter, *http.Request, string, string)) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
clusterVersion := server.ClusterVersion()
storageVersion := server.StorageVersion()
clusterVersionStr, storageVersionStr := "not_decided", "unknown"
if clusterVersion != nil {
clusterVersionStr = clusterVersion.String()
}
if storageVersion != nil {
storageVersionStr = storageVersion.String()
}
fn(w, r, clusterVersionStr, storageVersionStr)
}
}
func serveVersion(w http.ResponseWriter, r *http.Request, clusterV, storageV string) {
if !allowMethod(w, r, "GET") {
return }
vs := version.Versions{
Server: version.Version,
Cluster: clusterV,
Storage: storageV,
}
w.Header().Set("Content-Type", "application/json")
b, err := json.Marshal(&vs)
if err != nil {
panic(fmt.Sprintf("cannot marshal versions to json (%v)", err)) }
w.Write(b)
}
newPeerMembersHandler(lg, s.Cluster())
newPeerMembersHandler() Function defined in server/etcdserver/api/etcdhttp/peer.go In file , It will return to peerMembersHandler Structure .
func newPeerMembersHandler(lg *zap.Logger, cluster api.Cluster) http.Handler {
return &peerMembersHandler{
lg: lg,
cluster: cluster,
}
}
type peerMembersHandler struct {
lg *zap.Logger
cluster api.Cluster
}
newPeerMemberPromoteHandler(lg, s)
newPeerMemberPromoteHandler() Function defined in server/etcdserver/api/etcdhttp/peer.go In file , It will return to peerMemberPromoteHandler Structure .
func newPeerMemberPromoteHandler(lg *zap.Logger, s etcdserver.Server) http.Handler {
return &peerMemberPromoteHandler{
lg: lg,
cluster: s.Cluster(),
server: s,
}
}
type peerMemberPromoteHandler struct {
lg *zap.Logger
cluster api.Cluster
server etcdserver.Server
}
边栏推荐
- Music spectrum display toy -- implementation and application of FFT in stm32
- .net使用Access 2010数据库
- Xinku online | cnopendata text data of IPO declaration and issuance of A-share listed companies
- FPGA notes -- implementation of FPGA floating point operation
- [learn FPGA programming from scratch -44]: vision chapter - integrated circuit helps high-quality development in the digital era -1- main forms of integrated circuit chips
- Redis strings command
- Chapter V exercises (124, 678, 15, 19, 22) [microcomputer principles] [exercises]
- The kth largest element in the array
- ASP.NET cache缓存的用法
- Web学习之TypeScript
猜你喜欢

Preorder and middle order traversal of forest

从查询数据库性能优化谈到redis缓存-谈一谈缓存的穿透、雪崩、击穿

ADC acquisition noise and comparison between RMS filter and Kalman filter

Optimized three-dimensional space positioning method and its fast implementation in C language

Redis之Strings命令

Endnote IEEE Transactions on industrial electronics/tie/tpel reference format template

About the use of hc-12 radio frequency module

使用Gin框架运行Demo时报错“ listen tcp :8080: bind: An attempt was made to access a socket in a way forbidden”

Implementation notes of least square fitting conic in stm32

C # operate with MySQL
随机推荐
C#另外一个new类的方式Ico?以及App.config的使用
关于EF翻页查询数据库
Final review [machine learning]
Online gadget sharing (updated from time to time, current quantity: 2)
mysql
Native DOM vs. virtual DOM
Xinku online | cnopendata text data of IPO declaration and issuance of A-share listed companies
Classic interview questions: mouse drug test and Hamming code
剑指 Offer II 096. 字符串交织
Sword finger offer II 096 String interleaving
Modelsim simulation FFT core cannot be simulated solution (qsys)
Optimized three-dimensional space positioning method and its fast implementation in C language
Establish a j-link GDB cross debugging environment for Px4
Comment promouvoir efficacement les produits
About the use of hc-12 radio frequency module
[机缘参悟-30]:鬼谷子-内揵篇-同理心,站在对方的立场,拉近与对方的心理距离
WIN10系统C盘清理策略
Spark log analysis
[understanding of opportunity -30]: Guiguzi - internal "chapter - empathy, stand on the other side's position and narrow the psychological distance with the other side
Musk vs. jobs, who is the greatest entrepreneur in the 21st century