当前位置:网站首页>Etcd database source code analysis -- inter cluster network layer server interface
Etcd database source code analysis -- inter cluster network layer server interface
2022-06-26 01:10:00 【Tertium ferrugosum】
From the last article ETCD Database source code analysis —— Cluster communication initialization we know :
- The cluster communication server will call configurePeerListeners Function is configuration Config in LPUrls Every url Create a peerListener, This function initializes peerListener Structure will call
transport.NewListenerWithOptsFunction creation net.Listener.configurePeerListeners Function is not initialized peerListener Structure of the serve function . - The cluster communication server will call servePeers Function to start the service .servePeers Function will create a coroutine run
cmux.New(p.Listener).Serve()function . - The cluster communication server will call servePeers Function to start the service .servePeers Function first NewPeerHandler Would call
newPeerHandler(lg,s,s.RaftHandler()...),RaftHandler() Returns the s.r.transport.Handler(), Final transport.Handler Function returns the registered pipelineHandler、streamHandler、snapHandler Of mux To get the corresponding Handler.servePeers Function will create a coroutine run&http.Server{Handler: ph, }.Serve(m.Match(cmux.Any()))
Let's learn about this article transport.NewListenerWithOpts Functions and pipelineHandler、streamHandler、snapHandler.
NewListenerWithOpts function
Listener The initialization code for is as follows ,transport.NewListenerWithOpts The function code is located in client/pkg/transport/listener.go In file , As transport In bag export To an external function .
peers[i] = &peerListener{
close: func(context.Context) error {
return nil }}
peers[i].Listener, err = transport.NewListenerWithOpts(u.Host, u.Scheme,
transport.WithTLSInfo(&cfg.PeerTLSInfo),
transport.WithSocketOpts(&cfg.SocketOpts),
transport.WithTimeout(rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout),
)
client/pkg/transport/listener.go file export Two functions call to other code .NewListener Function is used without options , There is no essential difference between the two functions from the perspective of calling , In the end, it will call client/pkg/transport/listener.go In the document newListener function .
// NewListener creates a new listner.
func NewListener(addr, scheme string, tlsinfo *TLSInfo) (l net.Listener, err error) {
return newListener(addr, scheme, WithTLSInfo(tlsinfo))
}
// NewListenerWithOpts creates a new listener which accepts listener options.
func NewListenerWithOpts(addr, scheme string, opts ...ListenerOption) (net.Listener, error) {
return newListener(addr, scheme, opts...)
}
newListener Function comparison is responsible , Different functions will be called according to different protocols and options to produce different Listener. in the light of unix or unixs Protocol call NewUnixListener function ( It's defined in client/pkg/transport/unix_listener.go file ); adopt listen Options ( Relevant codes are located in client/pkg/transport/listener_opts.go file ) To set the listening configuration ;rwTimeoutListener It's defined in client/pkg/transport/timeout_listener.go In file ;NewTLSListener It's defined in client/pkg/transport/listener_tls.go in . In fact, all changes can't be separated from their ancestors net.Listener GO package .
func newListener(addr, scheme string, opts ...ListenerOption) (net.Listener, error) {
if scheme == "unix" || scheme == "unixs" {
// unix sockets via unix://laddr
return NewUnixListener(addr)
}
lnOpts := newListenOpts(opts...) // listen The option related code is located in client/pkg/transport/listener_opts.go file
switch {
case lnOpts.IsSocketOpts():
config, err := newListenConfig(lnOpts.socketOpts) // new ListenConfig with socket options.
if err != nil {
return nil, err }
lnOpts.ListenConfig = config
// check for timeout
fallthrough
case lnOpts.IsTimeout(), lnOpts.IsSocketOpts():
ln, err := lnOpts.ListenConfig.Listen(context.TODO(), "tcp", addr) // timeout listener with socket options.
if err != nil {
return nil, err }
lnOpts.Listener = &rwTimeoutListener{
Listener: ln,
readTimeout: lnOpts.readTimeout,
writeTimeout: lnOpts.writeTimeout,
}
case lnOpts.IsTimeout():
ln, err := net.Listen("tcp", addr)
if err != nil {
return nil, err }
lnOpts.Listener = &rwTimeoutListener{
Listener: ln,
readTimeout: lnOpts.readTimeout,
writeTimeout: lnOpts.writeTimeout,
}
default:
ln, err := net.Listen("tcp", addr)
if err != nil {
return nil, err }
lnOpts.Listener = ln
}
// only skip if not passing TLSInfo
if lnOpts.skipTLSInfoCheck && !lnOpts.IsTLS() {
return lnOpts.Listener, nil }
return wrapTLS(scheme, lnOpts.tlsInfo, lnOpts.Listener)
}
func wrapTLS(scheme string, tlsinfo *TLSInfo, l net.Listener) (net.Listener, error) {
if scheme != "https" && scheme != "unixs" {
return l, nil }
if tlsinfo != nil && tlsinfo.SkipClientSANVerify {
return NewTLSListener(l, tlsinfo) }
return newTLSListener(l, tlsinfo, checkSAN)
}
pipelineHandler、streamHandler、snapHandler
servePeers Function first NewPeerHandler Would call newPeerHandler(lg,s,s.RaftHandler()...),RaftHandler() Returns the s.r.transport.Handler(), Final transport.Handler Function returns the registered pipelineHandler、streamHandler、snapHandler Of mux To get the corresponding Handler. First of all, let's look at etcdserver.ServerPeerV2 Interface , It contains ServerPeer, and ServerPeer Also contains the ServerV2 Interface , In the end, it can be found that ServerV2 The interface contains Server Interface . and EtcdServer(server/etcdserver/server.go) yes Server Interface implementation , therefore servePeers function code ph := etcdhttp.NewPeerHandler(e.GetLogger(), e.Server) Mesomorphic parameter etcdserver.ServerPeerV2 Pass in EtcdServer There is no problem with arguments .
type ServerPeerV2 interface {
ServerPeer
HashKVHandler() http.Handler
DowngradeEnabledHandler() http.Handler
}
type ServerPeer interface {
ServerV2
RaftHandler() http.Handler
LeaseHandler() http.Handler
}
type ServerV2 interface {
Server
Leader() types.ID
// Do takes a V2 request and attempts to fulfill it, returning a Response.
Do(ctx context.Context, r pb.Request) (Response, error)
ClientCertAuthEnabled() bool
}
type Server interface {
AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error)
UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error)
PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error)
ClusterVersion() *semver.Version
StorageVersion() *semver.Version
Cluster() api.Cluster
Alarms() []*pb.AlarmMember
LeaderChangedNotify() <-chan struct{
}
}
NewPeerHandler Function call newPeerHandler Function registration raftHandler、leaseHandler、hashKVHandler、downgradeEnabledHandler and versionHandler、peerMembersHandler、peerMemberPromoteHandler To mux in . Here's a list of these handler Corresponding path .
| handler | path |
|---|---|
| raftHandler | /raft or /raft/ |
| leaseHandler | /leases or /leases/internal |
| hashKVHandler | /members/hashkv |
| downgradeEnabledHandler | /downgrade/enabled |
| versionHandler | /version |
| peerMembersHandler | /members |
| peerMemberPromoteHandler | /members/promote/ |
// NewPeerHandler generates an http.Handler to handle etcd peer requests.
func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeerV2) http.Handler {
return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.HashKVHandler(), s.DowngradeEnabledHandler())
}
func newPeerHandler(lg *zap.Logger,s etcdserver.Server,raftHandler http.Handler,leaseHandler http.Handler,hashKVHandler http.Handler,downgradeEnabledHandler http.Handler,) http.Handler {
if lg == nil {
lg = zap.NewNop() }
peerMembersHandler := newPeerMembersHandler(lg, s.Cluster())
peerMemberPromoteHandler := newPeerMemberPromoteHandler(lg, s)
mux := http.NewServeMux()
mux.HandleFunc("/", http.NotFound)
mux.Handle(rafthttp.RaftPrefix, raftHandler)
mux.Handle(rafthttp.RaftPrefix+"/", raftHandler)
mux.Handle(peerMembersPath, peerMembersHandler)
mux.Handle(peerMemberPromotePrefix, peerMemberPromoteHandler)
if leaseHandler != nil {
mux.Handle(leasehttp.LeasePrefix, leaseHandler)
mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)
}
if downgradeEnabledHandler != nil {
mux.Handle(etcdserver.DowngradeEnabledPath, downgradeEnabledHandler)
}
if hashKVHandler != nil {
mux.Handle(etcdserver.PeerHashKVPath, hashKVHandler)
}
mux.HandleFunc(versionPath, versionHandler(s, serveVersion))
return mux
}
RaftHandler()
RaftHandler() Function defined in server/etcdserver/server.go In file , It will call transport Of Handler function .
func (s *EtcdServer) RaftHandler() http.Handler {
return s.r.transport.Handler() }
transport Of Handler Function defined in server/etcdserver/api/rafthttp/transport.go In file , As shown in the following code , establish pipelineHandler、streamHandler、snapHandler And register it in mux in .
func (t *Transport) Handler() http.Handler {
pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID)
streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)
snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID)
mux := http.NewServeMux()
mux.Handle(RaftPrefix, pipelineHandler)
mux.Handle(RaftStreamPrefix+"/", streamHandler)
mux.Handle(RaftSnapshotPrefix, snapHandler)
mux.Handle(ProbingPrefix, probing.NewHandler())
return mux
}
Here's a list of these handler Corresponding path .
| handler | path |
|---|---|
| raftHandler | /raft or /raft/ |
| pipelineHandler | /raft |
| streamHandler | /raft/stream |
| snapHandler | /raft/snapshot |
| probing.NewHandler() | /raft/probing |
LeaseHandler()
LeaseHandler() Function defined in server/etcdserver/server.go In file , It will call leasehttp Of NewHandler function .
func (s *EtcdServer) LeaseHandler() http.Handler {
if s.lessor == nil {
return nil }
return leasehttp.NewHandler(s.lessor, s.ApplyWait)
}
HashKVHandler()
HashKVHandler() Function defined in server/etcdserver/corrupt.go In file , It will return to hashKVHandler Structure .
func (s *EtcdServer) HashKVHandler() http.Handler {
return &hashKVHandler{
lg: s.Logger(), server: s}
}
type hashKVHandler struct {
lg *zap.Logger
server *EtcdServer
}
DowngradeEnabledHandler()
DowngradeEnabledHandler() Function defined in server/etcdserver/server.go In file , It will return to downgradeEnabledHandler Structure .
type downgradeEnabledHandler struct {
lg *zap.Logger
cluster api.Cluster
server *EtcdServer
}
func (s *EtcdServer) DowngradeEnabledHandler() http.Handler {
return &downgradeEnabledHandler{
lg: s.Logger(),
cluster: s.cluster,
server: s,
}
}
versionHandler(s, serveVersion)
versionHandler() Function defined in server/etcdserver/api/etcdhttp/version.go In file , It passes... To the function in its function body ResponseWriter Objects and http.Request, Its internal function will call server.ClusterVersion() and server.StorageVersion() Get the cluster version and storage version , And then call serveVersion Function encapsulates the version information into Versions Structure , Set up http Corresponding header information , Write out after serialization .
func versionHandler(server etcdserver.Server, fn func(http.ResponseWriter, *http.Request, string, string)) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
clusterVersion := server.ClusterVersion()
storageVersion := server.StorageVersion()
clusterVersionStr, storageVersionStr := "not_decided", "unknown"
if clusterVersion != nil {
clusterVersionStr = clusterVersion.String()
}
if storageVersion != nil {
storageVersionStr = storageVersion.String()
}
fn(w, r, clusterVersionStr, storageVersionStr)
}
}
func serveVersion(w http.ResponseWriter, r *http.Request, clusterV, storageV string) {
if !allowMethod(w, r, "GET") {
return }
vs := version.Versions{
Server: version.Version,
Cluster: clusterV,
Storage: storageV,
}
w.Header().Set("Content-Type", "application/json")
b, err := json.Marshal(&vs)
if err != nil {
panic(fmt.Sprintf("cannot marshal versions to json (%v)", err)) }
w.Write(b)
}
newPeerMembersHandler(lg, s.Cluster())
newPeerMembersHandler() Function defined in server/etcdserver/api/etcdhttp/peer.go In file , It will return to peerMembersHandler Structure .
func newPeerMembersHandler(lg *zap.Logger, cluster api.Cluster) http.Handler {
return &peerMembersHandler{
lg: lg,
cluster: cluster,
}
}
type peerMembersHandler struct {
lg *zap.Logger
cluster api.Cluster
}
newPeerMemberPromoteHandler(lg, s)
newPeerMemberPromoteHandler() Function defined in server/etcdserver/api/etcdhttp/peer.go In file , It will return to peerMemberPromoteHandler Structure .
func newPeerMemberPromoteHandler(lg *zap.Logger, s etcdserver.Server) http.Handler {
return &peerMemberPromoteHandler{
lg: lg,
cluster: s.Cluster(),
server: s,
}
}
type peerMemberPromoteHandler struct {
lg *zap.Logger
cluster api.Cluster
server etcdserver.Server
}
边栏推荐
- Web information collection, naked runners on the Internet
- Unknown device ID does not appear on the STM32 st-link utility connection! Causes and Solutions
- 关于HC-12无线射频模块使用
- 剑指 Offer II 096. 字符串交织
- Case: drawing Matplotlib dynamic graph
- New library launched | cnopendata China new house information data
- Web学习之TypeScript
- The kth largest element in the array
- 信息收集的利器,Google骇客语法
- 卡通shader
猜你喜欢

Summary of push-pull output and open drain output of STM32 and failure of analog IIC driving mlx90615

Stream data

Unified gateway

New library launched | cnopendata China new house information data

Motor monitoring system based on MCGS and stm32

Redis strings command

Error 65:access violation at 0x58024400: no 'read' permission

ETCD数据库源码分析——集群通信初始化

新库上线 | CnOpenDataA股上市公司IPO申报发行文本数据

Middle order clue binary tree
随机推荐
Flex & bison start
Installation and startup of redis
Solve STM32 operation μ Solution to sudden failure of task scheduling in c/os-ii system
Redis的安装及启动
Dgus new upgrade: fully support digital video playback function
Blob
接口的幂等性——详细谈谈接口的幂等即解决方案
返回值为Object型方法调用equals()
FreeRTOS+STM32L+ESP8266+MQTT协议传输温湿度数据到腾讯云物联网平台
Return value is object type method call equals()
数据分析——切片器、数据透视表与数据透视图(职场必备)
Android cache usage tool class
QT cmake pure C code calls the system console to input scanf and Chinese output garbled code
Typescript for Web Learning
简单 deepclone
Summary of push-pull output and open drain output of STM32 and failure of analog IIC driving mlx90615
Radio boxes are mutually exclusive and can be deselected at the same time
原生DOM与虚拟DOM
Spark log analysis
Sword finger offer II 096 String interleaving